我试图了解如何通过airflow
xcom
功能传递值。我试图构建的具体用途是编写一个文件,然后移动它,然后运行另一个命令。我的想法是将文件名从一个操作符传递到下一个操作符。
以下是我所拥有的:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt
DAG = DAG(
dag_id='xcom_test_dag',
start_date=dt.datetime.now(),
schedule_interval='@once'
)
def push_function(**context):
file_name = 'test_file_{date}'.format(date=dt.datetime.now())
return context['task_instance'].xcom_push(key='filename', value=file_name)
def pull_function(**context):
dir(context['task_instance'].xcom_pull())
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
push_task >> pull_task
如果我想在pull_task中引用文件名,这样就可以读取该文件--我应该如何调用该文件?尝试访问context['task_instance']
不包含值。此外-尝试并引用这样的文件名从一个任务到另一个任务/操作符到操作符是最好的实践吗?
发布于 2020-01-19 15:37:48
从XCOM提取数据时,需要提供推送数据的任务的任务ID。在您的示例中,push任务的task_id是push_task
,因此您需要执行以下操作:
value = context['task_instance'].xcom_pull(task_ids='push_task')
然而,从气流文件中可以注意到:
默认情况下,xcom_pull()筛选那些在从execute函数返回时自动分配给XComs的键(与手动推送的XComs相反)。
如果要使用特定的键手动将数据推送到XCOM,则在调用xcom_pull
时可能需要包含该键。在您的示例中,您在推送任务中按下一个名为filename
的键,因此您可能需要在拉任务中执行如下操作:
value = context['task_instance'].xcom_pull(task_ids='push_task', key='filename')
这些信息在气流文档中有更详细的介绍:https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom#concepts-xcom。
至于您关于“最佳实践”的问题--对于气流任务/操作员之间的通信,XCOM是最好的方法。但是,如果希望跨多个操作符从磁盘读取文件,则需要确保所有工作人员都可以访问文件的存储位置。如果这是不可能的,另一种选择是让push任务存储该文件(例如,在AWS S3中)并将S3 URL推送到XCOM。然后,拉任务可以从XCOM读取S3 URL,并从S3下载该文件。
https://stackoverflow.com/questions/59811587
复制