我试图安排我的DAG每分钟运行一次,但它似乎是每秒钟运行一次。根据我所读到的所有内容,我应该只需要在我的DAG中包含schedule_interval='*/1 * * * *', #..every 1 minute,就是这样,但它不起作用。下面是我设置的一个简单示例来测试它:
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 6, 4),
'schedule_interval': '*/1 * * * *', #..every 1 minute
'email': ['airflow@airflow.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
dag_id='airflow_slack_example',
start_date=datetime(2018, 6, 4),
max_active_runs=3,
schedule_interval='*/1 * * * *', #..every 1 minute
default_args=default_args,
)
test= BashOperator(
task_id='test',
bash_command="echo hey >> /home/ec2-user/schedule_test.txt",
retries=1,
dag=dag)更新:
在与@Taylor Edmiston讨论了他的解决方案后,我们意识到我需要添加catchup=False的原因是因为我使用Pip安装了Airflow,它使用的是过时的Airflow版本。显然,如果你使用的是来自master branch of it's repository的气流,那么你不需要包含catchup=False来让它像我所尝试的那样每分钟都在运行。因此,尽管公认的答案解决了我的问题,但它并没有解决@Taylor Edmiston发现的潜在问题。
https://stackoverflow.com/questions/50689072
复制相似问题