我制作了一个非常简单的DAG,如下所示:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
cleanup_command = "/home/ubuntu/airflow/dags/scripts/log_cleanup/log_cleanup.sh "
dag = DAG(
'log_cleanup',
description='DAG for deleting old logs
我正在尝试运行一个气流管道,其中我有一个主要Postgres任务,然后是一个Python循环任务,然后是一个最终postgres任务,所以这个dag类似于如下所示
PythonOperator
|Task B.1|
Postgres |Task B. | Postgres
Task A ------|Task B. |-----> Task C
|Task B.n|
所以,我写了这篇文章:
import glob
import logging
import os
from datetim
我正在创建一个进程来执行某些数据库中预定义的任务。在任务执行之后,我将更新它们的执行时间,直到它们再次执行为止。每个任务的目的基本上都是进行sql单元测试。
到目前为止我尝试的是
创建父主守护程序
从数据库中获取任务列表
对于每一行(任务)-我正在创建包含执行过程的子进程
当所有子标记完成时-我正在更新任务的执行时间
目前,它在第一次运行后失败。显示为Broken DAG: [/usr/local/airflow/src/dags/d06-query_validations/d06-query_validations_daily.py] list index out o
假设我有以下DAG依赖项:
/> DAG B -->....
/
/
DAG A ---> DAG C -->...
\
\
\> DAG D -->...
这些DAG每天运行,DAG A的输出分别由DAG B、C、D消耗。
如果DAG A失败(例如,Spark作业有错误,我们希望在修复后重新执行它),如何确保我们还重新运行DAG B、C、D等,以便它们现在以修复的DAG A输出运行?它能自动完成吗?
我正在写一个气流DAG,有一些函数的问题。我尝试通过将数据打印到标准输出并使用logging库来进行调试。
我的DAG示例如下:
from datetime import timedelta
import airflow
import logging
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.hooks.datadog_hook import Dat
我需要触发工作流,以生成同一dag的多个工作流与不同的输入从file.Unfortunately它得到触发器仅输入302406 .Not的其他两个输入,即302405和302404。以下是代码
条件触发器
def conditionally_trigger(context, dag_run_obj):
testsite_array = []
with open('/etc/dev/airflow/dags/input.txt') as my_file:
for line in my_file:
testsite_array
我正在本地机器上的一个码头集装箱中运行气流。我正在运行一个测试DAG,执行3项任务。然而,这三个任务运行良好,但bash操作符的最后一个任务被卡在循环中,如下面的图片所示。在日志文件中,只为bash脚本的第一次执行生成一个条目,然后什么也不生成,但是python文件一直被执行。对于这个问题有什么建议吗?
谢谢,
理查德
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash impo
我有一个气流DAG,这是没有意义的回填。我想,使用气流1.8,你可以给DAG参数catchup=False,所以它只会开始最近的工作。也就是说,我想让DAG从午夜开始,每天运行。但。这就是现在的事情: DAG立即开始,而不是在午夜。另外,当我清除所有DAG运行时,它将立即重新启动。然后DAG将每天运行,但将在错误的时间安排它开始+1天。
我怎么能有一个DAG只开始运行最近的作业,和在特定的时间(午夜)开始?
下面是我使用的代码:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.opera