我试图理解如何在Apache airflow中创建动态dags,因为我需要它来在我的项目中创建动态dags。 下面是iam的链接:Dynamic DAG creation in Apache airflow 下面是用于创建示例hello world动态DAGs的代码块(基于输入参数的动态DAG创建)。 from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def create_dag(dag_id,
我按照中的说明动态创建DAGs,通过变量k修改要创建的dags的数量
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def create_dag(dag_id,
schedule,
dag_number,
default_args):
def hello_world_py(*args):
prin
我想为conn_id生成spark_default。我正在k8s上运行我的k8s,我想使用火花主程序动态地生成conn_id,这是运行在同一个名称空间中的另一个容器。
是否有办法动态生成conn_id :类似于:
环境变量
或者使用SparkSubmitOperator本身编写和生成conn_id
下面是我的dag代码:
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import date
我想从Python脚本(实际上是木星笔记本)中调用一个气流DAG定义。
我确保设置了用于airflow命令的环境变量(例如,AIRFLOW_HOME)和python-dotenv,以便在木星笔记本中加载.env文件。此.env文件包含各种环境变量,包括AIRFLOW_HOME。
%load_ext dotenv
# the AIRFLOW_HOME env var must be an absolute file system path
# if the Notebook is relative to where the `airflow.db` Sqlite DB is.
%dotenv
我刚接触apache airflow,您能帮助我了解在远程计算机上运行DAG应该在哪里配置/配置什么吗?我使用celery_executor在工作节点上执行代码,我没有在工作节点上做任何配置,我使用RabitMQ作为队列服务,似乎我已经正确地配置了Airflow集群。
我的DAG文件:
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
我似乎不了解如何将模块导入到apache airflow DAG定义文件中。例如,我想这样做是为了能够创建一个库,使具有类似设置的声明任务不那么繁琐。 这是我能想到的重复这个问题的最简单的例子:我修改了airflow教程(https://airflow.apache.org/tutorial.html#recap),只需导入一个模块并从该模块运行一个定义。如下所示: 目录结构: - dags/
-- __init__.py
-- lib.py
-- tutorial.py tutorial.py: """
Code that goes along with the A
我有一个pyspark脚本,它现在工作得很好,我想做的是,我想为每一分钟安排该作业,为此,我使用了Apache Airflow,我为airflow创建了一个.py文件,如下所示: from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import os
from builtins import range
import airflow
from airflow.models import DAG
from
我在运行Apache气流1.8.1。我希望在我的实例上运行32多个并发任务,但无法使任何配置工作。
我使用的是CeleryExecutor,UI中的气流配置为parallelism和dag_concurrency显示了64,我已经多次重新启动了气流调度程序、web服务器和工作人员(我实际上正在Vagrant中本地测试这个配置,但也在一个EC2实例上进行了测试)。
airflow.cfg
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances
我正在使用Airflow Version 2.2.5/Composer 2.0.15实现任务超时错误。在Airflow version2.2.3 /Composer Version 1.18.0中,相同的代码运行得非常好。
错误信息:
Broken DAG: [/home/airflow/gcs/dags/test_dag.py] Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/enum.py", line 256, in __new__
if canonical_membe
我正在测试一个简单的dag在预定的时间间隔上运行,它是在每周五和周六的6UTC(‘0 6** 5,6')。但dag并没有在周五早上6点触发。我知道周五的实例将在周六运行,周六的实例将在周五运行。
我该怎么做才能让它只在周五运行周五的实例呢?或者其他工作吗?
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def create_txt():
f=open("/home/abc/
关于“动态任务”的其他问题似乎解决了在计划或设计时动态构建DAG的问题。我对在执行过程中将任务动态添加到DAG很感兴趣。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG('test_dag', description='a test',
在visual代码中运行气流DAG代码时出错。
误差
ImportError: cannot import name 'DummyOperator' from 'airflow.operators' (c:\Users\10679196\AppData\Local\Programs\Python\Python38\lib\site-packages\airflow\operators\__init__.py)
导入报表
from airflow import DAG
from airflow.operators import DummyOperator
版本
a
我动态地创建了一个dags集合(对所有对象使用相同的.py )。有一个build-DAG是我不能运行的:
airflow.exceptions.AirflowException: dag_id could not be found: `build-DAG`. Either the dag did not exist or it failed to parse.
at get_dag (/usr/local/lib/python2.7/site- packages/airflow/bin/cli.py:130)
at run (/usr/local/lib/python2.7/site-p
我正在使用apache airflow和BashOperator。每个BashOperator执行一个python脚本。例如:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
with DAG(dag_id='dag_example', default_args=None,
schedule_interval='0 2 */2 * *',
catchup=False) as dag:
我从airflow dag_bag和相应的.pyc文件中删除了dag。当我尝试从airflow UI中删除相同的dag时,它显示此错误: Dag id MY_DAG_ID仍然在DagBag中。首先删除DAG文件。我使用的airflow版本是1.10.4,即使在重新启动airflow之后,我也无法从UI中删除。我之前使用的是1.10.3,但我从未遇到过这个问题。在从dags文件夹中删除后,我可以从UI中删除。
当我点击UI中的dag时,它显示: DAG "MY_DAG_ID“似乎丢失了。(这是我从文件夹中删除dag时所预期的)
我有一个继承baseoperator的自定义运算符。我正试着用“排队”的名字来替不同的芹菜工人接上任务。
但是它使用原始模板字符串(un呈现jinja字符串)作为队列名,而不是呈现字符串。
如果我将预期的队列名直接作为一个简单的字符串给出,那么同样的流也能工作。
from airflow import DAG
from operators.check_operator import CheckQueueOperator
from datetime import datetime, timedelta
from airflow.operators.python_operator import B
我使用root帐户在集群上安装了Airflow。我知道这是不好的练习,但它只是测试环境。我创建了一个简单的DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
dag = DAG('create_directory', description='simple create directory workflow', start_date=datetime(2017,