我有一个pyspark脚本,它现在工作得很好,我想做的是,我想为每一分钟安排该作业,为此,我使用了Apache Airflow,我为airflow创建了一个.py文件,如下所示: from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import os
from builtins import range
import airflow
from airflow.models import DAG
from
我想从Python脚本(实际上是木星笔记本)中调用一个气流DAG定义。
我确保设置了用于airflow命令的环境变量(例如,AIRFLOW_HOME)和python-dotenv,以便在木星笔记本中加载.env文件。此.env文件包含各种环境变量,包括AIRFLOW_HOME。
%load_ext dotenv
# the AIRFLOW_HOME env var must be an absolute file system path
# if the Notebook is relative to where the `airflow.db` Sqlite DB is.
%dotenv
下面是我的简单DAG/ Python脚本,它位于Google云桶上的DAGS文件夹中。
from airflow import DAG
import airflow
from airflow.operators import BashOperator
from datetime import datetime,timedelta , date
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from ge
有一个像这样的test_dag_father:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.ut
我们刚刚开始为日程安排包括气流。我的一个脚本每天运行。它使用模板参数({ ds_nodash })获取日期。但是我不得不重新运行一天的负载(过去的日期),我如何提供输入参数。输入参数将覆盖ds_nodash。
I have :
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade {{ ds_nodash }} "
Would like to run for
trade_acx_ld_cmd = "/hom
我正在尝试使用ExternalTaskSensor,它被困于戳另一个DAG的任务,这个任务已经成功地完成了。
在这里,第一个DAG "a“完成它的任务,然后通过ExternalTaskSensor触发第二个DAG "b”。相反,它被困在a.first_task上。
第一次DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='a',
default_ar
在DAG中是否有自定义电子邮件和发送任何任务失败的选项。有一个类似‘email _on_failure’的选项: True,但它不提供动态添加内容到电子邮件主题或正文的选项。
我的DAG将如下所示
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.ope
我目前正在开发一个DAG,它将通过电子邮件发送用户列表,无论DAG是否已成功完成。我试图让DAG的流程看起来像下面的示例:
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
def print_hello():
return 'Hello world!'
default_a
在的帮助下,我刚刚编写了一个程序(如文章中所示),当一个文件放置在S3桶中时,会触发运行中的一个DAG中的任务,然后使用BashOperator执行一些工作。一旦完成,DAG就不再处于运行状态,而是进入成功状态,如果我想让它获取另一个文件,我需要清除所有的“过去”、“未来”、“上游”、“下游”活动。我想要使这个程序,使它始终运行和任何时候,一个新的文件被放置在S3桶中,程序开始任务。
我是否可以继续使用S3KeySenor来完成这个任务,或者是否需要找到一种方法来设置一个来运行我的DAG?到目前为止,如果我的S3KeySensor只运行一次的话,它是毫无意义的。
from airflow im
我已经查看了气流subDAG部分,并试图在网上找到任何可能有帮助的东西,但是我没有找到任何详细解释如何使subDAG工作的东西。运行subDAG的要求之一是应该启用它。如何启用/禁用subdag?
我编写了一些示例代码,这些代码在气流中不显示任何错误,但是当我尝试运行它时,subDAG中的任何一个操作符都不会被执行。
这是我的主要后台代码:
import os
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airfl
我一直在尝试在气流中使用“example_subdag_operator”。在我将“start_date”更改为“datetime.now()”之后,然后手动触发一个dag运行。操作符是绿色的,但是当我放大到子程序本身时,没有执行任何操作(在图形视图中是白色的),“Run”下拉列表是空的。\
我用师父制作的最新版本。我还测试了pip发布的版本。仍然是同样的问题\
这是个虫子吗?还是我漏掉了什么?
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator
我得到了以下DAG
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
def select_next_branch():
if some_condition:
我们的代码库中有许多相同的回调函数粘贴了很多次,所以我希望将它们移到一个单独的插件中,然后从那里调用函数。然而,我有问题的代码,甚至能够到达插件。我在try/except块中碰到了错误,这意味着我开始在DAG中做了一些错误,但是我没有多少运气。
DAG 代码
from airflow import DAG
from airflow.operators.slack_airflow_plugins import SlackSuccessAlert
from airflow.operators.python_operator import PythonOperator
from airflow.op
我正在以一种复杂的方式运行一系列相互依赖的任务。我想将这些依赖关系描述为DAG (有向无环图),并在需要时执行该图。 我一直在关注airflow,并写了一个虚拟脚本: from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
def cloud_runner():
# my typical usage here would be a http call to a service (e.g. gcp cloudrun)
当我安排DAG在每天的特定时间运行时,DAG根本不会执行。但是,当我重新启动Airflow when服务器和调度程序时,DAG在该特定日期的计划时间执行一次,并且从第二天起不再执行。我使用的是带有python 2.7.6的Airflow版本v1.7.1.3。下面是DAG代码:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import time
n=time.strftime("%Y,%m
当尝试通过airflow trigger_dag test_dag运行dag时,我得到了错误:airflow.exceptions.DagNotFound: Dag id test_task not found in DagModel。
运行airflow list_dags时,DAG正确列出。我还进行了检查,以确保将$AIRFLOW_HOME目录正确设置为dag所在的位置。我能让它工作的唯一方法就是运行一个特定的任务,比如airflow test test_dag test_task。运行python dags/test_dag.py时未显示任何错误。
导入后,dag文件本身中的代码:
de
我用的是气流1.8.1。我有一个DAG,我相信我计划每5分钟运行一次,但它没有这样做:
忽略两个成功的DAG运行,那些是手动触发的。
我查看了DAG的调度程序日志,我看到:
[2019-04-26 22:03:35,601] {jobs.py:343} DagFileProcessor839 INFO - Started process (PID=5653) to work on /usr/local/airflow/dags/retrieve_airflow_artifacts.py
[2019-04-26 22:03:35,606] {jobs.py:1525} DagFilePro
我在运行Apache气流1.8.1。我希望在我的实例上运行32多个并发任务,但无法使任何配置工作。
我使用的是CeleryExecutor,UI中的气流配置为parallelism和dag_concurrency显示了64,我已经多次重新启动了气流调度程序、web服务器和工作人员(我实际上正在Vagrant中本地测试这个配置,但也在一个EC2实例上进行了测试)。
airflow.cfg
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances
我试图触发外部气流DAG,并将一些参数传递给DAG。DAG计划每3分钟运行一次。我的问题是,参数只用于第一次DAG运行。
from pyexpat import model
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator
import os
dag_id = "proj"
home_path = os.path.expanduser("~")
runpath = os.pat
我对气流很陌生,我需要在用例中读取传入的json信任信息,然后根据所读取的信任值构造一个字符串,这个字符串将用作我们正在GCP数据过程中创建的集群的名称。
例:输入Json到dag
{ "x“:"data","y":"engg","z”:"usecase“}
我希望集群名是“engg”,并包含下面的代码,但是我得到了一个错误,就像它无法识别"dag_run“一样。任何帮助都将不胜感激。
from datetime import datetime
from airflow import DAG
from airflo
我有一个非常简单的测试DAG,如下所示:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
DAG = DAG(
dag_id='scheduler_test_dag',
start_date=datetime(2017, 9, 9, 4, 0, 0, 0), #..EC2 time. Equal to 11pm hora México
max_active_runs=
按照教程,我创建了一个文件夹$AIRFLOW_HOME/ DAG,并将教程DAG文件放在那里。然后启动气流调度器。默认情况下,它是暂停的。但是如果我看一下气流调度器的输出,我看到了很多运行,试图创建DAG。它为什么一直跑?
[2018-09-10 15:49:24,123] {jobs.py:1108} INFO - No tasks to consider for execution.
[2018-09-10 15:49:24,125] {jobs.py:1538} INFO -
========================================================
我有一个在成功和失败事件中执行动作和发送通知电子邮件的airflow作业,下面是我使用的代码。
#from builtins import range
from datetime import timedelta
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.operators.email_operator import EmailOperator
from
试图设置一个简单的电报通知,dag立即在没有日志的情况下失败,如果我使用除air字节以外的任何操作员,一切都正常,如果我删除电报操作员仍然工作良好,下面的代码,有人知道原因可能是什么吗?
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.telegram.operators.telegram import
我们最近尝试采用气流作为我们的“数据工作流”引擎,虽然我已经完成了大部分工作,但我仍然处在调度器如何计算何时触发DAG的灰色区域。
看看这个简单的dag:
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
&