首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >气流Xcom未得到解决返回task_instance字符串

气流Xcom未得到解决返回task_instance字符串
EN

Stack Overflow用户
提问于 2022-05-29 08:12:59
回答 1查看 266关注 0票数 0

对于xcom_pull,我面临一个奇怪的问题,它总是返回一个xcom_pull字符串"{{ task_instance.xcom_pull(dag_id = 'cf_test',task_ids='get_config_val',key='http_con_id') }}"

我的要求很简单,我使用python运算符推动了xcom,使用xcom_pull,我试图检索该值并将其作为http_conn_id传递给SimpleHttpOperator,但是变量返回一个字符串,而不是解析xcom_pull值。Python Operator能够成功地推动XCom。

代码:

代码语言:javascript
运行
复制
from datetime import datetime

import simplejson as json
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator   
from airflow.operators.python_operator import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from google.auth.transport.requests import Request

default_airflow_args = {
    "owner": "divyaansh",
    "depends_on_past": False,
    "start_date": datetime(2022, 5, 18),        
    "retries": 0,
    "schedule_interval": "@hourly",
}

project_configs = {
    "project_id": "test",
    "conn_id": "google_cloud_storage_default",
    "bucket_name": "test-transfer",
    "folder_name": "processed-test-rdf",
}


def get_config_vals(**kwargs) -> dict:
    """
    Get config vals from airlfow variable and store it as xcoms

    """

    task_instance = kwargs["task_instance"]

    task_instance.xcom_push(key="http_con_id", value="gcp_cloud_function")


def generate_api_token(cf_name: str):
    """
    generate token for api request
    """
    import google.oauth2.id_token    
    
    request = Request()

    target_audience = f"https://us-central1-test-a2h.cloudfunctions.net/{cf_name}"

    return google.oauth2.id_token.fetch_id_token(
        request=request, audience=target_audience
    )


with DAG(
    dag_id="cf_test",
    default_args=default_airflow_args,
    catchup=False,
    render_template_as_native_obj=True,
) as dag:

    start = DummyOperator(task_id="start")

    config_vals = PythonOperator(
        task_id="get_config_val", python_callable=get_config_vals, provide_context=True
    )

    ip_data = json.dumps(
        {
            "bucket_name": project_configs["bucket_name"],
            "file_name": "dummy",
            "target_location": "/valid",
        }
    )

    conn_id = "{{ task_instance.xcom_pull(dag_id = 'cf_test',task_ids='get_config_val',key='http_con_id') }}"

    api_token = generate_api_token("new-cp")

    cf_task = SimpleHttpOperator(
        task_id="file_decrypt_and_validate_cf",
        http_conn_id=conn_id,
        method="POST",
        endpoint="new-cp",
        data=json.dumps(
            json.dumps(
                {
                    "bucket_name": "test-transfer",
                    "file_name": [
                        "processed-test-rdf/dummy_20220501.txt",
                        "processed-test-rdf/dummy_20220502.txt",                            
                    ],
                    "target_location": "/valid",
                }
            )
        ),
        headers={
            "Authorization": f"bearer {api_token}",
            "Content-Type": "application/json",                
        },
        do_xcom_push=True,
        log_response=True,
    )

    print("task new-cp", cf_task)   
    

    check_flow = DummyOperator(task_id="check_flow")

    end = DummyOperator(task_id="end")

start >> config_vals >> cf_task >> check_flow >> end

错误信息:

raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") airflow.exceptions.AirflowNotFoundException: The conn_id `"{{ task_instance.xcom_pull(dag_id = 'cf_test',task_ids='get_config_val',key='http_con_id') }}"` isn't defined

我试过几天了,但似乎没有什么效果。有人能帮我指出正确的方向吗。

气流-版本: 2.2.3作曲家-版本: 2.0.11

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-05-29 15:36:44

SimpleHttpOperator中,http_conn_id参数不是模板字段,因此不能与它一起使用Jinja。这意味着不能呈现此参数。因此,当您将"{{ task_instance.xcom_pull(dag_id = 'cf_test',task_ids='get_config_val',key='http_con_id') }}"传递给操作符时,您希望它在运行时被前面任务存储在Xcom中的值替换,但实际上,气流将它视为一个常规字符串,这也是异常告诉您的。实际上,它试图使用非常长的字符串的名称搜索连接,但是找不到它,因此它告诉您连接没有定义。

要解决这个问题,您可以创建一个自定义操作符:

代码语言:javascript
运行
复制
class MySimpleHttpOperator(SimpleHttpOperator):
    template_fields = SimpleHttpOperator.template_fields + ("http_conn_id",)

然后,您应该在DAG中用SimpleHttpOperator替换MySimpleHttpOperator

此更改使您在http_conn_id中设置的字符串通过Jinja传递。因此,在您的示例中,字符串将如您所期望的那样被替换为Xcom值。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72421971

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档