首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Airflow:如何将xcom从父dag传递到子dag

Airflow是一个开源的任务调度和工作流管理平台,用于构建、调度和监控复杂的数据管道。它使用Python编写,提供了丰富的功能和灵活的扩展性。

在Airflow中,xcom是一种用于在任务之间传递数据的机制。xcom可以在同一个DAG中的不同任务之间传递数据,也可以在不同DAG之间传递数据。

要将xcom从父DAG传递到子DAG,可以使用Airflow提供的XComPushOperator和XComPullOperator。

  1. XComPushOperator:在父DAG中使用XComPushOperator将数据推送到xcom中。例如:
代码语言:txt
复制
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

def push_data(**context):
    data = "Hello, World!"
    context['ti'].xcom_push(key='my_data', value=data)

with DAG('parent_dag', schedule_interval=None) as dag:
    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_data
    )

2. XComPullOperator:在子DAG中使用XComPullOperator从xcom中拉取数据。例如:

```python
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

def pull_data(**context):
    data = context['ti'].xcom_pull(key='my_data')
    print(data)

with DAG('child_dag', schedule_interval=None) as dag:
    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_data
    )

在子DAG中,可以通过context['ti'].xcom_pull(key='my_data')来获取父DAG中推送的数据。

推荐的腾讯云相关产品:腾讯云容器服务(Tencent Kubernetes Engine,TKE),它是一种高度可扩展的容器管理服务,可帮助您轻松部署、运行和管理容器化应用程序。TKE提供了强大的容器编排和调度能力,适用于部署和管理Airflow等工作负载。

腾讯云产品介绍链接地址:https://cloud.tencent.com/product/tke
页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Airflow 使用总结(二)

二、任务之间实现信息共享 一个 Dag 中在可能会包含多个调度任务,这些任务之间可能需要实现信息共享,即怎么把 task A 执行得到的结果传递给 task B,让 task B 可以基于 task A...XCom 的本质就是把 task 需要传递的信息以 KV 的形式存到 DB 中,而其他 task 则可以从DB中获取。...由于XCom是存在DB而不是内存中,这也说明了对于已经执行完的 DAG,如果重跑其中某个 task 的话依然可以获取到同次DAG运行时其他task传递的内容。...XCom 存储的是 KV 形式的数据对,Airflow 包装了 xcom_push 和 xcom_pull 两个方法,可以方便进行存取操作。...注意: 如果 Airflow 部署在 k8s 上,就建议不要使用 xcom ,在 K8s 中运行自定义 XCom 后端会给 Airflow 部署带来更多的复杂性。

89920

你不可不知的任务调度神器-AirFlow

Airflow 使用 DAG (有向无环图) 来定义工作流,配置作业依赖关系非常方便,从管理方便和使用简单角度来讲,AirFlow远超过其他的任务调度工具。...Airflow 是免费的,我们可以将一些常做的巡检任务,定时脚本(如 crontab ),ETL处理,监控等任务放在 AirFlow 上集中管理,甚至都不用再写监控脚本,作业出错会自动发送日志指定人员邮箱...执行器:Executor 是一个消息队列进程,它被绑定调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。...并在 home 页开启 example dag AirFlow默认使用sqlite作为数据库,直接执行数据库初始化命令后,会在环境变量路径下新建一个数据库文件airflow.db。...a DAG from airflow import DAG # Operators; we need this to operate!

3.5K21

Airflow 实践笔记-从入门精通二

除了公有变量,如果operator之间要互相传递参数或者中间过程数据,例如一个operator要依赖另一个operator的输出结果进行执行,有以下几个方式 使用XCom,有点像dict对象,存储在airflow...Airflow2中允许自定义XCom,以数据库的形式存储,从而支持较大的数据。 # 从该实例中的xcom里面取 前面任务train_model设置的键值为model_id的值。...但是需要注意的是,这种传参本质上还是通过xcom来实现传递的,必须是可序列号的对象,所以参数必须是python最基本的数据类型,像dataframe就不能作为参数来传递。...,例如判断输入文件是否到位(可以设置一个时间窗口内,例如某个时间点之前检查文件是否到位),但是sensor很耗费计算资源(设置mode为reschedule可以减少开销,默认是poke),DAG会设置..._s3_key, ) 关于dag和operator的相关特性介绍到此,后续会讲述Airflow的集群搭建(从入门精通三),Dolphinscheduler , Dataworks(阿里云)的调度工具后续也会介绍

2.6K20

【翻译】Airflow最佳实践

原文:https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html 创建DAG有两个步骤: 用Python实现一个...如果可能,我们应该XCom来在不同的任务之间共享小数据,而如果如果数据量比较大,则应该使用分布式文件系统,如S3或者HDFS等,这时可以使用XCom来共享其在S3或者HDFS中的文件地址。...例如,如果我们有一个推送数据S3的任务,于是我们能够在下一个任务中完成检查。...2.4 暂存(staging)环境变量 如果可能,在部署生产环境运行起来之前,我们应该保持一个暂存环境去测试完整的DAG。需要确保我们的DAG是已经参数化了的,而不是在DAG中硬编码。...然而不管是从数据库读取数据还是写数据数据库,都会产生额外的时间消耗。因此,为了加速测试的执行,不要将它们保存到数据库是有效的实践。

3.1K10

闲聊Airflow 2.0

等了半年后,注意 Airflow 已经发布版本 2.1.1 了,而且Airflow 1.0+的版本也即将不再维护,自己也做了小规模测试,基本上可以确定 Airflow2.0 可以作为生产环境下的版本了...目前为止 Airflow 2.0.0 2.1.1 的版本更新没有什么大的变化,只是一些小的配置文件和行为逻辑的更新,比如Dummy trigger在2.1.1版本过时了、DAG concurrency...引入编写 dag(有向无环图)的新方法:TaskFlow API 新的方法对依赖关系的处理更清晰,XCom 也更易于使用。...Airflow 2.0 Scheduler 通过使用来自数据库的序列化后 DAG 进行任务调度和调用,扩展了 DAG 序列化的使用。这减少了重复解析 DAG 文件以进行调度所需的时间。...这意味着,如果您想使用与AWS相关的operators,而不是与GCP和Kubernetes相关的operators,则只能使用Amazon提供程序软件包安装Airflow: pip install

2.6K30

Airflow2.2.3 + Celery + MYSQL 8构建一个健壮的分布式调度集群

部署完成之后,就可以通过flower查看broker的状态: 3持久化配置文件 大多情况下,使用airflow多worker节点的集群,我们就需要持久化airflow的配置文件,并且将airflow同步所有的节点上...; 前期使用的时候,我们需要将docker-compose文件中的一些环境变量的值写入airflow.cfg文件中,例如以下信息: [core] dags_folder = /opt/airflow/...50 task_runner = StandardTaskRunner default_impersonation = security = unit_test_mode = False enable_xcom_pickling...xcom_backend = airflow.models.xcom.BaseXCom lazy_load_plugins = True lazy_discover_providers = True...= /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log task_log_reader = task extra_logger_names

1.6K10

在Kubernetes上运行Airflow两年后的收获

支持 DAG 的多仓库方法 DAG 可以在各自团队拥有的不同仓库中开发,并最终出现在同一个 Airflow 实例中。当然,这是不需要将 DAG 嵌入 Airflow 镜像中的。...通过这样做,我们可以使用原生 Airflow 角色来强制访问控制,并且每个 DAG 必须通过最低的治理检查清单才能提交。 但是,如何将 DAG 同步 Airflow 中呢?...鉴于我们的限制,一个解决方法是使用 nodeSelector 将所有 Airflow Pod 调度同一个节点上。...然而,由于 DAG 在调度器中定期解析,我们观察当使用这种方法时,CPU 和内存使用量增加,调度器循环时间变长。...这可能包括诸如 job、dag_run、task_instance、log、xcom、sla_miss、dags、task_reschedule、task_fail 等表。

28810

Airflow 实践笔记-从入门精通一

airflow 2.0以后,因为task的函数跟python常规函数的写法一样,operator之间可以传递参数,但本质上还是使用XComs,只是不需要在语法上具体写XCom的相关代码。...AIRFLOW_HOME 是 Airflow 寻找 DAG 和插件的基准目录。...当数据工程师开发完python脚本后,需要以DAG模板的方式来定义任务流,然后把dag文件放到AIRFLOW_HOME下的DAG目录,就可以加载到airflow里开始运行该任务。...同时需要把本地yaml所在文件夹加入允许file sharing的权限,否则后续创建容器时可能会有报错信息“Cannot create container for service airflow-init...AIRFLOW__CORE__DAGS_FOLDER 是放置DAG文件的地方,airflow会定期扫描这个文件夹下的dag文件,加载到系统里。

4.9K11

Airflow秃头两天填坑过程:任务假死问题

由于没有Airflow一段时间了,只能硬着头皮一边重新熟悉Airflow,一边查找定位问题,一直到很晚,不过基本上没有摸到问题的关键所在,只是大概弄清楚症状: Airflow中的Dag任务手动可以启动...网上有文章提到这可能是Airflow中的task_instance表的state字段缺少索引, 导致查询很慢导致的, 这就涉及Airflow本身的问题了。...: Comment: *************************** 21. row *************************** Name: xcom...涉及的原始表有三个: task_instance, dag_run, dag。...这里, 我大概有了两个解决方案: 给dag_id和execution_date添加联合索引; 清理掉一些历史数据(但是这个有风险, 容易产生不可逆的影响) 这里上午的时间也耗完了。

2.5K20

Apache AirFlow 入门

Airflow是一个可编程,调度和监控的工作流平台,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行。...# DAG 对象; 我们将需要它来实例化一个 DAG from airflow import DAG # Operators 我们需要利用这个对象去执行流程 from airflow.operators.bash...import BashOperator 默认参数 我们即将创建一个 DAG 和一些任务,我们可以选择显式地将一组参数传递给每个任务的构造函数,或者我们可以定义一个默认参数的字典,这样我们可以在创建任务时使用它...这里我们传递一个定义为dag_id的字符串,把它用作 DAG 的唯一标识符。我们还传递我们刚刚定义的默认参数字典,同时也为 DAG 定义schedule_interval,设置调度间隔为每天一次。...任务参数的优先规则如下: 明确传递参数 default_args字典中存在的值 operator 的默认值(如果存在) 任务必须包含或继承参数task_id和owner,否则 Airflow 将出现异常

2.5K00

Airflow 使用简单总结

下图是展示一些 dags 历史执行情况,绿色表示成功,红色表示失败,任务执行可以在Web UI 上点击运行dag,也可以通过调用 Airflow 的 API 接口运行指定的 dag 。...在页面上还能看到某个 dag 的任务步骤依赖关系,下图是用的最简单的串行 下面展示的是每个步骤的历史执行情况 在代码中按照规定好的语法就能设置每个 dag任务以及每个子任务之间的依赖关系...(绿框) 对于开发人员来说,使用 Airflow 就是编写 dags 文件 编写 DAG 的流程: 先用装饰器@dag 定义一个 DAGdag_id就是网页上DAG的名称,这个必须是唯一的,不允许和其他的...dag重复。...get_current_context() 是 Airflow 自带的函数,获取上下文信息,包含给DAG传递的参数,通过 parmas 这个 key 获取。

82520

没看过这篇文章,别说你会用Airflow

Webserver:Airflow Webserver 也是一个独立的进程,提供 web 端服务, 定时生成进程扫描对应的 DAG 信息,以 UI 的方式展示 DAG 或者 task 的信息。...由于 Airflow DAG 是面向过程的执行,并且 task 没办法继承或者使用 return 传递变量,但是代码组织结构上还是可以面向对象结构组织,以达到最大化代码复用的目的。...所以这个问题不能够通过简单的 Airflow 配置来改变。需要修改一下申请资源 task 和回收资源 task 来传递一些信息。...安全与权限管理 Airflow 是一个公用组件,各个团队都可以部署自己的 pipeline 公共的 Airflow。这种情况下,权限管理就尤为必要了。...所有的 worker&master 都 mount 相同 efs。经过实践,code 同步和部署的问题都能迎刃而解。

1.5K20
领券