airflow—执行器CeleryExecutor(3)

CeleryExecutor可用于正式环境,使用 Celery 作为Task执行的引擎, 扩展性很好。这里使用rabbitmq作为celery的消息存储。

安装

在机器A和机器B上安装airflow

pip2 install airflow[celery] 
pip2 install airflow[rabbitmq] 

注意:最新版本的celery(4.0.2)可能与rabbitmq的管理端不兼容,如果在rabbitmq管理端或用命令行工具显示多列时,报错如下

{error,{exit,{ucs,{bad_utf8_character_code}},
 [{xmerl_ucs,from_utf8,1,[{file,"xmerl_ucs.erl"},{line,185}]},
 {mochijson2,json_encode_string,2,[]},
 {mochijson2,'-json_encode_proplist/2-fun-0-',3,[]},
 {lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
 {mochijson2,json_encode_proplist,2,[]},
 {mochijson2,'-json_encode_array/2-fun-0-',3,[]},
 {lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
 {mochijson2,json_encode_array,2,[]}]}}

可以安装最新的celery-3.x.x的版本

 pip2 -U install celery==3.1.24

配置

设置executor

 # The executor class that airflow should use. Choices include
 # SequentialExecutor, LocalExecutor, CeleryExecutor
 #executor = SequentialExecutor
 executor = CeleryExecutor

设置broker_url

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
broker_url = amqp://username:passowrd@201401@host:5672/project

设置celery_result_backend

# Another key Celery setting
celery_result_backend = db+mysql://user:passowrd@host:port/airflow

启动

启动Workder

airflow worker -D

启动scheduler

airflow scheduler -D

增加一个DAG

将airflow例子example_bash_operator中的 schedule_interval 改为@once

dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    #schedule_interval='0 0 * * *',
    schedule_interval="@once",
    dagrun_timeout=timedelta(minutes=60))

另存为文件 example_bash_operator.py

分别上传到机器A和机器B的dags_folder目录

$ tree
.
`-- airflow
 |-- airflow-scheduler.err
 |-- airflow-scheduler.log
 |-- airflow-scheduler.out
 |-- airflow.cfg
 |-- dags
 | |-- example_bash_operator.py

启动DAG

airflow trigger_dag example_bash_operator

查看业务日志

查看DAG任务

$ airflow list_tasks example_bash_operator 
also_run_this
run_after_loop
run_this_last
runme_0
runme_1
runme_2

机器A执行日志如下

$ tree
.
|-- also_run_this
| `-- 2017-04-13T20:42:35
|-- run_this_last
| `-- 2017-04-13T20:42:35
`-- runme_1
    `-- 2017-04-13T20:42:35

机器B执行日志如下

$ tree
.
|-- run_after_loop
| `-- 2017-04-13T20:42:35
|-- runme_0
| `-- 2017-04-13T20:42:35
`-- runme_2
   `-- 2017-04-13T20:42:35

从上面的日志文件可以看出,这个DAG的6个任务被分发到两台机器执行,每台机器执行3个任务。

业务日志的集中存储

airflow的log日志默认存储在文件中,也可以远程存储,配置如下

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder =
remote_log_conn_id =
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
s3_log_folder =

也可以通过logstach将日志搜集到Elasticsearch中存储

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏C#

DotNet加密方式解析--非对称加密

    新年新气象,也希望新年可以挣大钱。不管今年年底会不会跟去年一样,满怀抱负却又壮志未酬。(不过没事,我已为各位卜上一卦,卦象显示各位都能挣钱...)...

6028
来自专栏跟着阿笨一起玩NET

c#实现打印功能

3782
来自专栏一个会写诗的程序员的博客

Spring Reactor 项目核心库Reactor Core

Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactiv...

2842
来自专栏张善友的专栏

Miguel de Icaza 细说 Mix 07大会上的Silverlight和DLR

Mono之父Miguel de Icaza 详细报道微软Mix 07大会上的Silverlight和DLR ,上面还谈到了Mono and Silverligh...

3017
来自专栏张善友的专栏

LINQ via C# 系列文章

LINQ via C# Recently I am giving a series of talk on LINQ. the name “LINQ via C...

3025
来自专栏我和未来有约会

Silverlight第三方控件专题

这里我收集整理了目前网上silverlight第三方控件的专题,若果有所遗漏请告知我一下。 名称 简介 截图 telerik 商 RadC...

4425
来自专栏陈仁松博客

ASP.NET Core 'Microsoft.Win32.Registry' 错误修复

今天在发布Asp.net Core应用到Azure的时候出现错误InvalidOperationException: Cannot find compilati...

5248
来自专栏杨龙飞前端

scrollto 到指定位置

2984
来自专栏闻道于事

js登录滑动验证,不滑动无法登陆

js的判断这里是根据滑块的位置进行判断,应该是用一个flag判断 <%@ page language="java" contentType="text/html...

8848
来自专栏转载gongluck的CSDN博客

cocos2dx 打灰机

#include "GamePlane.h" #include "PlaneSprite.h" #include "BulletNode.h" #include...

7306

扫码关注云+社区