我有一个非常简单的测试DAG,如下所示:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
DAG = DAG(
dag_id='scheduler_test_dag',
start_date=datetime(2017, 9, 9, 4, 0, 0, 0), #..EC2 time. Equal to 11pm hora México
max_active_runs=
我有一个pyspark脚本,它现在工作得很好,我想做的是,我想为每一分钟安排该作业,为此,我使用了Apache Airflow,我为airflow创建了一个.py文件,如下所示: from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import os
from builtins import range
import airflow
from airflow.models import DAG
from
在airflow.cfg文件中,我将min_file_process_interval值配置为120秒。达格在每分钟后都有时间间隔运行。但是,它只是在每120秒之后(根据min_file_process_interval值)才被调度。这是意料之中吗?
我将min_file_process_interval更改为200秒,然后在200秒后开始执行dag计划。澄清一下,如果情况相反,即dag调度间隔为2分钟,min_file_process_interval为1分钟,则dag将按照其日程运行良好。下面是我的爸爸:
from airflow import DAG
from airflow.opera
当您遵循本教程时:
或者当您创建任何DAG并尝试使用此脚本使用API运行时:
for i in {1..10}; do
RUN_ID=BASH_REST_CALL__$(uuidgen)
curl -X POST \
http://localhost:8080/api/experimental/dags/my_test_dag/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' -d "{\
AIRFLOW_HOME=/path/to/my/airflow_home 我得到这样的警告... >airflow trigger_dag python_dag3
/Users/alexryan/miniconda3/envs/airflow/lib/python3.7/site-packages/airflow/configuration.py:627: DeprecationWarning: You have two airflow.cfg files: /Users/alexryan/airflow/airflow.cfg and /path/to/my/airflow_hom
我制作了一个非常简单的DAG,如下所示:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
cleanup_command = "/home/ubuntu/airflow/dags/scripts/log_cleanup/log_cleanup.sh "
dag = DAG(
'log_cleanup',
description='DAG for deleting old logs
我遇到了一个问题,我可以从airflow的rest api命令(https://airflow.apache.org/api.html)成功触发dag;但是,dag实例不能运行。我正在调用-> POST /api/experimental/dags/dag_id/dag_runs,其中dag_id是我正在运行的dag。唯一发生的事情是dag立即返回成功。我手动触发了dag,并开始运行dag实例(见图2 dag运行)。注意:第二次DAG运行失败-这应该不会影响我试图解决的问题。 DAG 修复了->必须处理调度程序的问题。我添加了'depends_on_past':F
我正在测试一个简单的dag在预定的时间间隔上运行,它是在每周五和周六的6UTC(‘0 6** 5,6')。但dag并没有在周五早上6点触发。我知道周五的实例将在周六运行,周六的实例将在周五运行。
我该怎么做才能让它只在周五运行周五的实例呢?或者其他工作吗?
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def create_txt():
f=open("/home/abc/
我已经查看了气流subDAG部分,并试图在网上找到任何可能有帮助的东西,但是我没有找到任何详细解释如何使subDAG工作的东西。运行subDAG的要求之一是应该启用它。如何启用/禁用subdag?
我编写了一些示例代码,这些代码在气流中不显示任何错误,但是当我尝试运行它时,subDAG中的任何一个操作符都不会被执行。
这是我的主要后台代码:
import os
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airfl
作为一个气流新手,我在看example_branch_operator
"""Example DAG demonstrating the usage of the BranchPythonOperator."""
import random
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.da
当我安排DAG在每天的特定时间运行时,DAG根本不会执行。但是,当我重新启动Airflow when服务器和调度程序时,DAG在该特定日期的计划时间执行一次,并且从第二天起不再执行。我使用的是带有python 2.7.6的Airflow版本v1.7.1.3。下面是DAG代码:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import time
n=time.strftime("%Y,%m
我在运行气流的教程。tutorial.py中的内容如下:
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_ar
我对气流很陌生。我想我已经阅读了气流文档中关于调度的所有文章,但我似乎仍然不能让我的DAG运行在start_date+schedule_interval之后(即没有任务实例)。我用的是码头。我想知道我缺少了一个调度Dags的命令,尽管我在使用教程代码时并不是这样。
这是我的文件。
FROM ubuntu:latest
FROM python:3
RUN apt-get update -y
RUN apt-get install -y python-pip python-dev apt-utils build-essential
RUN pip install --upgrade pip
#
我已经在SO上看到了this和this的问题,并做了相应的更改。但是,我的从属DAG仍然停留在poking状态。下面是我的主DAG: from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
today = datetime.today()
default_args = {
'depends_on_
与前面的问题类似,但给出的答案都不起作用。我有一个DAG:
import datetime
import os
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.operators import BashOperator
PROJECT = os.environ['PROJECT']
GCS_BUCKET = os.environ['BUCKET']
API_KEY = os
我们最近尝试采用气流作为我们的“数据工作流”引擎,虽然我已经完成了大部分工作,但我仍然处在调度器如何计算何时触发DAG的灰色区域。
看看这个简单的dag:
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
&
我在Red Hat Linux上运行airflow 1.10.3。我使用的是LocalExecutor,via服务器和调度程序都是通过systemd启动的。
调度程序生成的日志文件是完全可读的(即“-rw-”模式)。创建的日志目录为drwxrwxrwx。
这无法通过我的组织进行的安全扫描。我需要能够限制这些文件的权限。
/etc/profile中的umask为077。我还将UMask=0007添加到服务的systemd单元文件中。但是,尽管这似乎适用于dags/ logs /scheduler/目录中的日志,但它不会影响DAG运行日志。
[root@server logs]# ls -la s
我刚刚在Python3和Composer image版本composer-1.4.0-airflow-1.10.0上设置了一个Cloud Composer Environment。否则,所有设置都是“常用的”;即没有配置覆盖。
我正在尝试测试一个非常简单的DAG。它在我的本地Airflow服务器上运行没有问题,但在Cloud Composer上,web服务器的任务信息视图显示消息Dependencies Blocking Task From Getting Scheduled
依赖关系是Unknown,原因如下:
All dependencies are met but the task in
我正在研究Airflow文档,以便更好地了解它的调度机制。我在下面遇到了一个例子。 文件中提到,当调度人员在2016-01-02上午6点选择DAG时,将创建一次DAG运行,execution_date为2016-01-01,下一次运行将在2016-01-03凌晨创建,执行日期为2016-01-02。 调度间隔是按小时提供的,执行日期是指DAG在结束时运行的时间段的开始,那么为什么不是在2016-01-02早上6点调度器挑选DAG的前一个小时呢? """
Code that goes along with the Airflow tutorial located at:
当尝试通过airflow trigger_dag test_dag运行dag时,我得到了错误:airflow.exceptions.DagNotFound: Dag id test_task not found in DagModel。
运行airflow list_dags时,DAG正确列出。我还进行了检查,以确保将$AIRFLOW_HOME目录正确设置为dag所在的位置。我能让它工作的唯一方法就是运行一个特定的任务,比如airflow test test_dag test_task。运行python dags/test_dag.py时未显示任何错误。
导入后,dag文件本身中的代码:
de
我试图在测试环境中使用多个任务来测试一个守护进程。我能够测试与dag关联的单个任务,但我希望在dag中创建多个任务,并启动第一个任务。用于测试我正在使用的dag中的一个任务。
task1.run()
正在被处决。但是,当我在后台的下游有一个接一个的任务时,同样的情况就不起作用了。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner'
我在我的ubuntu机器上本地运行气流,我的airflow.cfg文件在目录:/home/airflow/airflow中,所以我为我的dags创建了一个子目录,即/home/airflow/airflow/dags/,并在那里创建了一个dag。
我为检查示例输出而创建的dag是:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner': '
我最近开始使用Airflow和它的一个新概念Taskflow API。我有一个包含多个修饰任务的DAG,每个任务都有50+代码行。因此,我决定将每个任务转移到一个单独的文件中。
在引用堆栈溢出之后,我可以以某种方式将DAG中的任务移动到每个任务的单独文件中。现在,我的问题是:
所示的两个代码示例是否工作相同?(我担心任务的范围)。将如何共享数据b/w?在性能上有什么不同吗?(我看到Subdags由于性能问题而不受欢迎,尽管这并不是Subdags仅仅关心的问题)。
我在网上看到的所有代码示例(以及官方文档中的代码)都将所有任务放在一个文件中。
样本1
import logging
from ai
由于某些原因,气流似乎不会触发每周时间表间隔的达格的最新运行。
目前日期:
$ date
$ Tue Aug 9 17:09:55 UTC 2016
达格:
from datetime import datetime
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG(
dag_id='superdag',
start_date=datetime(2016, 7, 1
我每天晚上7:42有一个dag,然而,当我手动触发dag时,它会运行,而不是自动运行。
我正在使用官方的yaml文件和运行容器,默认情况下已经将时区设置为IST,仍然不能自动触发dags
代码如下:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.sftp.operators.sftp import SFTPOpera
我正在尝试创建动态DAG,然后将它们发送到调度程序。我尝试了的引用,它运行得很好。我对它做了一些修改,如下代码所示。在调试问题时需要帮助。
我试过了。试着运行文件。Dag被执行,globals()正在打印所有的DAGs对象。但不知何故没有在list_dags或UI中列出
from datetime import datetime, timedelta
import requests
import json
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airfl
这是我在dags文件夹中的数据文件。
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime im