当尝试使用命令airflow scheduler启动Airflow的调度器时,如果executor = LocalExecutor在airflow.cfg中,我会得到以下错误 File "/usr/local/lib/python3.6/site-packages/airflow/executors/local_executor.py", line 92, in run
key, command = self.task_queue.get()
File "<string>", line 2, in get
File "/
我希望添加另一个DAG到现有的Airflow服务器。服务器当前正在使用LocalExecutor,但我可能希望我的DAG使用CeleryExecutor。配置文件airflow.cfg似乎只允许一个执行器:
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = LocalExecutor
是否可以配置Airflow,以便现有的DAG可以继续使用LocalExecutor,而我的新DAG可以使用Cele
我们正在升级到气流2.0,我有以下任务:
with dag:
cms_ingest = SubDagOperator(
subdag=cms_s3ingest(
DAG_NAME, 'cms_s3ingest', default_args['start_date'], dag.schedule_interval),
task_id='cms_s3ingest',
# so that subtasks can run in parallel
execut
我已经改变了下面的信任连接气流到mysql数据库。但气流雷达连接到默认的sqlite数据库。请参阅下面的配置,我试图连接到mysql。
1)config of airflow.cfg:
executor = LocalExecutor
sql_alchemy_conn = mysql+pymysql://root:12345678@localhost:3306/airflow
2) pip install PyMySQL (installed pyMysql package)
3)Installed mysql server in ubuntu machine where airflow
我需要并行运行dags,但不需要大量的扩展,这样LocalExecutor就可以很好地完成这项工作。我查看了气流文档,并首先创建了一个MySQL数据库:
CREATE DATABASE airflow_db CHARACTER SET utf8;
CREATE USER <user> IDENTIFIED BY <pass>;
GRANT ALL PRIVILEGES ON airflow_db.* TO <user>;
然后在airflow.cfg文件中修改以下参数:
executor = LocalExecutor
sql_alchemy_conn =
我有一个非常简单的dag,它应该在每周一19:10运行。dag如下: from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
args = {'owner': 'AirFlow'}
with DAG(dag_id='schedule_test_weekly', default_args=args, schedule_interval=
我是Airflow的新手,当我改变代码时,我不知道如何重新加载操作符/插件。我正在使用LocalExecutor和一个外部数据库(MySql)。我已经尝试重新启动get服务器和调度程序,但在导入dags时仍然出现相同的错误:
File "/home/ec2-user/airflow/dags/extractor.py", line 2, in <module>
from airflow.contrib.operators.emr_spark_plugin import EmrSparkOperator
ImportError: No module n
我跟着达格在气流中奔跑,
当执行以上dag时,它将串行运行以下顺序之一。
A -> B -> C1 -> C2 -> D1 -> D2
A -> B -> C2 -> C1 -> D2 -> D1
但我的要求是同时并行运行C1和C2任务。我airflow.cfg的一部分
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = Cele
我试图使用以下LocalExecutor.yml文件运行puckel气流对接器容器:
在失败或重试时,我无法获得气流给我发送电子邮件。
我试过以下几种方法:
用smtp主机名编辑配置文件
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp@mycompa
我尝试配置Airbnb AirFlow来使用CeleryExecutor,如下所示:
我将airflow.cfg中的executer从SequentialExecutor更改为CeleryExecutor
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
但我得到以下错误:
airflow.configuration.AirflowConfigException: e
我正尝试在本地计算机上从LocalExecutor切换到CeleryExecutor。我应该使用pip install airflow[celery]安装必要的库。我还安装并运行了redis。
但是,当我尝试运行airflow worker时,我得到以下错误:
DEFAULT_EXECUTOR = CeleryExecutor()
NameError: CeleryExecutor' is not defined
我将broker url和celery_result_backend都设置为redis://localhost:6379,并在后台运行redis。我做错了什么?
我使用的是带有LocalExecutor的Airflow 1.7.0,文档建议我们需要传递证书和密钥路径,并将端口更改为443,如下所示
[webserver]
web_server_ssl_cert = <path to cert>
web_server_ssl_key = <path to key>
# Optionally, set the server to listen on the standard SSL port.
web_server_port = 443
base_url = http://<hostname or IP>:443
我已
我正在运行气流,并试图迭代我们正在从命令行构建的一些任务。
当运行气流网络服务器时,一切都如预期的那样工作。但是当我运行airflow backfill dag task '2017-08-12'时,气流会返回:
[2017-08-15 02:52:55,639] {__init__.py:57} INFO - Using executor LocalExecutor
[2017-08-15 02:52:56,144] {models.py:168} INFO - Filling up the DagBag from /usr/local/airflow/dags
2017-0
我正在使用气流稳定舵图和Kubernetes Executor,新的pod正在为dag调度,但它与dag_id的故障无法找到问题。我正在使用git-sync来获取dags。下面是错误和kubernetes配置值。有人能帮我解决这个问题吗?
错误:
[2020-07-01 23:18:36,939] {__init__.py:51} INFO - Using executor LocalExecutor
[2020-07-01 23:18:36,940] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/dag
我在Red Hat Linux上运行airflow 1.10.3。我使用的是LocalExecutor,via服务器和调度程序都是通过systemd启动的。
调度程序生成的日志文件是完全可读的(即“-rw-”模式)。创建的日志目录为drwxrwxrwx。
这无法通过我的组织进行的安全扫描。我需要能够限制这些文件的权限。
/etc/profile中的umask为077。我还将UMask=0007添加到服务的systemd单元文件中。但是,尽管这似乎适用于dags/ logs /scheduler/目录中的日志,但它不会影响DAG运行日志。
[root@server logs]# ls -la s
我有多个dag使用芹菜执行程序,但我想使用Kubernetes Executor运行一个特定的dag。我无法推断出一个好的和可靠的方法来实现这一点。
我有一个声明要使用CeleryExecutor的airflow.cfg。我不想改变它,因为除了一个dags之外,所有的dags都需要它。
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
我的dag代码:
from da
我面临一些问题,试图建立一个基本的DAG文件在气流(但我也有另外两个文件)。
我通过Ubuntu使用LocalExecutor,并将我的文件保存在“C:\Users\tdamasce\Documents\工作区”中,其中包含了dag和日志文件。
我的剧本是
# step 1 - libraries
from email.policy import default
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyO
下面是配置im使用
[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /root/airflow
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
dags_folder = /root/airflow/dags
# The folder where airflow should store its log files. This location
bas
我对气流很陌生。我想我已经阅读了气流文档中关于调度的所有文章,但我似乎仍然不能让我的DAG运行在start_date+schedule_interval之后(即没有任务实例)。我用的是码头。我想知道我缺少了一个调度Dags的命令,尽管我在使用教程代码时并不是这样。
这是我的文件。
FROM ubuntu:latest
FROM python:3
RUN apt-get update -y
RUN apt-get install -y python-pip python-dev apt-utils build-essential
RUN pip install --upgrade pip
#