airflow—执行器CeleryExecutor(3)

CeleryExecutor可用于正式环境,使用 Celery 作为Task执行的引擎, 扩展性很好。这里使用rabbitmq作为celery的消息存储。

安装

在机器A和机器B上安装airflow

pip2 install airflow[celery] 
pip2 install airflow[rabbitmq] 

注意:最新版本的celery(4.0.2)可能与rabbitmq的管理端不兼容,如果在rabbitmq管理端或用命令行工具显示多列时,报错如下

{error,{exit,{ucs,{bad_utf8_character_code}},
 [{xmerl_ucs,from_utf8,1,[{file,"xmerl_ucs.erl"},{line,185}]},
 {mochijson2,json_encode_string,2,[]},
 {mochijson2,'-json_encode_proplist/2-fun-0-',3,[]},
 {lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
 {mochijson2,json_encode_proplist,2,[]},
 {mochijson2,'-json_encode_array/2-fun-0-',3,[]},
 {lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
 {mochijson2,json_encode_array,2,[]}]}}

可以安装最新的celery-3.x.x的版本

 pip2 -U install celery==3.1.24

配置

设置executor

 # The executor class that airflow should use. Choices include
 # SequentialExecutor, LocalExecutor, CeleryExecutor
 #executor = SequentialExecutor
 executor = CeleryExecutor

设置broker_url

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
broker_url = amqp://username:passowrd@201401@host:5672/project

设置celery_result_backend

# Another key Celery setting
celery_result_backend = db+mysql://user:passowrd@host:port/airflow

启动

启动Workder

airflow worker -D

启动scheduler

airflow scheduler -D

增加一个DAG

将airflow例子example_bash_operator中的 schedule_interval 改为@once

dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    #schedule_interval='0 0 * * *',
    schedule_interval="@once",
    dagrun_timeout=timedelta(minutes=60))

另存为文件 example_bash_operator.py

分别上传到机器A和机器B的dags_folder目录

$ tree
.
`-- airflow
 |-- airflow-scheduler.err
 |-- airflow-scheduler.log
 |-- airflow-scheduler.out
 |-- airflow.cfg
 |-- dags
 | |-- example_bash_operator.py

启动DAG

airflow trigger_dag example_bash_operator

查看业务日志

查看DAG任务

$ airflow list_tasks example_bash_operator 
also_run_this
run_after_loop
run_this_last
runme_0
runme_1
runme_2

机器A执行日志如下

$ tree
.
|-- also_run_this
| `-- 2017-04-13T20:42:35
|-- run_this_last
| `-- 2017-04-13T20:42:35
`-- runme_1
    `-- 2017-04-13T20:42:35

机器B执行日志如下

$ tree
.
|-- run_after_loop
| `-- 2017-04-13T20:42:35
|-- runme_0
| `-- 2017-04-13T20:42:35
`-- runme_2
   `-- 2017-04-13T20:42:35

从上面的日志文件可以看出,这个DAG的6个任务被分发到两台机器执行,每台机器执行3个任务。

业务日志的集中存储

airflow的log日志默认存储在文件中,也可以远程存储,配置如下

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder =
remote_log_conn_id =
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
s3_log_folder =

也可以通过logstach将日志搜集到Elasticsearch中存储

原创声明,本文系作者授权云+社区-专栏发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏hbbliyong

win8快捷键大全分享,非常全

Windows 8全新的Metro操作体验,对于没有平板只能用快捷键来提高效率了。全面的Windows 8快捷键,请下载微软官方的Windows 8快捷键表格,...

2654
来自专栏PHP技术

Apache的httpd.conf文件配置详解

Apache的基本设置主要交由httpd.conf来设定管理,我们要修改Apache的相关设定,主要还是通过修改httpd.cong来实现。下面让我们来看看ht...

3218
来自专栏代码GG之家

SDL系列讲解(十) 按键处理流程

SDL系列讲解(一) 简介 SDL系列讲解(二) 环境搭建 SDL系列讲解(三) 工具安装 SDL是什么,能干什么,为什么我们要学习它? SDL系列讲解(四)...

1908
来自专栏python学习之旅

Python+Selenium笔记(二):配置谷歌+IE环境

#有的时候可能要访问外国的网站下载资料或工具,这时可能出现各种问题,例如谷歌人机验证显示不了、网站打不开等,建议使用一个FQ软件 下载免费版的就行了,土豪请随...

41111
来自专栏CodingToDie

Python学习(四):番外片

第4 章 附加篇 Table of Contents Python安装文件 IDLE Python 3.6 Python 3.6 Manuals(64-bit)...

3227
来自专栏北京马哥教育

11条nginx优化方法助力你的运维生涯

云豆贴心提醒,本文阅读时间5分钟 隐藏nginx header里版本号信息 1.查看版本号 ? 2.隐藏版本号 在nginx配置文件的http标签内加入“s...

3959
来自专栏CDA数据分析师

Excel简化办公系列之四 | 盘点Excel中那些少有人知道却实用的功能

本文为CDA作者青菜原创文章,转载请注明来源 编者按:CDA作者青菜将在近期发布「Excel简化办公」系列文章,本文是第四篇;更多精彩请持续关注~ 1.恢复未保...

18510
来自专栏Petrichor的专栏

车载网络: OMNeT++安装CAN协议

首先要保证已安装好 OMNeT++。具体安装教程可参照 Ubuntu: 安装 OMNeT++ 仿真工具 。

956
来自专栏Laoqi's Linux运维专列

Kubernetes 1.8.6 集群部署–钉钉报警(十二)

1053
来自专栏惨绿少年

使用cobbler批量安装操作系统(基于Centos7.x )

1.1 cobbler简介   Cobbler是一个Linux服务器安装的服务,可以通过网络启动(PXE)的方式来快速安装、重装物理服务器和虚拟机,同时还可以管...

3120

扫码关注云+社区