我有一个由三个任务组成的Airflow工作流;第二个任务依赖于第一个任务,第三个任务依赖于第二个任务。
如果我通过then服务器运行DAG,则第一个任务完成,但随后开始重新运行,而不是触发第二个任务。要记住的一件事是,第一个任务确实需要130秒以上的时间来运行。这是因为第一个任务的持续时间造成的吗?
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
default_args = {
'owner': 'David',
'depends_on_past': True,
'start_date': datetime(2018,5,18),
'email': ['email_address'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'DCM_Floodlight_Report_API',
default_args=default_args,
description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
schedule_interval='30 14 * * *')
t1 = BashOperator(
task_id='Pull_DCM_Report',
bash_command='python "/Users/run_report.py" 2737542 134267867', dag=dag)
t2 = BashOperator(
task_id='Cleanse_File',
bash_command='python "/Users/cleanse_file.py"',dag=dag)
t3 = BashOperator(
task_id='S3_Bucket_Creation_Upload_File',
bash_command='python "/Users/aws_s3_creation&load.py"',dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t2)
发布于 2018-06-05 02:10:44
我不认为你的任务运行时是个问题。-此行为很可能是由于catchup
参数造成的,该参数默认为True
。
https://airflow.apache.org/scheduler.html#backfill-and-catchup
这意味着Airflow正在为start_date
和当前时间之间的每个调度间隔调度第一个任务。您可以在UI中查看树视图,以查看是否正在调度多个DagRun。如果您只是测试您的DAG,我建议您在测试时将schedule_interval设置为@once
,然后再安排它在过去或将来运行。
发布于 2018-06-05 03:32:13
在没有重试逻辑的情况下尝试它,看看它是如何执行的。使用以下默认参数和dag信息:
`default_args = {
'owner': 'David',
'depends_on_past': False,
'start_date': datetime(2018,5,18),
'email': ['email_address'],
'email_on_failure': True,
'email_on_retry': True
}
dag = DAG(
dag_id='DCM_Floodlight_Report_API',
default_args=default_args,
catchup=False,
description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
schedule_interval='30 14 * * *')
我添加了catchup
并将其设置为False,并将depends_on_past
更改为False。我也删除了重试逻辑。这可能会解决您的问题-请让我知道!
https://stackoverflow.com/questions/50686063
复制相似问题