【 airflow 实战系列】 基于 python 的调度和监控工作流的平台

简介

airflow 是一个使用 python 语言编写的 data pipeline 调度和监控工作流的平台。Airflow 被 Airbnb 内部用来创建、监控和调整数据管道。任何工作流都可以在这个使用 Python 来编写的平台上运行。

Airflow 是一种允许工作流开发人员轻松创建、维护和周期性地调度运行工作流(即有向无环图或成为 DAGs )的工具。在 Airbnb 中,这些工作流包括了如数据存储、增长分析、Email 发送、A/B 测试等等这些跨越多部门的用例。

这个平台拥有和 Hive、Presto、MySQL、HDFS、Postgres 和 S3 交互的能力,并且提供了钩子使得系统拥有很好地扩展性。除了一个命令行界面,该工具还提供了一个基于 Web 的用户界面让您可以可视化管道的依赖关系、监控进度、触发任务等。

传统 Workflow 通常使用 TextFiles ( json,xml/etc ) 来定义 DAG ,然后 Scheduler 解析这些 DAG 文件形成具体的 TaskObjec t执行; Airflow 没这么干,它直接用 Python 写 DAGdefinition ,一下子突破了文本文件表达能力的局限,定义 DAG 变得简单。

Airflow 的架构

在一个可扩展的生产环境中,Airflow 含有以下组件:

  • 一个元数据库(MySQL 或 Postgres)
  • 一组 Airflow 工作节点
  • 一个调节器(Redis 或 RabbitMQ)
  • 一个 Airflow Web 服务器

所有这些组件可以在一个机器上随意扩展运行。如果使用 LocalExcuter 来适度的安装则可以获得相当多的额外性能。

优点

  • python 脚本实现 DAG ,非常容易扩展
  • 工作流依赖可视化
  • no XML
  • 可测试
  • 可作为 crontab 的替代
  • 可实现复杂的依赖规则
  • Pools
  • CLI 和 Web UI

功能简介

常见命令

  • initdb,初始化元数据 DB,元数据包括了 DAG 本身的信息、运行信息等;
  • resetdb,清空元数据 DB;
  • list_dags,列出所有 DAG;
  • list_tasks,列出某 DAG 的所有 task ;
  • test,测试某 task 的运行状况;
  • backfill,测试某 DAG 在设定的日期区间的运行状况;
  • webserver,开启 webserver 服务;
  • scheduler,用于监控与触发 DAG 。

ETL

ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库。

Airflow 设计时,只是为了很好的处理 ETL 任务而已,但是其精良的设计,正好可以用来解决任务的各种依赖问题。

任务依赖

通常,在一个运维系统,数据分析系统,或测试系统等大型系统中,我们会有各种各样的依赖需求。比如:

  • 时间依赖:任务需要等待某一个时间点触发。
  • 外部系统依赖:任务依赖 Mysql 中的数据,HDFS 中的数据等等,这些不同的外部系统需要调用接口去访问。
  • 机器依赖:任务的执行只能在特定的某一台机器的环境中,可能这台机器内存比较大,也可能只有那台机器上有特殊的库文件。
  • 任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响。
  • 资源依赖:任务消耗资源非常多,使用同一个资源的任务需要被限制,比如跑个数据转换任务要10个 G,机器一共就30个 G,最多只能跑两个,我希望类似的任务排个队。
  • 权限依赖:某种任务只能由某个权限的用户启动。

也许大家会觉得这些是在任务程序中的逻辑需要处理的部分,但是我认为,这些逻辑可以抽象为任务控制逻辑的部分,和实际任务执行逻辑解耦合。

如何理解 Crontab

现在让我们来看下最常用的依赖管理系统,Crontab。

在各种系统中,总有些定时任务需要处理,每当在这个时候,我们第一个想到的总是crontab。

确实,crontab 可以很好的处理定时执行任务的需求,但是对于 crontab 来说,执行任务,只是调用一个程序如此简单,而程序中的各种逻辑都不属于 crontab 的管辖范围(很好的遵循了 KISS )。

所以我们可以抽象的认为:

crontab 是一种依赖管理系统,而且只管理时间上的依赖。

Airflow的处理依赖的方式

Airflow 的核心概念,是 DAG (有向无环图),DAG 由一个或多个 TASK 组成,而这个 DAG 正是解决了上文所说的任务间依赖。Task A 执行完成后才能执行 Task B,多个Task之间的依赖关系可以很好的用DAG表示完善。

Airflow 完整的支持 crontab 表达式,也支持直接使用 python 的 datatime 表述时间,还可以用 datatime 的 delta 表述时间差。这样可以解决任务的时间依赖问题。

Airflow 在 CeleryExecuter 下可以使用不同的用户启动 Worke r,不同的 Worker 监听不同的 Queue ,这样可以解决用户权限依赖问题。Worker 也可以启动在多个不同的机器上,解决机器依赖的问题。

Airflow 可以为任意一个 Task 指定一个抽象的 Pool,每个 Pool 可以指定一个 Slot 数。每当一个 Task 启动时,就占用一个 Slot ,当 Slot 数占满时,其余的任务就处于等待状态。这样就解决了资源依赖问题。

Airflow 中有 Hook 机制(其实我觉得不应该叫 Hook ),作用时建立一个与外部数据系统之间的连接,比如 Mysql,HDFS,本地文件系统(文件系统也被认为是外部系统)等,通过拓展 Hook 能够接入任意的外部系统的接口进行连接,这样就解决的外部系统依赖问题。

参考

http://wingerted.com/2017/02/20/introduce-to-airflow/ https://www.youtube.com/watch?v=cHATHSB_450 https://www.youtube.com/watch?v=Pr0FrvIIfTU

原创声明,本文系作者授权云+社区-专栏发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

使用IBM云功能构建无服务器应用程序

在 Serverlessconf上,IBM 发布了IBM Cloud Functions的一项新功能(作为一个IBM研究预览展示)。通过使用新工具Compose...

19310
来自专栏惨绿少年

Kubernetes 编排系统

1.1 Kubernetes简介 ? 1.1.1 什么是Kubernetes Kubernetes (通常称为K8s,K8s是将8个字母“ubernete”替换...

3946
来自专栏北京马哥教育

直接Mark!开源的DevOps开发工具箱

DevOps是一组过程、方法与系统的统称,用于促进开发(应用程序/软件工程)、技术运营和质量保障(QA)部门之间的沟通、协作与整合。在DevOps的整个流程中,...

3015
来自专栏编程一生

接口性能优化方案及其理论依据

803
来自专栏彭湖湾的编程世界

【计算机网络】 DNS学习笔记 (>﹏<)

参考书籍 《计算机网络-自顶向下》  作者 James F. Kurose DNS的作用 DNS是因特网的目录服务 DNS是因特网的目录服务,它提供了主机名到I...

23211
来自专栏散尽浮华

Nginx基于TCP/UDP端口的四层负载均衡(stream模块)配置梳理

通过我们会用Nginx的upstream做基于http/https端口的7层负载均衡,由于Nginx老版本不支持tcp协议,所以基于tcp/udp端口的四层负载...

702
来自专栏用户2442861的专栏

Apache运行机制剖析

2、  服务器收到浏览器的请求数据,经过分析处理,向浏览器输出响应数据(Response)。

432
来自专栏腾讯NEXT学位

浅谈vuex应用场景

851
来自专栏大魏分享(微信公众号:david-share)

容器超融合的实现&持久存储的动态分配 : Openshift3.9学习系列第六终结篇

干货巨献:Openshift3.9的网络管理大全.加长篇---Openshift3.9学习系列第二篇

1213
来自专栏知识分享

七,ESP8266-UDP(基于Lua脚本语言)二,ESP8266 GPIO和SPI和定时器和串口

那天朋友问我为什么有UDP Sever 和 UDP Client   ,,我说:每个人想的不一样,设计上不一样...... 既然是面向无连接的,那么模块发数据就...

3927

扫码关注云+社区