前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >airflow—执行器CeleryExecutor(3)

airflow—执行器CeleryExecutor(3)

原创
作者头像
刘远
修改2017-12-29 10:53:38
4K0
修改2017-12-29 10:53:38
举报
文章被收录于专栏:刘远的专栏刘远的专栏

CeleryExecutor可用于正式环境,使用 Celery 作为Task执行的引擎, 扩展性很好。这里使用rabbitmq作为celery的消息存储。

安装

在机器A和机器B上安装airflow

代码语言:bash
复制
pip2 install airflow[celery] 
pip2 install airflow[rabbitmq] 

注意:最新版本的celery(4.0.2)可能与rabbitmq的管理端不兼容,如果在rabbitmq管理端或用命令行工具显示多列时,报错如下

代码语言:bash
复制
{error,{exit,{ucs,{bad_utf8_character_code}},
 [{xmerl_ucs,from_utf8,1,[{file,"xmerl_ucs.erl"},{line,185}]},
 {mochijson2,json_encode_string,2,[]},
 {mochijson2,'-json_encode_proplist/2-fun-0-',3,[]},
 {lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
 {mochijson2,json_encode_proplist,2,[]},
 {mochijson2,'-json_encode_array/2-fun-0-',3,[]},
 {lists,foldl,3,[{file,"lists.erl"},{line,1197}]},
 {mochijson2,json_encode_array,2,[]}]}}

可以安装最新的celery-3.x.x的版本

代码语言:bash
复制
 pip2 -U install celery==3.1.24

配置

设置executor

代码语言:bash
复制
 # The executor class that airflow should use. Choices include
 # SequentialExecutor, LocalExecutor, CeleryExecutor
 #executor = SequentialExecutor
 executor = CeleryExecutor

设置broker_url

代码语言:bash
复制
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
broker_url = amqp://username:passowrd@201401@host:5672/project

设置celery_result_backend

代码语言:bash
复制
# Another key Celery setting
celery_result_backend = db+mysql://user:passowrd@host:port/airflow

启动

启动Workder

代码语言:bash
复制
airflow worker -D

启动scheduler

代码语言:bash
复制
airflow scheduler -D

增加一个DAG

将airflow例子example_bash_operator中的 schedule_interval 改为@once

代码语言:python
代码运行次数:0
复制
dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    #schedule_interval='0 0 * * *',
    schedule_interval="@once",
    dagrun_timeout=timedelta(minutes=60))

另存为文件 example_bash_operator.py

分别上传到机器A和机器B的dags_folder目录

代码语言:bash
复制
$ tree
.
`-- airflow
 |-- airflow-scheduler.err
 |-- airflow-scheduler.log
 |-- airflow-scheduler.out
 |-- airflow.cfg
 |-- dags
 | |-- example_bash_operator.py

启动DAG

代码语言:bash
复制
airflow trigger_dag example_bash_operator

查看业务日志

查看DAG任务

代码语言:bash
复制
$ airflow list_tasks example_bash_operator 
also_run_this
run_after_loop
run_this_last
runme_0
runme_1
runme_2

机器A执行日志如下

代码语言:bash
复制
$ tree
.
|-- also_run_this
| `-- 2017-04-13T20:42:35
|-- run_this_last
| `-- 2017-04-13T20:42:35
`-- runme_1
    `-- 2017-04-13T20:42:35

机器B执行日志如下

代码语言:bash
复制
$ tree
.
|-- run_after_loop
| `-- 2017-04-13T20:42:35
|-- runme_0
| `-- 2017-04-13T20:42:35
`-- runme_2
   `-- 2017-04-13T20:42:35

从上面的日志文件可以看出,这个DAG的6个任务被分发到两台机器执行,每台机器执行3个任务。

业务日志的集中存储

airflow的log日志默认存储在文件中,也可以远程存储,配置如下

代码语言:bash
复制
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder =
remote_log_conn_id =
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
s3_log_folder =

也可以通过logstach将日志搜集到Elasticsearch中存储

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装
  • 配置
    • 设置executor
      • 设置broker_url
        • 设置celery_result_backend
        • 启动
          • 启动Workder
            • 启动scheduler
            • 增加一个DAG
            • 启动DAG
            • 查看业务日志
            • 业务日志的集中存储
            相关产品与服务
            对象存储
            对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档