前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Airflow 使用总结(二)

Airflow 使用总结(二)

作者头像
用户4945346
发布2023-05-06 10:30:30
8070
发布2023-05-06 10:30:30
举报
文章被收录于专栏:pythonista的日常pythonista的日常

一、相同任务不同参数并列执行

最近几周一直在折腾 Airflow ,本周在写一个流水线任务,分为 4 个步骤,第一步会读取数据库 db ,然后是对读取的数据根据某个数据指标进行分组处理,同一个任务接收多组数据参数并列执行任务,并发执行提高任务的执行效率,流程执行如下:

在代码上,任务函数返回一个列表 list ,下一个任务接收参数使用 expand

任务执行顺序没有变化,还是串行执行。

Airflow 的 Web 页面上的体现:

这样的话,一个人任务就对应一个 MAP INDEX。

二、任务之间实现信息共享

一个 Dag 中在可能会包含多个调度任务,这些任务之间可能需要实现信息共享,即怎么把 task A 执行得到的结果传递给 task B,让 task B 可以基于 task A 的结果进行后续操作。XCom 就是给出的答案。

XCom 是 cross-communication 的缩写。它被设计于用来在 Airflow 各个 task 间进行数据共享。XCom 的本质就是把 task 需要传递的信息以 KV 的形式存到 DB 中,而其他 task 则可以从DB中获取。由于XCom是存在DB而不是内存中,这也说明了对于已经执行完的 DAG,如果重跑其中某个 task 的话依然可以获取到同次DAG运行时其他task传递的内容。

XCom 存储的是 KV 形式的数据对,Airflow 包装了 xcom_push 和 xcom_pull 两个方法,可以方便进行存取操作。如果没有特殊的需求,我们只需关注里面的key和value 这两个参数即可。其他参数 Airflow 会根据 task 的上下文自动添加。看个 PythonOperator 的例子更能说明:

代码语言:javascript
复制
def push_data(**context):
 context['ti'].xcom_push(key='test_key', value='test_val')
 
push_data_op = PythonOperator(
   task_id = 'push_data',
   python_callable = push_data,
   provide_context=True,
   dag = dag
)

def pull_data(**context):
 test_data = context['ti'].xcom_pull(key='test_key')
 
pull_data_op = PythonOperator(
   task_id = 'pull_data',
   python_callable = pull_data,
   provide_context=True,
   dag = dag
)

push_data_op >> pull_data_op

上面的代码就在 push_data和 pull_data 两个任务中传递了key='test_key', value='test_val'这一条数据。更方便的是,这里的value不仅限于str类型,这就提供了更大的自由度。注意,在opreator中必须要有provide_context=True,才能在operator内部通过context['ti'](获得当前 task 的 TaskInstance ,进行XCom push/pull的相关操作。

注意:

如果 Airflow 部署在 k8s 上,就建议不要使用 xcom ,在 K8s 中运行自定义 XCom 后端会给 Airflow 部署带来更多的复杂性。可以把任务输出的结果保存到数据库 DB 中,本质上和使用 xcom 是一样的。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-02-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 pythonista的日常 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档