我从airflow dag_bag和相应的.pyc文件中删除了dag。当我尝试从airflow UI中删除相同的dag时,它显示此错误: Dag id MY_DAG_ID仍然在DagBag中。首先删除DAG文件。我使用的airflow版本是1.10.4,即使在重新启动airflow之后,我也无法从UI中删除。我之前使用的是1.10.3,但我从未遇到过这个问题。在从dags文件夹中删除后,我可以从UI中删除。
当我点击UI中的dag时,它显示: DAG "MY_DAG_ID“似乎丢失了。(这是我从文件夹中删除dag时所预期的)
我按照中的说明动态创建DAGs,通过变量k修改要创建的dags的数量
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def create_dag(dag_id,
schedule,
dag_number,
default_args):
def hello_world_py(*args):
prin
我对Airflow完全陌生,我真的很难让一个非常简单的测试DAG运行: from airflow import DAG
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
default_args ={
"owner":"airflow",
"depends_on_past":False,
"retries":0,
我在windows 10 WSL上使用气流v2.0 (Ubuntu20.04)。警告信息是:
/home/jainri/.local/lib/python3.8/site-packages/airflow/models/dag.py:1342: PendingDeprecationWarning: The requested task could not be added to the DAG because a task with task_id create_tag_template_field_result is already in the DAG. Starting in Airfl
下面是airflow DAG代码。无论是在本地托管airflow时,还是在cloud composer上,它都能完美运行。但是,无法在Composer UI中单击DAG本身。我找到了一个类似的问题,并尝试了中链接的被接受的答案。我的问题与此类似。
import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.ope
我在运行Apache气流1.8.1。我希望在我的实例上运行32多个并发任务,但无法使任何配置工作。
我使用的是CeleryExecutor,UI中的气流配置为parallelism和dag_concurrency显示了64,我已经多次重新启动了气流调度程序、web服务器和工作人员(我实际上正在Vagrant中本地测试这个配置,但也在一个EC2实例上进行了测试)。
airflow.cfg
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances
我对气流很陌生。我能够跟踪一个视频并创建docker-compose yml文件、Dockerfile和一个dag文件。我可以查看我的守护进程并运行它。在我的脚本中,我试图打开一个文本文件(.txt),但是我得到了以下错误:FileNotFoundError: \[Errno 2\] No such file or directory。
我的文本文件在正确的位置。脚本运行在本地python环境上。我不知道当我在气流中运行时,它为什么会显示为一个错误。
我的docker-compose.yml、Dockerfile和dag文件如下所示。任何帮助我都会感激!谢谢!
docker-compose.y
关于“动态任务”的其他问题似乎解决了在计划或设计时动态构建DAG的问题。我对在执行过程中将任务动态添加到DAG很感兴趣。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG('test_dag', description='a test',
我是Airflow的新手。我遵循一个教程,并编写了下面的代码。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from models.correctness_prediction import CorrectnessPrediction
default_args = {
'owner': 'abc',
'depends_on_past
有没有人可以帮我在Airflow中使用livybatchoperator,下面是我的代码…除此之外,除了spark operator之外,还有什么方法可以在气流中运行spark作业,在我的情况下,spark安装在不同的机器上。
我得到了这个错误:Getting Error in Airflow UI - "No module named 'airflow_livy'"。
from datetime import datetime, timedelta
from airflow_livy.batch import LivyBatchOperator
from a
我已经在运行Ubuntu和python 3.8的服务器上安装了Airflow。我正在尝试在Airflow UI中导入一个简单的dag来列出存储桶中的文件。
from airflow import DAG
from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator
from airflow.providers.amazon.aws.operators.s3_list import S3ListOperator
from airflow.operators.python import
我在我的虚拟机Apache Airflow上本地启动,我想连接到Amazon Glue作业来运行它们。我从pull-request得到的源代码:
那么,我应该建立哪些连接(在Airflow UI中)来运行Amazon Glue作业?你能给我一些文档建议吗?因为我在官方文件里找不到任何有用的东西。
对于dag,我使用简单的代码:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators
在airflow中显示有关损坏的DAG的信息
Broken DAG: [/data/airflow/dags/copy_from_Oracle_to_MySQL.py] No module named Oracle_to_MySQL_plugin
我尝试使用DAG copy_from_Oracle_to_MySQL.py从/data/airflow/dags中移动文件。
但是气流显示的信息
Broken DAG: [/data/airflow/dags/copy_from_Oracle_to_MySQL.py] No module named Oracle_to_MySQL_plugin
要从
我们最近从风流1.10.12升级到了气流2.1.0,并且在每个dags的UI中都会出现这样的问题错误。
我们已将版本从1.10.12升级到1.10.15,并取得了成功。然后,我们从1.10.15升级到2.1.0,并面临着这个问题。
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 249, in _add_dag_from_db
raise SerializedDagNotFound(f"DAG '{dag_id}' not
我正在以一种复杂的方式运行一系列相互依赖的任务。我想将这些依赖关系描述为DAG (有向无环图),并在需要时执行该图。 我一直在关注airflow,并写了一个虚拟脚本: from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
def cloud_runner():
# my typical usage here would be a http call to a service (e.g. gcp cloudrun)
我正在从UI手动触发任务,它将任务显示为成功,但数据库中什么也没有发生。基本上,我调用一个简单的过程(不带参数),将临时表中的值复制到主表中,并删除临时表中的内容。 from airflow import DAG
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import datetime
dag = DAG("sql_proc_0", "Testing running of SQL procedures",
schedule_interv
我有一个pyspark脚本,它现在工作得很好,我想做的是,我想为每一分钟安排该作业,为此,我使用了Apache Airflow,我为airflow创建了一个.py文件,如下所示: from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import os
from builtins import range
import airflow
from airflow.models import DAG
from
我是气流的新手,我试图导入我自己的定制jar作为DAG,它是用Talend 生成的,当我通过终端导入DAG时,没有显示错误,也没有将我的DAG添加到气流UI中的DAG列表中。
以下是我的.py文件代码:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from airflow.utils.email import send_email
import os
import sys
bib_app = "/home
为了进一步执行,我需要在另一个任务中使用来自oracleOperator的输出。我遇到的麻烦是,当我把数据拉到另一个任务中并打印出来时,它会给出一个零的结果。没有抛出错误,但没有传递数据。此外,任务UI中的xcom选项卡显示键和值的空白。
我的代码如下:
from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates i
我们希望在Dag中的Dag触发器期间从UI读取cli输入传递给dag。我尝试了下面的代码,但它不起作用。在这里,我将输入传递为{“kpi”:“ID123”},并希望在函数get_data_from_bq中打印此ip值。
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow import models
from airflow.models import Variabl