我使用的是Airflow 1.10.2,但是Airflow似乎忽略了我为DAG设置的超时。
我正在使用dagrun_timeout
参数设置DAG的超时时间(例如20秒),我有一个任务需要2分钟才能运行,但Airflow将DAG标记为成功!
args = {
'owner': 'me',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True,
}
dag = DAG(
'test_timeout',
schedule_interval=None,
default_args=args,
dagrun_timeout=timedelta(seconds=20),
)
def this_passes(**kwargs):
return
def this_passes_with_delay(**kwargs):
time.sleep(120)
return
would_succeed = PythonOperator(
task_id='would_succeed',
dag=dag,
python_callable=this_passes,
email=to,
)
would_succeed_with_delay = PythonOperator(
task_id='would_succeed_with_delay',
dag=dag,
python_callable=this_passes_with_delay,
email=to,
)
would_succeed >> would_succeed_with_delay
不会抛出错误消息。我是否使用了错误的参数?
发布于 2019-07-19 21:51:55
如source code中所述
:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created. The timeout
is only enforced for scheduled DagRuns, and only once the
# of active DagRuns == max_active_runs.
因此,这可能是您设置schedule_interval=None
时的预期行为。这里的想法是为了确保计划的DAG不会永远持续下去,并阻止后续的运行意图。
现在,您可能会对所有运算符中提供的execution_timeout
感兴趣。例如,您可以在PythonOperator
上设置60秒超时,如下所示:
would_succeed_with_delay = PythonOperator(task_id='would_succeed_with_delay',
dag=dag,
execution_timeout=timedelta(seconds=60),
python_callable=this_passes_with_delay,
email=to)
https://stackoverflow.com/questions/57110885
复制相似问题