dag.py raises "airflow.exceptions.AirflowException: Task is missing the start_date parameter", but it is given in the code

Stack Overflow user
Asked 2020-05-12 18:34:08
2 answers · 12K views · 0 followers · 8 votes

Today I tried to create my first Airflow DAG:

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'default_user',
    'start_date': days_ago(2),
    'depends_on_past': True,
    # With this set to true, the pipeline won't run if the previous day failed
    'email': ['demo@email.de'],
    'email_on_failure': True,
    # upon failure this pipeline will send an email to your email set above
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
}

dag = DAG(
    'basic_dag_2',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


def my_func():
    print('Hello from my_func')


bashtask = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

dummy_task = DummyOperator(task_id='dummy_task', retries=3)

python_task = PythonOperator(task_id='python_task', python_callable=my_func)

dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)

My Airflow runs fine on Python 3.6.8, but when Airflow tries to import the DAG file into the DagBag, it throws this exception, and I really don't know why:

[2020-05-11 17:11:15,601] {scheduler_job.py:1576} WARNING - No viable dags retrieved from /root/airflow/dags/first_dag.py
[2020-05-11 17:11:15,616] {scheduler_job.py:162} INFO - Processing /root/airflow/dags/first_dag.py took 0.031 seconds
[2020-05-11 17:12:05,647] {scheduler_job.py:154} INFO - Started process (PID=26569) to work on /root/airflow/dags/first_dag.py
[2020-05-11 17:12:05,653] {scheduler_job.py:1562} INFO - Processing file /root/airflow/dags/first_dag.py for tasks to queue
[2020-05-11 17:12:05,654] {logging_mixin.py:112} INFO - [2020-05-11 17:12:05,654] {dagbag.py:396} INFO - Filling up the DagBag from /root/airflow/dags/first_dag.py
[2020-05-11 17:12:05,666] {logging_mixin.py:112} INFO - [2020-05-11 17:12:05,662] {dagbag.py:239} ERROR - Failed to import: /root/airflow/dags/first_dag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 236, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/lib64/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 684, in _load
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/root/airflow/dags/first_dag.py", line 34, in <module>
    dag=dag,
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 70, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 422, in __init__
    self.dag = dag
  File "/usr/local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 548, in dag
    dag.add_task(self)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1301, in add_task
    raise AirflowException("Task is missing the start_date parameter")
airflow.exceptions.AirflowException: Task is missing the start_date parameter

I suppose I could give my operators a start_date as well, but they should be picking up the date from their DAG.


2 Answers

Stack Overflow user

Accepted answer

Answered 2020-05-12 18:38:03

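This is because two of your tasks (dummy_task and python_task) have not been assigned to the DAG, and the DAG is what carries the start_date in its default_args.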

dummy_task = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

python_task = PythonOperator(task_id='python_task', python_callable=my_func, dag=dag)
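
To make the mechanism concrete, here is a toy model in plain Python. It is not Airflow's code: `ToyDag`, `ToyTask`, and `attach` are hypothetical stand-ins. It assumes that defaults are copied from the DAG only when a task is constructed with one, and that the start_date check runs when a task is attached, which is roughly what the traceback suggests.

```python
from datetime import datetime


class ToyDag:
    """Hypothetical stand-in for a DAG; not Airflow's real class."""

    def __init__(self, dag_id, default_args=None):
        self.dag_id = dag_id
        self.default_args = dict(default_args or {})
        self.tasks = {}

    def add_task(self, task):
        # Same kind of check as in the traceback: a task needs a start_date.
        if task.start_date is None:
            raise ValueError("Task is missing the start_date parameter")
        self.tasks[task.task_id] = task


class ToyTask:
    """Hypothetical stand-in for an operator."""

    def __init__(self, task_id, dag=None, start_date=None):
        self.task_id = task_id
        self.dag = None
        # Defaults are only visible at construction time if a DAG was passed.
        if start_date is None and dag is not None:
            start_date = dag.default_args.get("start_date")
        self.start_date = start_date
        if dag is not None:
            self.attach(dag)

    def attach(self, dag):
        dag.add_task(self)
        self.dag = dag


dag = ToyDag("basic_dag_2", default_args={"start_date": datetime(2020, 5, 10)})

with_dag = ToyTask("print_date", dag=dag)  # picks up start_date from default_args
without_dag = ToyTask("dummy_task")        # built with no DAG, so no defaults

try:
    # Attaching after construction is too late for the defaults to apply.
    without_dag.attach(dag)
    error = None
except ValueError as exc:
    error = str(exc)

print(with_dag.start_date)
print(error)
```

In this sketch the task built with `dag=dag` inherits the start_date, while the one built without it fails the check when it is attached later.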

Note: you can use the DAG object as a context manager, as mentioned in https://airflow.apache.org/docs/stable/concepts.html#context-manager, to avoid repeating dag=dag for every task.

Example:

with DAG(
    'basic_dag_2',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
) as dag:

    bashtask = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    dummy_task = DummyOperator(task_id='dummy_task', retries=3)

    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    dummy_task.set_downstream(bashtask)
    python_task.set_downstream(bashtask)
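
If the `with` form looks like magic, the general pattern can be sketched in plain Python. This is only an illustration of the idea, not Airflow's implementation: `ContextDag`, `ContextTask`, and the `_current` attribute are hypothetical names.

```python
class ContextDag:
    """Hypothetical stand-in; shows the pattern, not Airflow's implementation."""

    _current = None  # the DAG whose `with` block is currently active

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_ids = []
        self._previous = None

    def __enter__(self):
        # Remember any outer DAG so nested blocks restore it correctly.
        self._previous = ContextDag._current
        ContextDag._current = self
        return self

    def __exit__(self, exc_type, exc, tb):
        ContextDag._current = self._previous
        return False  # do not swallow exceptions


class ContextTask:
    """Hypothetical stand-in for an operator."""

    def __init__(self, task_id, dag=None):
        self.task_id = task_id
        # Fall back to the active `with` block when no dag is passed.
        self.dag = dag if dag is not None else ContextDag._current
        if self.dag is not None:
            self.dag.task_ids.append(task_id)


with ContextDag("basic_dag_2") as dag:
    inside = ContextTask("dummy_task")   # no dag=dag needed here

outside = ContextTask("python_task")     # created after the block has closed

print(inside.dag is dag)
print(outside.dag)
```

The point of the sketch is that a task created inside the block can find its DAG without being told, while one created outside the block cannot.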
28 votes

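Stack Overflow user

Answered 2021-08-21 11:21:08

If you run into the same problem, you just need to put dag=dag into every operator you use. An operator still needs further parameters before it can run as a task, and those parameters are defined on the DAG.

For example, this is wrong: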

postgres_task_1 = PostgresOperator(
    task_id="get_param_2",
    postgres_conn_id="aramis_postgres_connection",
    sql="""
        SELECT param_num_2 FROM public.aramis_meta_task
        """,
)

This is right:

postgres_task_1 = PostgresOperator(
    dag=dag,
    task_id="get_param_2",
    postgres_conn_id="aramis_postgres_connection",
    sql="""
        SELECT param_num_2 FROM public.aramis_meta_task
        """,
)
1 vote
Original page content provided by Stack Overflow. Translation support provided by Tencent Cloud's Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/61749480
