首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么我的自定义操作员没有被导入到我的DAG (气流)?

为什么我的自定义操作员没有被导入到我的DAG (气流)?
EN

Stack Overflow用户
提问于 2019-08-21 01:00:17
回答 2查看 3.5K关注 0票数 3

我正在使用Apache Airflow创建一个ETL管道,并试图创建通用的自定义操作符。操作符似乎没有问题,但是它们没有被导入到我的DAG python文件中。

这是我的目录结构。

代码语言:javascript
复制
my_project\
  .env
  Pipfile
  Pipfile.lock
  .gitignore
  .venv\
  airflow\
    dags\
    logs\
    plugins\
      __init__.py
      helpers\
      operators\
        __init__.py
        data_quality.py
        load_fact.py
        load_dimension.py
        stage_redshift

这就是__init__.py文件中plugins文件夹下的内容。

代码语言:javascript
复制
from __future__ import division, absolute_import, print_function

from airflow.plugins_manager import AirflowPlugin

import airflow.plugins.operators as operators
import airflow.plugins.helpers as helpers

# Defining the plugin class
class SparkifyPlugin(AirflowPlugin):
    name = "sparkify_plugin"
    operators = [
        operators.StageToRedshiftOperator,
        operators.LoadFactOperator,
        operators.LoadDimensionOperator,
        operators.DataQualityOperator
    ]
    helpers = [
        helpers.SqlQueries
    ]

我将这些操作符导入到我的DAG文件中,如下所示

代码语言:javascript
复制
from airflow.operators.sparkify_plugin import (StageToRedshiftOperator,
                               LoadFactOperator,
                               LoadDimensionOperator,
                               DataQualityOperator)

我收到一个错误如下所示

代码语言:javascript
复制
ERROR - Failed to import plugin /Users/user_name/Documents/My_Mac/Projects/sparkify_etl_sql_to_sql/airflow/plugins/operators/stage_redshift.py

你能帮我理解为什么会发生这种事吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-08-21 21:42:28

我想出了如何用Airflow注册我的自定义操作符,而不用使用python文件来使用AirflowPlugin类。

我通过在我的__init__.py文件中plugins目录下声明它们来实现这一点。

我就是这么做的。

我的项目文件夹结构如下

代码语言:javascript
复制
my_project\
  .env
  Pipfile
  Pipfile.lock
  .gitignore
  .venv\
  airflow\
    dags\
    logs\
    plugins\
      __init__.py
      helpers\
      operators\
        __init__.py
        data_quality.py
        load_fact.py
        load_dimension.py
        stage_redshift

我的plugins/__init__.py代码

代码语言:javascript
复制
from airflow.plugins_manager import AirflowPlugin

import operators
import helpers

# Defining the plugin class
class SparkifyPlugin(AirflowPlugin):
    name = "sparkify_plugin"
    operators = [
        operators.StageToRedshiftOperator,
        operators.LoadFactOperator,
        operators.LoadDimensionOperator,
        operators.DataQualityOperator
    ]
    helpers = [
        helpers.SqlQueries
    ]

我的plugins/operators/__init__.py代码

代码语言:javascript
复制
from operators.stage_redshift import StageToRedshiftOperator
from operators.load_fact import LoadFactOperator
from operators.load_dimension import LoadDimensionOperator
from operators.data_quality import DataQualityOperator

__all__ = [
    'StageToRedshiftOperator',
    'LoadFactOperator',
    'LoadDimensionOperator',
    'DataQualityOperator'
]

我在我的数据文件(dags/etl.py)中导入这些自定义操作符,如下所示:

代码语言:javascript
复制
from airflow.operators.spark_plugin import LoadDimensionOperator

spark_pluginSparkifyPlugin类中的name属性(存储在plugins/__init__.py中)。

气流自动登记这些自定义操作员。

希望它能在未来帮助到其他人。

如果您有一些导入错误,请尝试为每个模块运行python __init__.py,如@绝对值破坏所描述的那样。确保plugins目录中的那个没有抛出错误。

我使用了Pycharm,在__init__.py目录中运行__init__.py文件时,它确实抛出了一些错误。修复plugins目录中的错误并忽略plugins/operators/__init__.py引发的错误解决了我的问题。

票数 4
EN

Stack Overflow用户

发布于 2019-08-21 09:52:22

如果您退房:Writing and importing custom plugins in Airflow

那里的人在插件上也有类似的问题,他们通过在airflow/plugins下添加一个以插件命名的文件来解决这个问题,而不是在__init__.py文件中定义它。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57583175

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档