Airflow default variables - incremental load setup

Stack Overflow user
Asked on 2020-11-02 17:46:45
3 answers, 494 views, 0 followers, 0 votes

I am trying to implement incremental data loading for extracting data from one RDS Postgres database into another RDS Postgres database.

I am using Airflow for the ETL. After reading about Airflow macros for a while, I decided to use Airflow's default variables to set up the incremental flow.

So the algorithm is as follows:

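If my previous execution date is None, null, or '': select data from the beginning of time (in our case, one year ago); else: select data from the previous execution date onward; end if.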
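The selection rule above can be written as a small pure-Python function. This is only a sketch, not the asker's code: `load_window` and `lookback_days` are hypothetical names, the strings `'None'`, `'null'` and `''` are treated as "missing" as the rule describes, and the one-year fallback is anchored on `execution_date` instead of `datetime.now()` so that re-running the same DAG run produces the same window.

```python
from datetime import datetime, timedelta

# Values treated as "no previous run", following the rule in the question
_MISSING = (None, "None", "null", "")


def load_window(prev_date, execution_date, lookback_days=365):
    """Return (start, end) for the extraction window (hypothetical helper).

    If prev_date is missing, start lookback_days before execution_date
    (standing in for "the beginning of time"); otherwise start at prev_date.
    """
    if prev_date in _MISSING:
        start = execution_date - timedelta(days=lookback_days)
    else:
        start = prev_date
    return start, execution_date


run = datetime(2020, 11, 2, 17, 45)
print(load_window(None, run))
print(load_window(datetime(2020, 11, 2, 17, 40), run))
```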

Note: the code below is only meant to help me understand the default variables; it does not yet implement the logic described above.

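The corresponding code is shown below. When I run the DAG for the first time, the previous successful execution date variable always prints 'None' instead of the historical date mentioned above. I can't figure out why. Any ideas would be a big help.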

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

# depends_on_past must be the boolean False; the string 'False' is truthy
default_args = {'owner': 'airflow', 'start_date': days_ago(1), 'depends_on_past': False}

dag = DAG('jinja_trial_10', default_args=default_args, schedule_interval=timedelta(minutes=5))


def printexecutiontimes(**kwargs):
    executiondate = kwargs.get('execution_date')
    previoussuccessfulexecutiondate = kwargs.get('prev_execution_date_success')
    previousexecutiondate = kwargs.get('prev_ds_nodash')

    # Intended: fall back to one year ago when there is no previous successful run
    if previoussuccessfulexecutiondate == 'None' or previoussuccessfulexecutiondate is None:
        previoussuccessfulexecutiondate = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')

    print('Execution Date : {0}'.format(executiondate))
    print('Previous successful execution date : {0}'.format(previoussuccessfulexecutiondate))
    print('Previous execution date : {0}'.format(previousexecutiondate))
    print('hello')


task_start = DummyOperator(task_id='start', dag=dag)

jinja_task = PythonOperator(task_id='TryingoutJinjatemplates',
                            python_callable=printexecutiontimes,
                            provide_context=True,  # Airflow 1.x: pass the template context as **kwargs
                            dag=dag)

task_end = DummyOperator(task_id='end', dag=dag)

task_start >> jinja_task >> task_end

3 answers

Stack Overflow user

Answered on 2020-11-02 19:15:51

I recently had to do something very similar. The code below is what I ended up with: a custom function built on the DagRun details.

Refer to this answer if you only want the last DAG run, regardless of its state.

In my case I needed the date of the last successful run, so I wrote the following function:

from datetime import datetime

from airflow.models import DagRun


def get_last_dag_run(dag_id):
    """Return the execution_date of the latest successful run of dag_id, or 1970-01-01."""
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    for dag_run in dag_runs:
        # print all DAG runs - debug only
        print(f"All ----- state: {dag_run.state} , run_id: {dag_run.run_id} , execution_date: {dag_run.execution_date}")

    print('Success runs ---------------------------------')
    dag_runs = [x for x in dag_runs if x.state == 'success']
    for dag_run in dag_runs:
        # print successful DAG runs - debug only
        print(f"Success - state: {dag_run.state} , run_id: {dag_run.run_id} , execution_date: {dag_run.execution_date}")

    # return the last successful execution date, or a default value (1970-01-01)
    return dag_runs[0].execution_date if dag_runs else datetime(1970, 1, 1)
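The selection logic of `get_last_dag_run` (keep only `success` runs, take the newest `execution_date`, else a default) can be checked without Airflow. In this sketch `Run` is a hypothetical namedtuple standing in for `DagRun`, carrying only the fields the function reads; `max(..., default=...)` replaces the sort-then-take-first step and gives the same result. As far as I know, `DagRun.find` in Airflow 1.10 also accepts a `state` argument, so the filtering could likely be pushed into the query instead.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-in for airflow.models.DagRun with only the fields used here
Run = namedtuple("Run", ["run_id", "state", "execution_date"])

DEFAULT_START = datetime(1970, 1, 1)


def last_success_date(runs, default=DEFAULT_START):
    """Latest execution_date among runs in state 'success', else default."""
    successes = (r.execution_date for r in runs if r.state == "success")
    return max(successes, default=default)


runs = [
    Run("scheduled_1", "success", datetime(2020, 11, 1)),
    Run("scheduled_2", "failed", datetime(2020, 11, 2)),
]
print(last_success_date(runs))
```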
Votes: 0

Stack Overflow user

Answered on 2021-01-12 19:29:27

After a few experiments and a lot of reading, I came up with the following code, which works for me:

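  • Create a Variable in the Airflow UI and set its value to 0
  • Use Airflow's predefined variables to decide whether this is a full load or an incremental load
  • Pseudocode: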

If
    value of the Variable created above = 0
then
    set the Variable to 1
    set the start date to a point in time in the past (a date-time from the inception of a certain process)
    set the end date to the value of "execution_date" (one of Airflow's predefined variables)
else
    set the start date to "prev_execution_date_success" (one of Airflow's predefined variables)
    set the end date to "execution_date" (one of Airflow's predefined variables)
end
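The pseudocode translates almost line for line into Python. In this sketch a plain dict stands in for Airflow's `Variable` store, and `INCEPTION` and `compute_window` are hypothetical names, so the branching can be exercised without an Airflow installation.

```python
from datetime import datetime

# Hypothetical in-memory stand-in for Airflow Variables (Variable.get / Variable.set)
variables = {"full_load_completion": "0"}

# Hypothetical "inception" date of the source process
INCEPTION = datetime(2019, 11, 3)


def compute_window(execution_date, prev_execution_date_success, store=variables):
    """Return (start, end) following the full-load / incremental pseudocode."""
    if store["full_load_completion"] == "0":
        # First run: full load from inception up to execution_date
        store["full_load_completion"] = "1"
        return INCEPTION, execution_date
    # Later runs: incremental load since the last successful run
    return prev_execution_date_success, execution_date
```

Like the pseudocode, this flips the flag before any data is loaded. In a real task it may be safer to set the Variable only after the full load has succeeded, so that a failed first run is retried as a full load.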

Here is the corresponding code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from dateutil.parser import parse  # used in the incremental branch

# depends_on_past must be the boolean False; the string 'False' is truthy
default_args = {'owner': 'airflow', 'start_date': datetime(2020, 11, 3, 12, 5), 'depends_on_past': False}

dag = DAG('airflow_incremental_load_setup', default_args=default_args, schedule_interval=timedelta(minutes=5))


def printexecutiontimes(**kwargs):
    # This Variable must be created (with value '0') before the DAG runs
    full_load_check = Variable.get('full_load_completion')
    print('full_load_check : {0}'.format(full_load_check))
    if full_load_check == '0':
        print('First execution')
        print('Execution date : {0}'.format(kwargs.get('execution_date')))
        print('Actual start date : {0}'.format(kwargs.get('ds')))
        print('Previous successful execution date : {0}'.format(kwargs.get('prev_execution_date_success')))
        one_year_ago = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
        print('Calculated field : {0}'.format(one_year_ago))
        # Must be the same key as read above; otherwise every run is treated as the first one
        Variable.set('full_load_completion', '1')
        start_date = one_year_ago
        end_date = kwargs.get('execution_date').strftime('%Y-%m-%d')
    else:
        print('After the first execution ..')
        print('Execution date : {0}'.format(kwargs.get('execution_date')))
        print('Actual start date : {0}'.format(kwargs.get('ds')))
        print('Previous successful execution date : {0}'.format(kwargs.get('prev_execution_date_success')))
        print('Calculated field : {0}'.format(kwargs.get('prev_execution_date_success')))
        # str() + parse() turn the context values into plain datetime objects
        start_date = parse(str(kwargs.get('prev_execution_date_success')))
        end_date = parse(str(kwargs.get('execution_date')))
        print('Type of start_date_check : {0}'.format(type(start_date)))
        start_date = start_date.strftime('%Y-%m-%d')
        end_date = end_date.strftime('%Y-%m-%d')
    # start_date and end_date now bound the extraction window (the query itself is not shown)


task_start = DummyOperator(task_id='start', dag=dag)

main_task = PythonOperator(task_id='IncrementalJobTask',
                           python_callable=printexecutiontimes,
                           provide_context=True,  # Airflow 1.x: pass the template context as **kwargs
                           dag=dag)

task_end = DummyOperator(task_id='end', dag=dag)

task_start >> main_task >> task_end
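The computed `start_date` and `end_date` would then bound the extraction query against the source Postgres database. A minimal sketch, assuming a hypothetical table with a hypothetical timestamp column: it builds a parameterized query with `%(name)s` placeholders (the style psycopg2 accepts with a dict of parameters) over a half-open interval `[start, end)`, so a row stamped exactly at a boundary is loaded by one run only. The table and column names are interpolated directly, so they must be trusted constants, never user input. Note also that with a 5-minute schedule, formatting both bounds as `'%Y-%m-%d'` strings usually makes them the same day, which would give an empty half-open window; passing full timestamps avoids that.

```python
def incremental_query(table, ts_column, start_date, end_date):
    """Build (sql, params) for rows with start_date <= ts_column < end_date.

    table and ts_column are hypothetical, trusted identifiers; only the
    date bounds are passed as query parameters.
    """
    sql = (
        f"SELECT * FROM {table} "
        f"WHERE {ts_column} >= %(start)s AND {ts_column} < %(end)s"
    )
    return sql, {"start": start_date, "end": end_date}


sql, params = incremental_query("orders", "updated_at", "2020-11-03 12:05", "2020-11-03 12:10")
print(sql)
print(params)
```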
Votes: 0

Stack Overflow user

Answered on 2021-04-15 20:35:29

This helped me:

if isinstance(context['prev_execution_date_success'], type(None)):
    ...  # no previous successful run yet: handle the first (full) load here
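A likely explanation for the behaviour in the question: as far as I know, in Airflow 1.10.x the `prev_execution_date_success` context entry is a `lazy_object_proxy.Proxy` wrapping the real value rather than the value itself. A proxy object is never `None` and does not equal the string `'None'`, so neither branch of the question's check fires, yet it still prints as `None`. `isinstance` works because Python also consults the object's `__class__`, which the proxy forwards to the wrapped value. The sketch below uses a hypothetical, minimal `LazyProxy` class instead of the real library so that it runs without Airflow.

```python
class LazyProxy:
    """Hypothetical, minimal stand-in for lazy_object_proxy.Proxy."""

    def __init__(self, factory):
        self._factory = factory

    @property
    def __wrapped__(self):
        # Resolve the real value on demand (the real proxy also caches it)
        return self._factory()

    @property
    def __class__(self):
        # Report the wrapped value's class, which isinstance() consults
        return self.__wrapped__.__class__

    def __eq__(self, other):
        return self.__wrapped__ == other

    def __str__(self):
        return str(self.__wrapped__)


def is_missing(value):
    """True when value is None, even if it is wrapped in a proxy."""
    return isinstance(value, type(None))


prev_success = LazyProxy(lambda: None)  # simulates "no previous successful run"
print(prev_success is None)      # the proxy object itself is not None
print(prev_success == 'None')    # None does not equal the string 'None'
print(prev_success)              # but it prints as None
print(is_missing(prev_success))  # isinstance sees the wrapped NoneType
```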
Votes: 0
Page content originally from Stack Overflow; translation provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/64643140
