首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >Airflow如何从代码本身获取每个dag的环境变量

Airflow如何从代码本身获取每个dag的环境变量
EN

Stack Overflow用户
提问于 2019-03-01 05:42:23
回答 1查看 6.6K关注 0票数 4

我从日志中看到以下信息:

2019-02-28 16:33:14,766 {python_operator.py:95}信息-导出以下环境变量:

代码语言:javascript
复制
AIRFLOW_CTX_DAG_ID=email_operator_with_log_attachment_example
AIRFLOW_CTX_EXECUTION_DATE=2019-02-28T21:32:51.357255+00:00
AIRFLOW_CTX_TASK_ID=python_send_email
AIRFLOW_CTX_DAG_RUN_ID=manual__2019-02-28T21:32:51.357255+00:00

如何在代码中获取这些信息?

非常感谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-01 07:00:14

您可以使用os.environ["ENV VAR NAME"] (确保使用import os命令)访问这些变量。例如:

代码语言:javascript
复制
import os
# ... other imports ...

dag = DAG(
    dag_id="demo",
    default_args=default_args,
    schedule_interval="0 0 * * *",
)

def print_env_var():
    print(os.environ["AIRFLOW_CTX_DAG_ID"])

print_context = PythonOperator(
    task_id="print_env",
    python_callable=print_env_var,
    dag=dag,
)

但是,在任务中访问此类变量的常见方法是通过在运算符中设置provide_context=True来提供任务上下文。

例如:

代码语言:javascript
复制
dag = DAG(
    dag_id="demo",
    default_args=default_args,
    schedule_interval="0 0 * * *",
)

def print_context(**context):
    print(context)

print_context = PythonOperator(
    task_id="print_context",
    python_callable=print_context,
    provide_context=True,  # <====
    dag=dag,
)

context变量将包含许多变量,其中包含有关任务上下文的信息,包括问题中的变量:

代码语言:javascript
复制
# {
# 'END_DATE': '2019-01-01',
# 'conf': <module 'airflow.configuration' from '/opt/conda/lib/python3.6/site-packages/airflow/configuration.py'>,
# 'dag': <DAG: context_demo>,
# 'dag_run': None,
# 'ds': '2019-01-01',
# 'ds_nodash': '20190101',
# 'end_date': '2019-01-01',
# 'execution_date': <Pendulum [2019-01-01T00:00:00+00:00]>,
# 'inlets': [],
# 'latest_date': '2019-01-01',
# 'macros': <module 'airflow.macros' from '/opt/conda/lib/python3.6/site-packages/airflow/macros/__init__.py'>,
# 'next_ds': '2019-01-02',
# 'next_ds_nodash': '20190102',
# 'next_execution_date': datetime.datetime(2019, 1, 2, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
# 'outlets': [],
# 'params': {},
# 'prev_ds': '2018-12-31',
# 'prev_ds_nodash': '20181231',
# 'prev_execution_date': datetime.datetime(2018, 12, 31, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
# 'run_id': None,
# 'tables': None,
# 'task': <Task(PythonOperator): print_exec_date>,
# 'task_instance': <TaskInstance: context_demo.print_exec_date 2019-01-01T00:00:00+00:00 [None]>,
# 'task_instance_key_str': 'context_demo__print_exec_date__20190101',
# 'templates_dict': None,
# 'test_mode': True,
# 'ti': <TaskInstance: context_demo.print_exec_date 2019-01-01T00:00:00+00:00 [None]>,
# 'tomorrow_ds': '2019-01-02',
# 'tomorrow_ds_nodash': '20190102',
# 'ts': '2019-01-01T00:00:00+00:00',
# 'ts_nodash': '20190101T000000',
# 'ts_nodash_with_tz': '20190101T000000+0000',
# 'var': {'json': None, 'value': None},
# 'yesterday_ds': '2018-12-31',
# 'yesterday_ds_nodash': '20181231'
# }

我将在this blog post中更详细地解释如何处理任务上下文(参见"3.将上下文传递给任务“)。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54934732

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档