首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >Airflow -如何将xcom变量传递给Python函数

Airflow -如何将xcom变量传递给Python函数
EN

Stack Overflow用户
提问于 2017-09-05 23:59:26
回答 4查看 91.5K关注 0票数 51

我需要引用一个由BashOperator返回的变量。在我的task_archive_s3_file中,我需要从get_s3_file获取文件名。该任务只是将{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}打印为字符串,而不是值。

如果我使用bash_command,则可以正确打印该值。

代码语言:javascript
复制
get_s3_file = PythonOperator(
    task_id='get_s3_file',
    python_callable=obj.func_get_s3_file,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag)

submit_file_to_spark = BashOperator(
    task_id='submit_file_to_spark',
    bash_command="echo 'hello world'",
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag)

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
#    bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
    python_callable=obj.func_archive_s3_file,
    params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
    dag=dag)

get_s3_file >> submit_file_to_spark >> task_archive_s3_file
EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2017-09-06 02:22:56

{{ ti.xcom_pull(...) }}这样的模板只能在支持模板的参数中使用,否则它们不会在执行前呈现。请参见PythonOperatorBashOperatortemplate_fieldstemplate_ext属性。

因此,您可以使用templates_dict将模板传递给python操作符:

代码语言:javascript
复制
def func_archive_s3_file(**context):
    archive(context['templates_dict']['s3_path_filename'])

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,  # must pass this because templates_dict gets passed via context
    templates_dict={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}" })

但是,在获取XCom值的情况下,另一种替代方法是使用通过上下文提供的TaskInstance对象:

代码语言:javascript
复制
def func_archive_s3_file(**context):
    archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,
票数 64
EN

Stack Overflow用户

发布于 2018-01-19 12:07:42

使用相同的代码并修改参数,如Startdate等。

代码语言:javascript
复制
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

DAG = DAG(
  dag_id='simple_xcom',
  default_args=args,
#  start_date=datetime(2019, 04, 21),
  schedule_interval="@daily",
  #schedule_interval=timedelta(1)
)

def push_function(**context):
    msg='the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="the_message", value=msg)

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task',key='the_message')
    print("received message: '%s'" % msg)

pull_task = PythonOperator(`enter code here`
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task

如果你想知道context['task_instance']kwargs['ti']是从哪里来的,你可以参考Airflow macro documentation

票数 19
EN

Stack Overflow用户

发布于 2022-01-02 03:28:34

在AirFlow2.0(2020年12月发布)中,TaskFlow API使通过XComs变得更容易。使用此接口,您可以简单地从带有@task注释的函数返回值,这些值将在后台作为XComs传递。本教程中的示例:

代码语言:javascript
复制
    @task()
    def extract():
        ...
        return order_data_dict
    
    @task()
    def transform(order_data_dict: dict):
        ...
        return total_order_value

    order_data = extract()
    order_summary = transform(order_data)

在本例中,order_data的类型为XComArg。它存储由extract任务返回的字典。当Python任务运行时,order_data被解包,并且该任务接收存储的普通Python对象。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46059161

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档