airflow—给DAG实例传递参数(4)

我们需要在创建dag实例时传递参数,每个任务都可以从任务实例中获取需要的参数。

创建一个DAG实例

$ airflow trigger_dag -h
[2017-04-14 18:47:28,576] {__init__.py:57} INFO - Using executor CeleryExecutor
usage: airflow trigger_dag [-h] [-sd SUBDIR] [-r RUN_ID] [-c CONF]
                           [-e EXEC_DATE]
                           dag_id

positional arguments:
  dag_id                The id of the dag

optional arguments:
  -h, --help            show this help message and exit
  -sd SUBDIR, --subdir SUBDIR
                        File location or directory from which to look for the
                        dag
  -r RUN_ID, --run_id RUN_ID
                        Helps to identify this run
  -c CONF, --conf CONF  JSON string that gets pickled into the DagRun's conf
                        attribute
  -e EXEC_DATE, --exec_date EXEC_DATE
                        The execution date of the DAG

我们把json格式的字符串参数 '{"foo":"bar"}' 传递给DAG实例,如下

airflow trigger_dag example_passing_params_via_test_command -c '{"foo":"bar"}'

任务获取实例参数

def my_py_command(ds, **kwargs):
    logging.info(kwargs)
    logging.info(kwargs.get('dag_run').conf.get('foo'))
    # Print out the "foo" param passed in via
    # `airflow test example_passing_params_via_test_command run_this <date>
    # -tp '{"foo":"bar"}'`
    if kwargs["test_mode"]:
        print(" 'foo' was passed in via test={} command : kwargs[params][foo] \
               = {}".format(kwargs["test_mode"], kwargs["params"]["foo"]))
        logging.info(" 'foo' was passed in via test={} command : kwargs[params][foo] \
               = {}".format(kwargs["test_mode"], kwargs["params"]["foo"]))
    # Print out the value of "miff", passed in below via the Python Operator
    print(" 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"]))
    logging.info(" 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"]))
    return 1

my_templated_command = """
    echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
    echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
"""

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=my_py_command,
    params={"miff":"agg"},
    dag=dag)
    

包含logging的代码部分就是获取参数的地方

源码详解

每个DAG 实例都有一个上下文的概念,以context参数的形式会透传给所有的任务,以及所有任务的回调函数。

context的内容如下:

{
    u'next_execution_date': None,
    u'dag_run': <DagRun example_passing_params_via_test_command @ 2017-04-14 18:28:07: manual__2017-04-14T18:28:07, externally triggered: True>,
    u'tomorrow_ds_nodash': u'20170415',
    u'run_id': 'manual__2017-04-14T18:28:07',
    u'dag': <DAG: example_passing_params_via_test_command>,
    u'prev_execution_date': None,
    u'conf': <module 'airflow.configuration' from '/usr/local/lib/python2.7/site-packages/airflow-1.8.0-py2.7.egg/airflow/configuration.pyc'>,
    u'tables': None,
    u'task_instance_key_str': u'example_passing_params_via_test_command__run_this__20170414',
    u'END_DATE': '2017-04-14',
    u'execution_date': datetime.datetime(2017, 4, 14, 18, 28, 7),
    u'ts': '2017-04-14T18:28:07',
    u'macros': <module 'airflow.macros' from '/usr/local/lib/python2.7/site-packages/airflow-1.8.0-py2.7.egg/airflow/macros/__init__.pyc'>,
    u'params': {'miff': 'agg'},
    u'ti': <TaskInstance: example_passing_params_via_test_command.run_this 2017-04-14 18:28:07 [running]>,
    u'var': {u'json': None, u'value': None},
    u'ds_nodash': u'20170414',
    u'test_mode': False,
    u'end_date': '2017-04-14',
    'templates_dict': None,
    u'task': <Task(PythonOperator): run_this>,
    u'task_instance': <TaskInstance: example_passing_params_via_test_command.run_this 2017-04-14 18:28:07 [running]>,
    u'yesterday_ds_nodash': u'20170413',
    u'latest_date': '2017-04-14',
    u'yesterday_ds': '2017-04-13',
    u'ts_nodash': u'20170414T182807',
    u'tomorrow_ds': '2017-04-15'
}

可以看到上下文中包含了dag_run的值

实例参数使用pickle序列化存储在dag_run表中

字段类型如下

conf = Column(PickleType)

在执行PythonOperator时,会将上下文context参数,传递给回调函数中的self.op_kwargs

class PythonOperator(BaseOperator):
    template_fields = ('templates_dict',)
    template_ext = tuple()
    ui_color = '#ffefeb'

    @apply_defaults
    def __init__(
            self,
            python_callable,
            op_args=None,
            op_kwargs=None,
            provide_context=False,
            templates_dict=None,
            templates_exts=None,
            *args, **kwargs):
        super(PythonOperator, self).__init__(*args, **kwargs)
        self.python_callable = python_callable
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}
        self.provide_context = provide_context
        self.templates_dict = templates_dict
        if templates_exts:
            self.template_ext = templates_exts

    def execute(self, context):
        if self.provide_context:
            context.update(self.op_kwargs)
            context['templates_dict'] = self.templates_dict
            self.op_kwargs = context

        return_value = self.python_callable(*self.op_args, **self.op_kwargs)
        logging.info("Done. Returned value was: " + str(return_value))
        return return_value

注意execute函数的context参数,当self.provide_context为True时,可以对上下文参数进行扩展
并将扩展后的self.op_kwargs传递给执行回调函数

在执行Operator时,就可以从上下文实例中获取DagRun实例

kwargs.get('dag_run')

再从DagRun实例中获取conf参数,值为json对象类型

dag_run_conf = kwargs.get('dag_run').conf

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏青青天空树

struts返回json数据

  实际上就是在struts中获取response对象的输出流。然后写入你要返回的json数据,本质和用servlet返回json数据是一样的,需要自己导入js...

1326
来自专栏博客园

Core官方DI解析(4)--CallSiteRuntimeResolver

​ CallSiteRuntimeResolver类型是一个创建或获取服务实例的类型,这个类型继承了CallSiteVisitor<TArgument, TRe...

911
来自专栏null的专栏

数据结构和算法——旋转打印链表

1、问题描述 输入参数nnn为正整数,如输入n=5n=5n=5,则按行打印如下的数字: ? 2、问题的理解 这个问题是将数字1…n21…n21\dots n^2...

2993
来自专栏海天一树

2018 TCO Algorithm-Round 1B 600-points题解报告

Consider the set of integers between 1 and n, inclusive, and two positive intege...

853
来自专栏Spark学习技巧

案例说明flink的udf

用户自定义函数是非常重要的一个特征,因为他极大地扩展了查询的表达能力。本文除了介绍这三种udf之外,最后会介绍一个redis作为交互数据源的udf案例。

9962
来自专栏有趣的Python

py编程技巧-1.2- 如何为元组中每个元素命名,提高程序可读性

实际应用场景: 学生信息系统中数据为固定格式,使用元组来存储,操作速度快。但访问时,我们使用索引(index)访问,大量索引降低程序可读性。如何解决该问题?...

3878
来自专栏TungHsu

这或许是对小白最友好的python入门了吧——8,初识for语句

有时候我们想要使用列表中所有元素,但是如果手打又不现实,这时候我们可以用for语句来遍历整个列表,我们先举个例子,还是昨天的列表 ? 现在我们用for语句来遍历...

2916
来自专栏闵开慧

pig操作与注意事项

grunt> A = load 'hdfs://192.168.0.118:9000/user/hadoop/data.txt' as (name:charar...

2693
来自专栏青玉伏案

iOS开发之SQLite--C语言接口规范(三)——Binding Values To Prepared Statements

  在前面的博客中已经介绍了如何连接SQLite数据库,并且简单的查询和遍历结果集。在前面用到了sqlite3_stmt *stmt,也就是预编译后的SQL语句...

1856
来自专栏.NET技术

C#系列之String和StringBuilder

      首先和博园的各位打声招呼,小弟在博园呆了也有一年多了。平常一有时间就会过来看看文章,学习各位的经验,现在养成了一种一天不来博园,心里就不踏实的习惯,...

1104

扫码关注云+社区