首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >如何实现airflow中的跨Dag依赖的问题

如何实现airflow中的跨Dag依赖的问题

作者头像
马拉松程序员
发布2022-04-26 19:16:53
发布2022-04-26 19:16:53
5.5K10
代码可运行
举报
运行总次数:0
代码可运行

前言:

去年下半年,我一直在搞模型工程化的问题,最终呢选择了airflow作为模型调度的工具,中间遇到了很多的问题。难免需要去网上搜点答案,可能是国内使用的airflow的人群比较少,搜到的答案不是过时了,就是驴唇不对马嘴,还有很久就是直接把国外的帖子使用翻译工具翻译后贴出来。

不过呢,好在经过我多方的摸索,最后还是解决了问题,下面就整理一下相关问题的解决思路。

问题背景:

如何配置airflow的跨Dags依赖问题?当前在运行的模型中有很多依赖关系,比如模型B依赖模型A,模型C依赖模型B和A的结果,虽然airflow更推荐的方式在一个Dag中配置所有的任务,这样也好管理,但是对于不同人维护或者不同运行频率的模型来说,还是不能完全的满足需求,那么必须存在跨Dag的依赖关系。

在同一个Dag的中配置依赖关系直接使用A>>B,[A,B]>>C等等,都可以构建出来依赖关系,那么不同Dag中是如何处理呢?这里呢有两种方法解决

解决方案:

如果是单一条件的依赖,可以选择TriggerDagRunOperator,这是airflow提供的众多Operators的一个,继承自BaseOperator,官方给的说明:Triggers a DAG run for aspecified ``dag_id`` ,意思就是说触发指定的Dag运行。使用起来比较简单,下面给出个综合示例。

如果是多个条件的依赖,比如dagC 依赖A和B,那么TriggerDagRunOperator就不太能满足条件,因为A和B的运行结束时间可能不一样,A结束了,但是B还在运行,这时候如果通知C运行,那么是输入的数据不完整,是不符合要求的。

那么这个时候ExternalTaskSensor 就派上用场了,ExternalTaskSensor就比较复杂了,也有很多坑,官方文档给的说明很少,能搜到的资料还有错误,在这里我也是没少花时间摸索。

ExternalTaskSensor的配置不是很复杂,大致参数如下:

代码语言:javascript
代码运行次数:0
运行
复制
  t0 = ExternalTaskSensor(
        task_id='monitor_common_dag',  # taskid
        external_dag_id='testA',  # 需要等待的外部DAG id
        external_task_id='testA_function2',  # 需要等待的外部Task id,可以为None
        execution_delta=timedelta(minutes=5),  # 执行时间差
        # execution_date_fn=lambda dt: dt +timedelta(minutes=5),
        timeout=600,  # 超时时间
        allowed_states=['success'],
        mode='reschedule',
    )

其中execution_delta和execution_date_fn 是最坑的。关于execution_delta 的配置,官方给的解释是:与前一次执行的时间差默认是相同的execution_date作为当前任务或DAG。或者可以将Execution_delta或execution_date_fn传给ExternalTaskSensor,但不是两者设置,只能二选一。

使用ExternalTaskSensor的默认配置是A和B 和C的任务执行时间是一样的,就是说Dag中的schedule_interval配置是相同的,如果不同,则需要在这里说明。否则ExternalTaskSensor 会等待到超时,也不会执行。

如果是说,ABC都有自己的固定执行时间也行,可是如果ABC并不会主动触发执行,他们的schedule_interval 是None,怎么办呢?那么这个地方就需要使用execution_date_fn 方法作设置。

环境配置:

Python 3.8

Airflow 2.2.0

Airflow低版本中可能没有上述的两个Operators,建议使用2.0以后的版本。

代码示例:

tastA: 父任务

代码语言:javascript
代码运行次数:0
运行
复制
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}
with DAG(
        dag_id='testA',
        default_args=default_args,
        description='testA',
        schedule_interval="0 12 * * *",  # 每天12点执行一次
        start_date=datetime(2022, 1, 1),  # 从指定日期开始执行
        catchup=False,
        tags=['test'],
) as dag:
    # 执行任务1
    t1 = BashOperator(
        task_id='testA_function1',
        bash_command='python t1.py',
    )
    # 执行任务2
    t2 = BashOperator(
        task_id='testA_function2',
        bash_command="python t2.py",
    )
    # 通知另外一个Dag运行。主动通知
    t3 = TriggerDagRunOperator(
        task_id='testA_function3',
        # 需要执行的dagid
        trigger_dag_id='testB'
    )
    # 任务1,2依次执行,执行完成后通知dag testB 执行
    t1 >> t2 >> t3

tastB: 子任务

代码语言:javascript
代码运行次数:0
运行
复制
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.models import DagRun
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}
with DAG(
        dag_id='testB',
        default_args=default_args,
        description='testB',
        schedule_interval="0 12 * * *",  # 每天12点执行一次
        start_date=datetime(2022, 1, 1),  # 从指定日期开始执行
        catchup=False,
        tags=['test'],
) as dag:
    # 这里实例化一个ExterTaskSensor
    t0 = ExternalTaskSensor(
        task_id='monitor_testA',
        external_dag_id='testA',  # 需要等待的外部DAG id
        external_task_id='testA_function2',  # 需要等待的外部Task id,可以为None
        execution_date_fn=DagRun.find(dag_id="testA").pop().execution_date,
        timeout=600,  # 超时时间
        allowed_states=['success'],
        mode='reschedule',
    )

    t1 = BashOperator(
        task_id='testB_function1',
        bash_command='python t1.py',
    )

    t2 = BashOperator(
        task_id='testB_function2',
        bash_command="python t2.py",
    )
    t0 >> t1 >> t2

这里最重要的一句是

代码语言:javascript
代码运行次数:0
运行
复制
execution_date_fn=DagRun.find(dag_id="testA").pop().execution_date 

意思是找到testA的最近一次的执行时间,然后进行监听,如果tastA执行完成了,则 monitor_testA 的任务也就完成了,才会进行后续的操作。这种方式适用于各个任务没有自己的schedule_interval,都是被别的任务调起的,自己不会主动去运行。

注意上面的testA和testB中是两种Dag的依赖方式,真正使用的时候选择一个使用即可,我为了方便,两种方式放在一起做示例。

总结:

这里个人实践下来,推荐使用TriggerDagRunOperator,如果执行频率不一样,使用ExternalTaskSensor的坑会很多。

那么如果有多个依赖的父任务,那么可以根据经验,在执行时间长的那个任务中使用TriggerDagRunOperator通知后续的子任务进行,但是这个并不是100%的安全,可以在任务执行的时候添加相关的数据验证操作。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-01-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 马拉松程序员 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档