前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >airflow—执行器CeleryExecutor(3)

airflow—执行器CeleryExecutor(3)

原创
作者头像
刘远
修改2017-12-29 10:53:38
修改2017-12-29 10:53:38
4.2K00
代码可运行
举报
文章被收录于专栏:刘远的专栏刘远的专栏
运行总次数:0
代码可运行

CeleryExecutor可用于正式环境,使用 Celery 作为Task执行的引擎, 扩展性很好。这里使用rabbitmq作为celery的消息存储。

安装

在机器A和机器B上安装airflow

代码语言:bash
复制
pip2 install airflow[celery] 
pip2 install airflow[rabbitmq] 

注意:最新版本的celery(4.0.2)可能与rabbitmq的管理端不兼容,如果在rabbitmq管理端或用命令行工具显示多列时,报错如下

代码语言:bash
复制
{error,{exit,{ucs,{bad_utf8_character_code}},
 [{xmerl_ucs,from_utf8,1,[{file,"xmerl_ucs.erl"},{line,185}]},
 {mochijson2,json_encode_string,2,[]},
 {mochijson2,'-json_encode_proplist/2-fun-0-',3,[]},
 {lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
 {mochijson2,json_encode_proplist,2,[]},
 {mochijson2,'-json_encode_array/2-fun-0-',3,[]},
 {lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
 {mochijson2,json_encode_array,2,[]}]}}

可以安装最新的celery-3.x.x的版本

代码语言:bash
复制
 pip2 -U install celery==3.1.24

配置

设置executor

代码语言:bash
复制
 # The executor class that airflow should use. Choices include
 # SequentialExecutor, LocalExecutor, CeleryExecutor
 #executor = SequentialExecutor
 executor = CeleryExecutor

设置broker_url

代码语言:bash
复制
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
broker_url = amqp://username:passowrd@201401@host:5672/project

设置celery_result_backend

代码语言:bash
复制
# Another key Celery setting
celery_result_backend = db+mysql://user:passowrd@host:port/airflow

启动

启动Workder

代码语言:bash
复制
airflow worker -D

启动scheduler

代码语言:bash
复制
airflow scheduler -D

增加一个DAG

将airflow例子example_bash_operator中的 schedule_interval 改为@once

代码语言:python
代码运行次数:0
运行
复制
dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    #schedule_interval='0 0 * * *',
    schedule_interval="@once",
    dagrun_timeout=timedelta(minutes=60))

另存为文件 example_bash_operator.py

分别上传到机器A和机器B的dags_folder目录

代码语言:bash
复制
$ tree
.
`-- airflow
 |-- airflow-scheduler.err
 |-- airflow-scheduler.log
 |-- airflow-scheduler.out
 |-- airflow.cfg
 |-- dags
 | |-- example_bash_operator.py

启动DAG

代码语言:bash
复制
airflow trigger_dag example_bash_operator

查看业务日志

查看DAG任务

代码语言:bash
复制
$ airflow list_tasks example_bash_operator 
also_run_this
run_after_loop
run_this_last
runme_0
runme_1
runme_2

机器A执行日志如下

代码语言:bash
复制
$ tree
.
|-- also_run_this
| `-- 2017-04-13T20:42:35
|-- run_this_last
| `-- 2017-04-13T20:42:35
`-- runme_1
    `-- 2017-04-13T20:42:35

机器B执行日志如下

代码语言:bash
复制
$ tree
.
|-- run_after_loop
| `-- 2017-04-13T20:42:35
|-- runme_0
| `-- 2017-04-13T20:42:35
`-- runme_2
   `-- 2017-04-13T20:42:35

从上面的日志文件可以看出,这个DAG的6个任务被分发到两台机器执行,每台机器执行3个任务。

业务日志的集中存储

airflow的log日志默认存储在文件中,也可以远程存储,配置如下

代码语言:bash
复制
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder =
remote_log_conn_id =
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
s3_log_folder =

也可以通过logstach将日志搜集到Elasticsearch中存储

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装
  • 配置
    • 设置executor
    • 设置broker_url
    • 设置celery_result_backend
  • 启动
    • 启动Workder
    • 启动scheduler
  • 增加一个DAG
  • 启动DAG
  • 查看业务日志
  • 业务日志的集中存储
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档