之前介绍过的 apache-airflow 系列文章
介绍了如何安装、配置、及使用,本文介绍如何如何部署一个健壮的 apache-airflow 调度系统 - 集群部署。
本文主要介绍以下几点:
集群部署将为您的 apache-airflow 系统带来更多的计算能力和高可用性。
airflow 系统在运行时有许多守护进程,它们提供了 airflow 的全部功能。守护进程包括 Web服务器-webserver、调度程序-scheduler、执行单元-worker、消息队列监控工具-Flower等。下面是 apache-airflow 集群、高可用部署的主要守护进程。
webserver 是一个守护进程,它接受 HTTP 请求,允许您通过 Python Flask Web 应用程序与 airflow 进行交互,webserver 提供以下功能:
webserver 守护进程使用 gunicorn 服务器(相当于 java 中的 tomcat )处理并发请求,可通过修改{AIRFLOW_HOME}/airflow.cfg文件中 workers 的值来控制处理并发请求的进程数。 例如:
workers = 4 #表示开启4个gunicorn worker(进程)处理web请求
启动 webserver 守护进程:
$ airfow webserver -D
scheduler 是一个守护进程,它周期性地轮询任务的调度计划,以确定是否触发任务执行。 启动的 scheduler 守护进程:
$ airfow scheduler -D
worker 是一个守护进程,它启动 1 个或多个 Celery 的任务队列,负责执行具体 的 DAG 任务。
当设置 airflow 的 executors 设置为 CeleryExecutor 时才需要开启 worker 守护进程。推荐您在生产环境使用 CeleryExecutor :
executor = CeleryExecutor
启动一个 worker守护进程,默认的队列名为 default:
$ airfow worker -D
flower 是一个守护进程,用于是监控 celery 消息队列。启动守护进程命令如下:
$ airflow flower -D
` 默认的端口为 5555,您可以在浏览器地址栏中输入 "http://hostip:5555" 来访问 flower ,对 celery 消息队列进行监控。
需要注意的是 airflow 的守护进程彼此之间是独立的,他们并不相互依赖,也不相互感知。每个守护进程在运行时只处理分配到自己身上的任务,他们在一起运行时,提供了 airflow 的全部功能。
将以所有上守护进程运行在同一台机器上即可完成 airflow 的单结点部署,架构如下图所示
airflow 单节点部署
在稳定性要求较高的场景,如金融交易系统中,一般采用集群、高可用的方式来部署。Apache Airflow 同样支持集群、高可用的部署,airflow 的守护进程可分布在多台机器上运行,架构如下图所示:
airflow 集群部署
如果一个 worker 节点崩溃或离线时,集群仍可以被控制的,其他 worker 节点的任务仍会被执行。
如果您的工作流中有一些内存密集型的任务,任务最好是分布在多台机器上运行以便得到更快的执行。
celeryd_concurrency = 30
您可以根据实际情况,如集群上运行的任务性质,CPU 的内核数量等,增加并发进程的数量以满足实际需求。
您还可以向集群中添加更多主节点,以扩展主节点上运行的服务。您可以扩展 webserver 守护进程,以防止太多的 HTTP 请求出现在一台机器上,或者您想为 webserver 的服务提供更高的可用性。需要注意的一点是,每次只能运行一个 scheduler 守护进程。如果您有多个 scheduler 运行,那么就有可能一个任务被执行多次。这可能会导致您的工作流因重复运行而出现一些问题。 下图为扩展 Master 节点的架构图:
扩展 Master 节点
看到这里,可能有人会问,scheduler 不能同时运行两个,那么运行 scheduler 的节点一旦出了问题,任务不就完全不运行了吗?
答案: 这是个非常好的问题,不过已经有解决方案了,我们可以在两台机器上部署 scheduler ,只运行一台机器上的 scheduler 守护进程 ,一旦运行 scheduler 守护进程的机器出现故障,立刻启动另一台机器上的 scheduler 即可。我们可以借助第三方组件 airflow-scheduler-failover-controller 实现 scheduler 的高可用。 具体步骤如下所示:
1. 下载 failover
gitclone https://github.com/teamclairvoyant/airflow-scheduler-failover-controller
2. 使用 pip 进行安装
cd{AIRFLOW_FAILOVER_CONTROLLER_HOME}
pipinstall -e .
3. 初始化 failover
scheduler_failover_controllerinit
注:初始化时,会向airflow.cfg中追加内容,因此需要先安装 airflow 并初始化。
4. 更改 failover 配置
scheduler_nodes_in_cluster= host1,host2
注:host name 可以通过scheduler_failover_controller get_current_host命令获得
5. 配置安装 failover 的机器之间的免密登录,配置完成后,可以使用如下命令进行验证:
scheduler_failover_controller test_connection
6. 启动 failover
scheduler_failover_controllerstart
1. 节点运行的守护进程如下:
2. 队列服务处于运行中. (RabbitMQ, Redis, etc)
executor = CeleryExecutor
sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
如果使用 RabbitMQ
broker_url = amqp://guest:guest@{RABBITMQ_HOST}:5672/
如果使用 Redis
broker_url = redis://{REDIS_HOST}:6379/0 #使用数据库 0
celery_result_backend = db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow #当然您也可以使用 Redis :celery_result_backend =redis://{REDIS_HOST}:6379/1
$ airflow initdb
$ airflow webserver
$ airflow scheduler
$ airflow webserver
$ airflow worker
至此,所有均已集群或高可用部署,apache-airflow 系统已坚不可摧。
官方文档如下: Documentation: https://airflow.incubator.apache.org/ Install Documentation: https://airflow.incubator.apache.org/installation.html GitHub Repo: https://github.com/apache/incubator-airflow
(完)