airflow: passing parameters to a DAG instance (4)

Original article
Author: 刘远
Modified 2018-01-12 19:50:46
Included in the column: 刘远的专栏

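We want to pass parameters when creating a DAG instance (a DAG run), so that every task can read the parameters it needs from its task instance's context.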

Creating a DAG instance

$ airflow trigger_dag -h
[2017-04-14 18:47:28,576] {__init__.py:57} INFO - Using executor CeleryExecutor
usage: airflow trigger_dag [-h] [-sd SUBDIR] [-r RUN_ID] [-c CONF]
                           [-e EXEC_DATE]
                           dag_id

positional arguments:
  dag_id                The id of the dag

optional arguments:
  -h, --help            show this help message and exit
  -sd SUBDIR, --subdir SUBDIR
                        File location or directory from which to look for the
                        dag
  -r RUN_ID, --run_id RUN_ID
                        Helps to identify this run
  -c CONF, --conf CONF  JSON string that gets pickled into the DagRun's conf
                        attribute
  -e EXEC_DATE, --exec_date EXEC_DATE
                        The execution date of the DAG

We pass the JSON string '{"foo":"bar"}' to the DAG instance as follows:

airflow trigger_dag example_passing_params_via_test_command -c '{"foo":"bar"}'
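Hand-quoting JSON inside a shell command is easy to get wrong. As a minimal sketch, assuming the Airflow 1.x `trigger_dag` syntax shown above (the helper name `build_trigger_argv` is hypothetical, not part of Airflow), the `-c` value can be generated with `json.dumps` and the command kept as an argument list:

```python
import json
import shlex


def build_trigger_argv(dag_id, conf):
    """Hypothetical helper: assemble argv for `airflow trigger_dag` (Airflow 1.x syntax).

    It only builds the argument list; nothing is executed here.
    """
    # json.dumps always produces valid JSON for the -c/--conf option.
    return ["airflow", "trigger_dag", dag_id, "-c", json.dumps(conf)]


argv = build_trigger_argv("example_passing_params_via_test_command", {"foo": "bar"})

# shlex.join (Python 3.8+) shows the equivalent, correctly quoted shell command.
print(shlex.join(argv))
# prints: airflow trigger_dag example_passing_params_via_test_command -c '{"foo": "bar"}'

# To actually trigger the run: subprocess.run(argv, check=True)
```

Passing the list straight to `subprocess.run` bypasses the shell, so the JSON needs no quoting at all.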

Reading the instance parameters in a task

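import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Airflow 1.x-style DAG (module paths and provide_context as used in 1.8)
dag = DAG(
    dag_id='example_passing_params_via_test_command',
    start_date=datetime(2017, 4, 1),
    schedule_interval=None)


def my_py_command(ds, **kwargs):
    logging.info(kwargs)
    # Per-run parameters passed with `airflow trigger_dag ... -c '{"foo":"bar"}'`.
    # dag_run or its conf can be None (e.g. triggered without -c), so guard both.
    dag_run = kwargs.get('dag_run')
    run_conf = (dag_run.conf or {}) if dag_run else {}
    logging.info(run_conf.get('foo'))
    # Print out the "foo" param passed in via
    # `airflow test example_passing_params_via_test_command run_this <date> -tp '{"foo":"bar"}'`
    if kwargs["test_mode"]:
        msg = " 'foo' was passed in via test={} command : kwargs[params][foo] = {}".format(
            kwargs["test_mode"], kwargs["params"]["foo"])
        print(msg)
        logging.info(msg)
    # Print out the value of "miff", passed in below via the PythonOperator
    msg = " 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"])
    print(msg)
    logging.info(msg)
    return 1


my_templated_command = """
    echo " 'foo' was passed in via Airflow CLI Test command with value {{ params.foo }} "
    echo " 'miff' was passed in via BashOperator with value {{ params.miff }} "
"""

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,  # pass the context (including dag_run) to the callable as **kwargs
    python_callable=my_py_command,
    params={"miff": "agg"},
    dag=dag)

# Renders my_templated_command with Jinja; params.* come from this operator's params
also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command=my_templated_command,
    params={"miff": "agg"},
    dag=dag)
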

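The `logging` calls in `my_py_command` are where the parameters are read: `dag_run.conf` holds the per-run values passed with `-c`, while `kwargs["params"]` holds the static values set on the operator.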
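To see the lookup logic without a scheduler, the callable can be exercised with a hand-built context. This is only a sketch: `FakeDagRun` is a hypothetical stand-in for Airflow's `DagRun` that models nothing but the `conf` attribute, and `read_params` is a hypothetical, trimmed-down version of `my_py_command` that returns what it reads.

```python
import logging


class FakeDagRun(object):
    """Hypothetical stand-in for airflow.models.DagRun; only `conf` is modelled."""

    def __init__(self, conf=None):
        self.conf = conf


def read_params(ds, **kwargs):
    """Trimmed-down my_py_command: return the per-run and the static parameter."""
    dag_run = kwargs.get("dag_run")
    # Per-run value from `trigger_dag -c`; conf is None when -c was omitted.
    foo = (dag_run.conf or {}).get("foo") if dag_run else None
    # Static, operator-level value from PythonOperator(params=...).
    miff = kwargs["params"]["miff"]
    logging.info("foo=%s miff=%s", foo, miff)
    return foo, miff


context = {"ds": "2017-04-14", "dag_run": FakeDagRun({"foo": "bar"}), "params": {"miff": "agg"}}
print(read_params(**context))  # ('bar', 'agg')

# Same task, but the run was triggered without -c:
print(read_params(ds="2017-04-14", dag_run=FakeDagRun(), params={"miff": "agg"}))  # (None, 'agg')
```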

Source code walkthrough

Every DAG instance has a context, which is passed through as the `context` argument to every task and to every task's callback functions.

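For the run triggered above (Airflow 1.8 on Python 2.7, judging by the module paths), the context looks like this: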

{
    u'next_execution_date': None,
    u'dag_run': <DagRun example_passing_params_via_test_command @ 2017-04-14 18:28:07: manual__2017-04-14T18:28:07, externally triggered: True>,
    u'tomorrow_ds_nodash': u'20170415',
    u'run_id': 'manual__2017-04-14T18:28:07',
    u'dag': <DAG: example_passing_params_via_test_command>,
    u'prev_execution_date': None,
    u'conf': <module 'airflow.configuration' from '/usr/local/lib/python2.7/site-packages/airflow-1.8.0-py2.7.egg/airflow/configuration.pyc'>,
    u'tables': None,
    u'task_instance_key_str': u'example_passing_params_via_test_command__run_this__20170414',
    u'END_DATE': '2017-04-14',
    u'execution_date': datetime.datetime(2017, 4, 14, 18, 28, 7),
    u'ts': '2017-04-14T18:28:07',
    u'macros': <module 'airflow.macros' from '/usr/local/lib/python2.7/site-packages/airflow-1.8.0-py2.7.egg/airflow/macros/__init__.pyc'>,
    u'params': {'miff': 'agg'},
    u'ti': <TaskInstance: example_passing_params_via_test_command.run_this 2017-04-14 18:28:07 [running]>,
    u'var': {u'json': None, u'value': None},
    u'ds_nodash': u'20170414',
    u'test_mode': False,
    u'end_date': '2017-04-14',
    'templates_dict': None,
    u'task': <Task(PythonOperator): run_this>,
    u'task_instance': <TaskInstance: example_passing_params_via_test_command.run_this 2017-04-14 18:28:07 [running]>,
    u'yesterday_ds_nodash': u'20170413',
    u'latest_date': '2017-04-14',
    u'yesterday_ds': '2017-04-13',
    u'ts_nodash': u'20170414T182807',
    u'tomorrow_ds': '2017-04-15'
}

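As the dump shows, the context includes a `dag_run` entry. Note that the context's `conf` key is the `airflow.configuration` module, not the run parameters; the values passed with `-c` live on `dag_run.conf`.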

The instance parameters are serialized with pickle and stored in the `dag_run` table.

The column is declared as:

conf = Column(PickleType)
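The effect of the `PickleType` column can be imitated with the standard library. This sketch assumes, as the CLI help above states, that the `-c` string is parsed as JSON and the resulting object is pickled; it uses plain `json`/`pickle` calls rather than SQLAlchemy, so it illustrates the round trip, not Airflow's actual storage code.

```python
import json
import pickle

cli_conf = '{"foo":"bar"}'          # the string given to `trigger_dag -c`
run_conf = json.loads(cli_conf)     # parsed into a Python dict
stored = pickle.dumps(run_conf)     # roughly what a PickleType column writes: bytes
loaded = pickle.loads(stored)       # what DagRun.conf hands back when read

print(type(stored).__name__, type(loaded).__name__)  # bytes dict
print(loaded["foo"])                                 # bar
```

So by the time a task sees `dag_run.conf`, it is an ordinary dict, not a JSON string.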

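When a PythonOperator runs, its `execute` method receives this context; if `provide_context` is set, the context is merged with the operator's `op_kwargs` and passed to the callable as keyword arguments through `self.op_kwargs`. The relevant source: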

class PythonOperator(BaseOperator):
    template_fields = ('templates_dict',)
    template_ext = tuple()
    ui_color = '#ffefeb'

    @apply_defaults
    def __init__(
            self,
            python_callable,
            op_args=None,
            op_kwargs=None,
            provide_context=False,
            templates_dict=None,
            templates_exts=None,
            *args, **kwargs):
        super(PythonOperator, self).__init__(*args, **kwargs)
        self.python_callable = python_callable
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}
        self.provide_context = provide_context
        self.templates_dict = templates_dict
        if templates_exts:
            self.template_ext = templates_exts

    def execute(self, context):
        if self.provide_context:
            context.update(self.op_kwargs)
            context['templates_dict'] = self.templates_dict
            self.op_kwargs = context

        return_value = self.python_callable(*self.op_args, **self.op_kwargs)
        logging.info("Done. Returned value was: " + str(return_value))
        return return_value

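Note the `context` argument of `execute`: when `self.provide_context` is True, the context is extended with `op_kwargs` and `templates_dict`, the extended dict replaces `self.op_kwargs`, and it is then passed to the callable as keyword arguments.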
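The merge performed by `execute` can be reproduced in a few lines. `MiniPythonOperator` below is a hypothetical, Airflow-free copy of just the `provide_context` handling quoted above; it shows that the callable receives the context keys plus `op_kwargs`, and that an `op_kwargs` entry named like a context key wins, because it is applied with `context.update`.

```python
class MiniPythonOperator(object):
    """Hypothetical, Airflow-free copy of PythonOperator's provide_context handling."""

    def __init__(self, python_callable, op_args=None, op_kwargs=None,
                 provide_context=False, templates_dict=None):
        self.python_callable = python_callable
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}
        self.provide_context = provide_context
        self.templates_dict = templates_dict

    def execute(self, context):
        if self.provide_context:
            # Same steps as the quoted source: op_kwargs overwrite same-named
            # context keys, then the merged dict becomes op_kwargs.
            context.update(self.op_kwargs)
            context['templates_dict'] = self.templates_dict
            self.op_kwargs = context
        return self.python_callable(*self.op_args, **self.op_kwargs)


def echo(**kwargs):
    """Return the keyword arguments the operator actually passed."""
    return kwargs


op = MiniPythonOperator(echo, op_kwargs={"ds": "overridden", "extra": 1},
                        provide_context=True)
result = op.execute({"ds": "2017-04-14", "dag_run": None})
print(sorted(result))  # ['dag_run', 'ds', 'extra', 'templates_dict']
print(result["ds"])    # overridden

plain = MiniPythonOperator(echo, op_kwargs={"extra": 1})
print(plain.execute({"ds": "2017-04-14"}))  # {'extra': 1}
```

One side effect is visible in the quoted source as well: because `self.op_kwargs` is reassigned to the context, the operator object keeps a reference to the last run's context after `execute` returns.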

Inside the callable, the DagRun instance can therefore be taken from the context:

kwargs.get('dag_run')

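and its `conf` attribute holds the instance parameters, i.e. the JSON from `-c` already deserialized into a Python dict (or `None` if the run was triggered without `-c`):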

dag_run_conf = kwargs.get('dag_run').conf
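A common follow-on pattern is to treat the operator's `params` as defaults and let `dag_run.conf` override them per run. The helper below is a hypothetical sketch of that idea (`get_param` and `StubDagRun` are not Airflow APIs); it also guards against `dag_run` or `conf` being `None`.

```python
def get_param(context, key, default=None):
    """Hypothetical helper: dag_run.conf value first, then task params, then default."""
    dag_run = context.get("dag_run")
    # conf may be missing or None (e.g. no -c given), so fall back to an empty dict.
    run_conf = getattr(dag_run, "conf", None) or {}
    if key in run_conf:
        return run_conf[key]
    return (context.get("params") or {}).get(key, default)


class StubDagRun(object):
    """Minimal stand-in for a DagRun, used only for this example."""

    def __init__(self, conf):
        self.conf = conf


ctx = {"dag_run": StubDagRun({"foo": "bar"}), "params": {"foo": "default", "miff": "agg"}}
print(get_param(ctx, "foo"))   # bar (run conf overrides params)
print(get_param(ctx, "miff"))  # agg (falls back to params)
print(get_param({"dag_run": None, "params": None}, "foo", "fallback"))  # fallback
```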

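Original statement: this article is published on the Tencent Cloud Developer Community with the author's authorization. Reproduction without permission is prohibited.

In case of infringement, please contact cloudcommunity@tencent.com for removal.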