前言:
去年下半年,我一直在搞模型工程化的问题,最终呢选择了airflow作为模型调度的工具,中间遇到了很多的问题。难免需要去网上搜点答案,可能是国内使用的airflow的人群比较少,搜到的答案不是过时了,就是驴唇不对马嘴,还有很久就是直接把国外的帖子使用翻译工具翻译后贴出来。
不过呢,好在经过我多方的摸索,最后还是解决了问题,下面就整理一下相关问题的解决思路。
问题背景:
如何配置airflow的跨Dags依赖问题?当前在运行的模型中有很多依赖关系,比如模型B依赖模型A,模型C依赖模型B和A的结果,虽然airflow更推荐的方式在一个Dag中配置所有的任务,这样也好管理,但是对于不同人维护或者不同运行频率的模型来说,还是不能完全的满足需求,那么必须存在跨Dag的依赖关系。
在同一个Dag的中配置依赖关系直接使用A>>B,[A,B]>>C等等,都可以构建出来依赖关系,那么不同Dag中是如何处理呢?这里呢有两种方法解决
解决方案:
如果是单一条件的依赖,可以选择TriggerDagRunOperator,这是airflow提供的众多Operators的一个,继承自BaseOperator,官方给的说明:Triggers a DAG run for aspecified ``dag_id`` ,意思就是说触发指定的Dag运行。使用起来比较简单,下面给出个综合示例。
如果是多个条件的依赖,比如dagC 依赖A和B,那么TriggerDagRunOperator就不太能满足条件,因为A和B的运行结束时间可能不一样,A结束了,但是B还在运行,这时候如果通知C运行,那么是输入的数据不完整,是不符合要求的。
那么这个时候ExternalTaskSensor 就派上用场了,ExternalTaskSensor就比较复杂了,也有很多坑,官方文档给的说明很少,能搜到的资料还有错误,在这里我也是没少花时间摸索。
ExternalTaskSensor的配置不是很复杂,大致参数如下:
t0 = ExternalTaskSensor(
task_id='monitor_common_dag', # taskid
external_dag_id='testA', # 需要等待的外部DAG id
external_task_id='testA_function2', # 需要等待的外部Task id,可以为None
execution_delta=timedelta(minutes=5), # 执行时间差
# execution_date_fn=lambda dt: dt +timedelta(minutes=5),
timeout=600, # 超时时间
allowed_states=['success'],
mode='reschedule',
)
其中execution_delta和execution_date_fn 是最坑的。关于execution_delta 的配置,官方给的解释是:与前一次执行的时间差默认是相同的execution_date作为当前任务或DAG。或者可以将Execution_delta或execution_date_fn传给ExternalTaskSensor,但不是两者设置,只能二选一。
使用ExternalTaskSensor的默认配置是A和B 和C的任务执行时间是一样的,就是说Dag中的schedule_interval配置是相同的,如果不同,则需要在这里说明。否则ExternalTaskSensor 会等待到超时,也不会执行。
如果是说,ABC都有自己的固定执行时间也行,可是如果ABC并不会主动触发执行,他们的schedule_interval 是None,怎么办呢?那么这个地方就需要使用execution_date_fn 方法作设置。
环境配置:
Python 3.8
Airflow 2.2.0
Airflow低版本中可能没有上述的两个Operators,建议使用2.0以后的版本。
代码示例:
tastA: 父任务
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
}
with DAG(
dag_id='testA',
default_args=default_args,
description='testA',
schedule_interval="0 12 * * *", # 每天12点执行一次
start_date=datetime(2022, 1, 1), # 从指定日期开始执行
catchup=False,
tags=['test'],
) as dag:
# 执行任务1
t1 = BashOperator(
task_id='testA_function1',
bash_command='python t1.py',
)
# 执行任务2
t2 = BashOperator(
task_id='testA_function2',
bash_command="python t2.py",
)
# 通知另外一个Dag运行。主动通知
t3 = TriggerDagRunOperator(
task_id='testA_function3',
# 需要执行的dagid
trigger_dag_id='testB'
)
# 任务1,2依次执行,执行完成后通知dag testB 执行
t1 >> t2 >> t3
tastB: 子任务
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.models import DagRun
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
}
with DAG(
dag_id='testB',
default_args=default_args,
description='testB',
schedule_interval="0 12 * * *", # 每天12点执行一次
start_date=datetime(2022, 1, 1), # 从指定日期开始执行
catchup=False,
tags=['test'],
) as dag:
# 这里实例化一个ExterTaskSensor
t0 = ExternalTaskSensor(
task_id='monitor_testA',
external_dag_id='testA', # 需要等待的外部DAG id
external_task_id='testA_function2', # 需要等待的外部Task id,可以为None
execution_date_fn=DagRun.find(dag_id="testA").pop().execution_date,
timeout=600, # 超时时间
allowed_states=['success'],
mode='reschedule',
)
t1 = BashOperator(
task_id='testB_function1',
bash_command='python t1.py',
)
t2 = BashOperator(
task_id='testB_function2',
bash_command="python t2.py",
)
t0 >> t1 >> t2
这里最重要的一句是
execution_date_fn=DagRun.find(dag_id="testA").pop().execution_date
意思是找到testA的最近一次的执行时间,然后进行监听,如果tastA执行完成了,则 monitor_testA 的任务也就完成了,才会进行后续的操作。这种方式适用于各个任务没有自己的schedule_interval,都是被别的任务调起的,自己不会主动去运行。
注意上面的testA和testB中是两种Dag的依赖方式,真正使用的时候选择一个使用即可,我为了方便,两种方式放在一起做示例。
总结:
这里个人实践下来,推荐使用TriggerDagRunOperator,如果执行频率不一样,使用ExternalTaskSensor的坑会很多。
那么如果有多个依赖的父任务,那么可以根据经验,在执行时间长的那个任务中使用TriggerDagRunOperator通知后续的子任务进行,但是这个并不是100%的安全,可以在任务执行的时候添加相关的数据验证操作。