我正在使用Apache Airflow创建一个ETL管道,并试图创建通用的自定义操作符。操作符似乎没有问题,但是它们没有被导入到我的DAG python文件中。
这是我的目录结构。
my_project\
.env
Pipfile
Pipfile.lock
.gitignore
.venv\
airflow\
dags\
logs\
plugins\
__init__.py
helpers\
operators\
__init__.py
data_quality.py
load_fact.py
load_dimension.py
stage_redshift这就是__init__.py文件中plugins文件夹下的内容。
from __future__ import division, absolute_import, print_function
from airflow.plugins_manager import AirflowPlugin
import airflow.plugins.operators as operators
import airflow.plugins.helpers as helpers
# Defining the plugin class
class SparkifyPlugin(AirflowPlugin):
name = "sparkify_plugin"
operators = [
operators.StageToRedshiftOperator,
operators.LoadFactOperator,
operators.LoadDimensionOperator,
operators.DataQualityOperator
]
helpers = [
helpers.SqlQueries
]我将这些操作符导入到我的DAG文件中,如下所示
from airflow.operators.sparkify_plugin import (StageToRedshiftOperator,
LoadFactOperator,
LoadDimensionOperator,
DataQualityOperator)我收到一个错误如下所示
ERROR - Failed to import plugin /Users/user_name/Documents/My_Mac/Projects/sparkify_etl_sql_to_sql/airflow/plugins/operators/stage_redshift.py你能帮我理解为什么会发生这种事吗?
发布于 2019-08-21 21:42:28
我想出了如何用Airflow注册我的自定义操作符,而不用使用python文件来使用AirflowPlugin类。
我通过在我的__init__.py文件中plugins目录下声明它们来实现这一点。
我就是这么做的。
我的项目文件夹结构如下
my_project\
.env
Pipfile
Pipfile.lock
.gitignore
.venv\
airflow\
dags\
logs\
plugins\
__init__.py
helpers\
operators\
__init__.py
data_quality.py
load_fact.py
load_dimension.py
stage_redshift我的plugins/__init__.py代码
from airflow.plugins_manager import AirflowPlugin
import operators
import helpers
# Defining the plugin class
class SparkifyPlugin(AirflowPlugin):
name = "sparkify_plugin"
operators = [
operators.StageToRedshiftOperator,
operators.LoadFactOperator,
operators.LoadDimensionOperator,
operators.DataQualityOperator
]
helpers = [
helpers.SqlQueries
]我的plugins/operators/__init__.py代码
from operators.stage_redshift import StageToRedshiftOperator
from operators.load_fact import LoadFactOperator
from operators.load_dimension import LoadDimensionOperator
from operators.data_quality import DataQualityOperator
__all__ = [
'StageToRedshiftOperator',
'LoadFactOperator',
'LoadDimensionOperator',
'DataQualityOperator'
]我在我的数据文件(dags/etl.py)中导入这些自定义操作符,如下所示:
from airflow.operators.spark_plugin import LoadDimensionOperatorspark_plugin是SparkifyPlugin类中的name属性(存储在plugins/__init__.py中)。
气流自动登记这些自定义操作员。
希望它能在未来帮助到其他人。
如果您有一些导入错误,请尝试为每个模块运行python __init__.py,如@绝对值破坏所描述的那样。确保plugins目录中的那个没有抛出错误。
我使用了Pycharm,在__init__.py目录中运行__init__.py文件时,它确实抛出了一些错误。修复plugins目录中的错误并忽略plugins/operators/__init__.py引发的错误解决了我的问题。
发布于 2019-08-21 09:52:22
如果您退房:Writing and importing custom plugins in Airflow
那里的人在插件上也有类似的问题,他们通过在airflow/plugins下添加一个以插件命名的文件来解决这个问题,而不是在__init__.py文件中定义它。
https://stackoverflow.com/questions/57583175
复制相似问题