我是airflow的新手,正在尝试弄清楚如何将DAG运行日期传递给每个任务,我的DAG中有以下内容:
tzinfo=tz.gettz('America/Los_Angeles')
dag_run_date = datetime.now(_tzinfo)
dag = DAG(
'myDag',
default_args=default_args,
schedule_interval = None,
params = {
"runDateTimeTz" : dag_run_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")
}
)然后我尝试将runDateTimeTz参数传递给我的每个任务,如下所示。
task1 = GKEPodOperator(
image='gcr.io/myJar:1.0.1.45',
cmds=['java'],
arguments=["-jar","myJar.jar", {{params.runDateTimeTz}}"],
dag=dag)
task2 = GKEPodOperator(
image='gcr.io/myJar2:1.0.1.45',
cmds=['java'],
arguments=["-jar","myJar2.jar", {{params.runDateTimeTz}}"],
dag=dag)我的任务可以正确执行,但我希望所有任务在params.runDateTimeTz中都能收到相同的运行日期,但这并没有发生,例如,task1获取params.runDateTimeTz=2020-04-16T07:42:47.412716-07:00,task2获取params.runDateTimeTz= 2020-04-16T07:43:29.913289-07:00
我认为这种行为与气流填充DAG的params的方式有关,看起来params.runDateTimeTz只在任务开始运行时获取,但我想在此之前获取它,并将其作为参数发送给每个任务,期望所有任务都获得相同的值。
有人能帮我解决我做错了什么吗?
发布于 2020-04-17 23:08:15
您可以使用Airflow宏中的execution_date或ds:
详情:https://airflow.apache.org/docs/stable/macros-ref#default-variables
task1 = GKEPodOperator(
image='gcr.io/myJar:1.0.1.45',
cmds=['java'],
arguments=["-jar","myJar.jar", "{{ ds }}"],
dag=dag)
task2 = GKEPodOperator(
image='gcr.io/myJar2:1.0.1.45',
cmds=['java'],
arguments=["-jar","myJar2.jar", "{{ ds }}"],
dag=dag)如果需要时间戳,可以使用{{ ts }}
https://stackoverflow.com/questions/61274612
复制相似问题