任务流管理工具-Airflow配置和使用

Airflow能做什么

Airflow是一个工作流分配管理系统,通过有向非循环图的方式管理任务流程,设置任务依赖关系和时间调度。

Airflow独立于我们要运行的任务,只需要把任务的名字和运行方式提供给Airflow作为一个task就可以。

安装和使用最简单安装

在Linux终端运行如下命令 (需要已安装好和):

安装成功之后,执行下面三步,就可以使用了。默认是使用的, 只能顺次执行任务。

初始化数据库 [必须的步骤]

启动web服务器 [方便可视化管理dag]

启动任务 [scheduler启动后,DAG目录下的dags就会根据设定的时间定时启动]

此外我们还可以直接测试单个DAG,如测试文章末尾的DAG

最新版本的Airflow可从https://github.com/apache/incubator-airflow下载获得,解压缩按照安装python包的方式安装。

配置 以启用和

安装mysql数据库支持

设置mysql根用户的密码

新建用户和数据库

修改airflow配置文件支持mysql

文件通常在目录下

更改数据库链接

初始化数据库

初始化数据库成功后,可进入mysql查看新生成的数据表。

centos7中使用mariadb取代了mysql, 但所有命令的执行相同

配置LocalExecutor

注:作为测试使用,此步可以跳过, 最后的生产环境用的是CeleryExecutor; 若CeleryExecutor配置不方便,也可使用LocalExecutor。

前面数据库已经配置好了,所以如果想使用LocalExecutor就只需要修改airflow配置文件就可以了。 文件通常在目录下,打开更改为 即完成了配置。

把文后TASK部分的dag文件拷贝几个到目录下,顺次执行下面的命令,然后打开网址http://127.0.0.1:8080就可以实时侦测任务动态了:

配置CeleryExecutor (rabbitmq支持)

安装airflow的celery和rabbitmq组件

安装erlang和rabbitmq

如果能直接使用或安装则万事大吉。

我使用的CentOS6则不能,需要如下一番折腾,

配置rabbitmq

启动rabbitmq:

开机启动rabbitmq:

配置rabbitmq (REF)

修改airflow配置文件支持Celery

文件通常在目录下

更改executor为

更改broker_url

更改celery_result_backend,

配置CeleryExecutor (redis支持)

安装airflow的celery和celery的redis组件

安装redis

启动redis

使用检测后台进程是否存在

检测6379端口是否在监听

开机启动redis:

修改airflow配置文件支持Celery-redis

文件通常在目录下

更改executor为

更改broker_url

更改celery_result_backend,

测试

测试过程中注意观察运行上面3个命令的3个窗口输出的日志

当遇到不符合常理的情况时考虑清空 的数据库, 可使用清空。

删除dag文件后,webserver中可能还会存在相应信息,这时需要重启webserver并刷新网页。

关闭webserver:

启动服务器:

启动celery worker (不能用根用户):

启动scheduler:

提示:

一个脚本控制airflow系统的启动和重启

airflow.cfg 其它配置

dags_folder

目录支持子目录和软连接,因此不同的dag可以分门别类的存储起来。

设置邮件发送服务

多用户登录设置 (似乎只有CeleryExecutor支持)

修改中的下面3行配置

增加一个用户(在airflow所在服务器的python下运行)

TASK

参数解释

设置

设置较长的,方便在收到邮件后,能有时间做出处理

然后再修改为较短的,方便快速启动

Airflow assumes idempotent tasks that operate on immutable datachunks. It also assumes that all task instance (each task for eachschedule) needs to run.

If your tasks need to be executed sequentially, you need totell Airflow: use the flag on the tasksthat require sequential execution.)

如果在TASK本该运行却没有运行时,或者设置的为时,推荐使用。我在运行dag时,有时会出现,明明上游任务已经运行结束,下游任务却没有启动,整个dag就卡住了。这时设置可以解决这类问题。

in format like

Task中调用的命令出错后需要在网站中点击手动重启。为了方便任务修改后的顺利运行,有个折衷的方法是:

写完task DAG后,一定记得先检测下有无语法错误

测试文件1:ct1.py

测试文件2:

run1.sh

run2.sh

其它问题

The DagRun object has room for a parameter that gets exposedin the “context” (templates, operators, …). That is the placewhere you would associate parameters to a specific run. For now thisis only possible in the context of an externally triggered DAG run.The way the works, you can fill in the confparam during the execution of the callable that you pass to theoperator.

If you are looking to change the shape of your DAG through parameters,we recommend doing that using “singleton” DAGs (using a “@once”), meaning that you would write aPython program that generates multiple dag_ids, one of each run,probably based on metadata stored in a config file or elsewhere.

The idea is that if you use parameters to alter the shape of yourDAG, you break some of the assumptions around continuity of theschedule. Things like visualizing the tree view or how to perform abackfill becomes unclear and mushy. So if the shape of your DAGchanges radically based on parameters, we consider those to bedifferent DAGs, and you generate each one in your pipeline file.

完全删掉某个DAG的信息

supervisord自动管理进程

在特定情况下,修改DAG后,为了避免当前日期之前任务的运行,可以使用填补特定时间段的任务

端口转发

之前的配置都是在内网服务器进行的,但内网服务器只开放了SSH端口22,因此我尝试在另外一台电脑上使用相同的配置,然后设置端口转发,把外网服务器的rabbitmq的5672端口映射到内网服务器的对应端口,然后启动airflow连接。

上一条命令表示的格式为

表示hostname的port

-v: 在测试时打开

-4: 出现错误”bind: Cannot assign requested address”时,force thessh client to use ipv4

若出现”Warning: remote port forwarding failed for listen port 52698”,关掉其它的ssh tunnel。

不同机器使用airflow

在外网服务器(用做任务分发服务器)配置与内网服务器相同的airflow模块

使用前述的端口转发以便外网服务器绕过内网服务器的防火墙访问端口。

在外网服务器启动 airflow , 在内网服务器启动 发现任务执行状态丢失。继续学习Celery,以解决此问题。

任务未按预期运行可能的原因

检查 和是否在合适的时间范围内

检查 , 和的输出,有没有某个任务运行异常

检查airflow配置路径中文件夹下的日志输出

若以上都没有问题,则考虑数据冲突,解决方式包括清空数据库或着给当前一个新的

Login in mysql and execute

问题解决

When running get error like “You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ‘(6) NULL’ at line 1” ) [SQL: u’ALTER TABLE dag MODIFY last_scheduler_run DATETIME(6) NULL’

Install mysql5.7, clicke here for ref.

Operator importing

is no longer supported;

to

ReferencesOriginal link

精品回顾

(错误矫正基金:如果您在阅读过程中发现文字或命令错误,请留言或加小编微信指出,获取红包或累积奖励。希望大家多监督,反馈。适用于所有原创文章。)

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180205G02KNN00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券