我试图在我的Python操作符中使用Airflow宏,但我一直收到"airflow: error: unrecognized:“
所以我导入了一个有3个位置参数的函数:(sys.argv,start_date,end_date),我希望把start_date和end_date作为Airflow中的执行日期。
函数参数如下所示
def main(argv,start_date,end_date):
这是我在DAG中的任务:
t1 = PythonOperator(
task_id='Pull_DCM_Report',
provide_context=True,
python_callable=main,
op_args=[sys.argv,'{{ ds }}','{{ ds }}'],
dag=dag)
发布于 2018-06-06 04:51:00
由于您传递的是需要由Airflow呈现的日期,因此需要在Python操作符中使用templates_dict
参数。此字段是Airflow将识别为包含模板的唯一字段。
您可以创建一个自定义Python操作符,通过复制现有操作符并将相关字段添加到template_fields
元组中,将更多字段识别为模板。
def main(**kwargs):
argv = kwargs.get('templates_dict').get('argv')
start_date = kwargs.get('templates_dict').get('start_date')
end_date = kwargs.get('templates_dict').get('end_date')
t1 = PythonOperator(task_id='Pull_DCM_Report',
provide_context=True,
python_callable=main,
templates_dict={'argv': sys.argv,
'start_date': '{{ yesterday_ds }}',
'end_date': '{{ ds }}'},
dag=dag)
发布于 2018-06-07 01:00:03
您可以使用以下代码“包装”对main
函数的调用:
t1 = PythonOperator(
task_id='Pull_DCM_Report',
provide_context=True,
python_callable=lambda **context: main([], context["ds"], context["ds"]),
dag=dag)
如果lambda不是你喜欢的,你可以定义一个函数,调用它,然后让它调用main
。
https://stackoverflow.com/questions/50708226
复制相似问题