当我安排DAG在每天的特定时间运行时,DAG根本不会执行。但是,当我重新启动Airflow when服务器和调度程序时,DAG在该特定日期的计划时间执行一次,并且从第二天起不再执行。我使用的是带有python 2.7.6的Airflow版本v1.7.1.3。下面是DAG代码:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import time
n=time.strftime("%Y,%m
关于this earlier question,假设我们有一个Apache Airflow DAG,它包含两个任务,首先是一个HTTP请求(即SimpleHTTPOperator),然后是一个处理第一个任务的响应的PythonOperator。 为了方便起见,以Dog CEO API为例,考虑以下DAG: from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airfl
我刚接触Airflow,只是为了练习在DAG中做一个简单的数据转换。然而,我不确定为什么第一个任务总是意外失败。有人能给我一些关于如何在DAG中调试失败的任务或指出错误部分的提示吗?非常感谢。
import pandas as pd
import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import da
我有一个DAG,里面有三个任务。我不想在基于条件的DAG运行中显示第二个任务(middle_name)。例如,如果middle_name_var == 'false',我不想在DAG中显示middle_name任务。有什么办法能做到这一点吗?
from airflow.operators import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
按照教程,我创建了一个文件夹$AIRFLOW_HOME/ DAG,并将教程DAG文件放在那里。然后启动气流调度器。默认情况下,它是暂停的。但是如果我看一下气流调度器的输出,我看到了很多运行,试图创建DAG。它为什么一直跑?
[2018-09-10 15:49:24,123] {jobs.py:1108} INFO - No tasks to consider for execution.
[2018-09-10 15:49:24,125] {jobs.py:1538} INFO -
========================================================
我试图在测试环境中使用多个任务来测试一个守护进程。我能够测试与dag关联的单个任务,但我希望在dag中创建多个任务,并启动第一个任务。用于测试我正在使用的dag中的一个任务。
task1.run()
正在被处决。但是,当我在后台的下游有一个接一个的任务时,同样的情况就不起作用了。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner'
我得到了以下DAG
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
def select_next_branch():
if some_condition:
我正在尝试使用ExternalTaskSensor,它被困于戳另一个DAG的任务,这个任务已经成功地完成了。
在这里,第一个DAG "a“完成它的任务,然后通过ExternalTaskSensor触发第二个DAG "b”。相反,它被困在a.first_task上。
第一次DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='a',
default_ar
我正在尝试创建一个气流管道,用于从API下载数据,处理数据,将其保存为CSV,然后将数据加载到Postgres数据库(所有数据都在码头容器中)。代码看起来像这样
from datetime import datetime, timedelta
import pandas as pd
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonOperator
defaul
我写这段代码:
from airflow import DAG
from airflow.hooks.clickhouse_hook import ClickHouseHook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_
我已经动态创建了一个subdag。一切正常,main_dag运行得很好。它的PythonOperator函数正在被调用。但在Subdag中可调用的Python不会被调用。请帮帮我。由于我对Airflow还不熟悉,所以我从不同的来源获取并合并了这段代码。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_o
我想得到这个DAG的默认args中提到的电子邮件,使用气流中的另一个DAG。我怎么能这么做?请帮帮忙,我是新来的气流!
from airflow.models import DagRun
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from datetime import datetime, timedelta
from airflow import DAG
def first_function(**context):
pri
我是新来的。我创建了下面的第一个守护程序,从google大查询表中选择数据&将其保存到pd数据中。需要以下建议
我应该在哪里提供大查询的连接id?
由于pd.read_gbq要求自动化,如何在气流场中处理这些问题?
导入操作系统
import pandas as pd
from airflow.contrib.operators import bigquery_operatorfrom #this will
from datetime import datetime
from airflow import DAG
from airflow.operators.py
在DAG中是否有自定义电子邮件和发送任何任务失败的选项。有一个类似‘email _on_failure’的选项: True,但它不提供动态添加内容到电子邮件主题或正文的选项。
我的DAG将如下所示
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.ope
我发现xcom实际上将数据写入数据库并从其他任务中提取数据。我的数据集很大,对它进行分类和写入数据库会造成一些不必要的延迟。是否有一种不使用xcom在同一气流Dag中的任务之间通信数据的方法?
下面是我尝试过的代码,上下文实际上没有被传递。我知道使用task_instance.xcom_push()可以工作,但它也会对数据进行筛选,并将其写入我不需要的数据库中。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, t
我已经实现了运行单个dag的测试用例,但它似乎在1.9中不起作用,可能是因为在airflow 1.8中引入了更严格的池。我试着运行下面的测试用例:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
class DAGTest(unittest.TestCase):
def make_tasks(self):
dag = DAG(
我希望一个进程在完成另一个进程之后启动。一个解决方案是使用外部传感器功能,下面您可以找到我的解决方案。我遇到的问题是依赖的守护进程陷入了戳,我检查了这个并确保这两个dag按照相同的时间表运行,我的简化代码如下所示:任何帮助都将不胜感激。领袖达格:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow