我创建了一个自定义的气流操作符,基本上修改了一些与run_id相关的TriggerDagRunOperator代码,并将其命名为CustomTriggerDagRunOperator。
这个新的接线员运转良好。当我将运算符类放入我的DAGs代码中时,我的dag运行良好,修改也会按预期执行。但是当我为这个操作符创建一个单独的python文件时,比如说,my_custom_operator.py并将这个文件放在与DAG相同的文件夹中。此后,在DAG中添加了从my_custom_operator导入CustomTriggerDagRunOperator的导入语句。气流UI不产生任何DAG错误。但是,当我试图运行DAG时,它不工作,也不显示任何日志,甚至与此操作符无关的任务也无法执行。这是令人困惑的,因为我刚刚将与操作符相关的代码移到了另一个文件中,以便可以在我的所有DAG中使用自定义操作符。需要一些建议。
气流版本: 2.1.3使用天文仪器,托管在Kubernetes上
发布于 2022-11-13 22:08:23
为了从模块导入类/方法,需要将模块包添加到python,在本例中,DagFileProcessor将能够在处理dag脚本时导入类/方法。
DAGS_FOLDER/
  dag.py
  my_operators/
    operator1.py在调度程序和所有工作人员中,您需要将python路径设置为PYTHONPATH=$PYTHONPATH:/path/to/DAGS_FOLDER,而在dag脚本中,您需要从my_operators包而不是从.导入
from my_operators.operator1 import CustomTriggerDagRunOperator对于您的开发,您可以选择DAGS_FOLDER作为项目的源文件夹,这类似于将它添加到python。
https://stackoverflow.com/questions/74423014
复制相似问题