airflow—执行器CeleryExecutor(3)

CeleryExecutor可用于正式环境,使用 Celery 作为Task执行的引擎, 扩展性很好。这里使用rabbitmq作为celery的消息存储。

安装

在机器A和机器B上安装airflow

pip2 install airflow[celery] 
pip2 install airflow[rabbitmq] 

注意:最新版本的celery(4.0.2)可能与rabbitmq的管理端不兼容,如果在rabbitmq管理端或用命令行工具显示多列时,报错如下

{error,{exit,{ucs,{bad_utf8_character_code}},
 [{xmerl_ucs,from_utf8,1,[{file,"xmerl_ucs.erl"},{line,185}]},
 {mochijson2,json_encode_string,2,[]},
 {mochijson2,'-json_encode_proplist/2-fun-0-',3,[]},
 {lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
 {mochijson2,json_encode_proplist,2,[]},
 {mochijson2,'-json_encode_array/2-fun-0-',3,[]},
 {lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
 {mochijson2,json_encode_array,2,[]}]}}

可以安装最新的celery-3.x.x的版本

 pip2 -U install celery==3.1.24

配置

设置executor

 # The executor class that airflow should use. Choices include
 # SequentialExecutor, LocalExecutor, CeleryExecutor
 #executor = SequentialExecutor
 executor = CeleryExecutor

设置broker_url

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
broker_url = amqp://username:passowrd@201401@host:5672/project

设置celery_result_backend

# Another key Celery setting
celery_result_backend = db+mysql://user:passowrd@host:port/airflow

启动

启动Workder

airflow worker -D

启动scheduler

airflow scheduler -D

增加一个DAG

将airflow例子example_bash_operator中的 schedule_interval 改为@once

dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    #schedule_interval='0 0 * * *',
    schedule_interval="@once",
    dagrun_timeout=timedelta(minutes=60))

另存为文件 example_bash_operator.py

分别上传到机器A和机器B的dags_folder目录

$ tree
.
`-- airflow
 |-- airflow-scheduler.err
 |-- airflow-scheduler.log
 |-- airflow-scheduler.out
 |-- airflow.cfg
 |-- dags
 | |-- example_bash_operator.py

启动DAG

airflow trigger_dag example_bash_operator

查看业务日志

查看DAG任务

$ airflow list_tasks example_bash_operator 
also_run_this
run_after_loop
run_this_last
runme_0
runme_1
runme_2

机器A执行日志如下

$ tree
.
|-- also_run_this
| `-- 2017-04-13T20:42:35
|-- run_this_last
| `-- 2017-04-13T20:42:35
`-- runme_1
    `-- 2017-04-13T20:42:35

机器B执行日志如下

$ tree
.
|-- run_after_loop
| `-- 2017-04-13T20:42:35
|-- runme_0
| `-- 2017-04-13T20:42:35
`-- runme_2
   `-- 2017-04-13T20:42:35

从上面的日志文件可以看出,这个DAG的6个任务被分发到两台机器执行,每台机器执行3个任务。

业务日志的集中存储

airflow的log日志默认存储在文件中,也可以远程存储,配置如下

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder =
remote_log_conn_id =
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
s3_log_folder =

也可以通过logstach将日志搜集到Elasticsearch中存储

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java帮帮-微信公众号-技术文章全总结

Mac端Git安装以及环境搭建

直接下载安装包,Git下载地址 https://git-scm.com/download

333
来自专栏Snova云数仓

Snova添加子用户及策略操作指南1

在用户列表中,选择需要授权的子用户。关联snova相关读写权限。策略关联成功后,子用户即获取相关资源权限。

1137
来自专栏禹都一只猫博客

docker入门:安装以及基本的命令

1935
来自专栏Java Edge

MacOS 下使用 intellij IDEA 将git上传项目到 Github

1334
来自专栏左瞅瞅,右瞅瞅

zabbix上线之路(四)——监控mysql

192.168.12.74 linux-node1.example.com (zabbix,监控自己的mysql)

1294
来自专栏增长技术

Git目录与工作目录

‘Git目录’是为你的项目存储所有历史和元信息的目录–包括所有的对象(commits,trees,blobs,tags) 这些对象指向不同的分支。

1162
来自专栏别先生

从gitlab下载下来的maven无法运行,老报404解决方法

1:由于在不同的地方开发同一个项目,就将项目上传到gitlab上面(可以创建私有项目,免费的),回到住的地方,项目下载下来老报错,自己摸索的,记录一下。自己记得...

3225
来自专栏MySQL实战分享

MongoDB 第一期 :集群搭建

MongoDB集群的搭建不算困难,只需要按部就班的配置,几分钟完成一个能用的Mongo集群绝对是有可能的,但是要搭建一个高可用的优质集群,那么需要研究的还远不止...

3892
来自专栏java闲聊

多个Mysql安装

1665
来自专栏开源优测

AutoLink开源自动化测试集成解决方案

722

扫码关注云+社区