首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在气流中,如何使用上下文将参数传递给on_success_callback函数处理程序?

在气流中,如何使用上下文将参数传递给on_success_callback函数处理程序?
EN

Stack Overflow用户
提问于 2020-10-27 11:17:43
回答 3查看 9.2K关注 0票数 1

在气流中,如何使用上下文将参数传递给on_success_callback函数处理程序?

这是我的测试代码:

代码语言:javascript
运行
复制
import airflow
from airflow import DAG
from airflow.operators import MSTeamsWebhookOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from transaction_analytics import helpers
from airflow.utils.helpers import chain

# Parameters & variables
schedule_interval        = "0 20 * * *"

def _task_success_callback(context):
    dagid         = context["task_instance"].dag_id
    duration      = context["task_instance"].duration
    executiondate = context["execution_date"]
    logurl        = context["task_instance"].log_url.replace("localhost", "agbqhsbldd017v.agb.rbxd.ds")# workaround until we config airflow
    pp1           = context["params"].param1 
    #pp1          = "{{ params.param1 }}"
    ms_teams_op = MSTeamsWebhookOperator(
        task_id="success_notification",
        http_conn_id="msteams_airflow",
        message="DAG {ppram1} `{dag}`  finished successfully!".format(dag=context["task_instance"].dag_id, ppram1=pp1),
        subtitle="Execution Date = {p1}, Duration = {p2}".format(p1=executiondate,p2=duration),
        button_text = "View log",
        button_url = "{log}".format(log=logurl),
        theme_color="00FF00"#,
        #proxy= "http://10.72.128.202:3128"
    )
    ms_teams_op.execute(context)

    main_dag = DAG('test_foley',
            schedule_interval=schedule_interval,
            description='Test foley',
            start_date=datetime(2020, 4, 19),
            default_args=None,
            max_active_runs=2,
            default_view='graph',     # Default view graph
            #orientation='TB', # Top-Bottom graph
            on_success_callback=_task_success_callback,
            #on_failure_callback=outer_task_failure_callback,
            catchup=False, # Do not catchup, run only latest
            params={
              "param1": "value1",
              "param2": "value2"
            }
           )

################################### START ######################################
dag_chain = []

start = DummyOperator(task_id='start', retries = 3, dag=main_dag)
dag_chain.append(start)

step1 = BashOperator(
    task_id='step1',
    bash_command='pwd',
    dag=main_dag,
)
dag_chain.append(step1)

step2 = BashOperator(
    task_id='step2',
    bash_command='exit 0',
    dag=main_dag,
)
dag_chain.append(step2)


end = DummyOperator(task_id='end', dag=main_dag)
dag_chain.append(end)

chain(*dag_chain)

我有一个事件处理函数_task_success_callback来处理成功。在DAG中,我有捕获该事件的on_success_callback=_task_success_callback

而且效果很好..。但是现在我需要将一些参数传递到_task_success_callback.中什么是最好的方法?

当该函数接收上下文时,我尝试在DAG中创建参数,如您所见:

代码语言:javascript
运行
复制
        params={
          "param1": "value1",
          "param2": "value2"
        }

但似乎我不能接近他们?

我的问题是:

  1. 我在访问params上做错了什么?
  2. 有更好的方法传递参数吗?

注:我看到类似的问题How to pass parameters to Airflow on_success_callback and on_failure_callback有一个答案.而且起作用了。但是我想要的是使用上下文来传递参数.

EN

回答 3

Stack Overflow用户

发布于 2020-10-27 17:24:54

回想一下,气流过程文件只是Python,只要您在解析过程中不引入太多的开销(因为Airflow经常解析文件,而且这种开销可以加起来),那么您就可以使用Python所能做的一切。特别是对于您的情况,我建议为回调返回一个嵌套函数(闭包):

将其放入一个与气流过程相邻的文件中,比如on_callbacks.py

代码语言:javascript
运行
复制
def success_ms_teams(param_1, param_2):

    def callback_func(context):
        print(f"param_1: {param_1}")
        print(f"param_2: {param_2}")
        # ... trimmed for brevity ...#
        ms_teams_op.execute(context)

    return callback_func

然后,在您的过程中,您可以这样做:

代码语言:javascript
运行
复制
from airflow import models

from on_callbacks import success_ms_teams

with models.DAG(
    ...
    on_success_callback=success_ms_teams(
        "value1", # These values become the
        "value2", # `param_1` and `param_2` 
    )
) as dag:
    ...
票数 4
EN

Stack Overflow用户

发布于 2021-09-03 17:25:30

您可以创建一个任务,其唯一目的是通过xcoms推动配置设置。您可以通过context提取配置,因为task_instance对象包含在context中。

代码语言:javascript
运行
复制
def push_configuration(ti, params):
    ti.xcom_push(key='conn_id', value=params)

def _task_success_callback(context):
    ti = context.get('ti') 
    params = ti.xcom_pull(key='params', task_ids='Settings')
    ...


step0 = PythonOperator(
        task_id='Settings',
        python_callable=push_configuration,
        op_kwargs={'params': params})

step1 = BashOperator(
        task_id='step1',
        bash_command='pwd',
        on_success_callback=_task_success_callback)
票数 1
EN

Stack Overflow用户

发布于 2022-03-09 09:20:29

我使用部分调用具有不同连接的回退函数:

代码语言:javascript
运行
复制
from functools import *

# pass extra params here
def my_conn_callback(context, slack_conn_id, slack_channel):
  print(context)
  print(slack_conn_id)
  print(slack_channel)

# this is for the on_failure_callback
task_fail_slack_alert_callback_my_conn = partial(my_conn_callback, 
slack_conn_id = "slack-conn", slack_channel = "#slack_channel")

# this is how airflow calls it internally:
print(task_fail_slack_alert_callback_my_conn('context'))
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64553292

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档