流处理框架中的反压(back pressure)机制

流处理系统通常需要优雅地处理反压(back pressure)问题。反压通常产生是由于短时间内负载高峰导致系统接收数据的速率远高于它处理数据的速率。比如,垃圾回收停顿可能导致流入的数据快速堆积,后者双十一等造成流量陡增。反压如果不能够得到很好地处理,可能会导致资源好近甚至系统崩溃。

目前主流的流处理框架Storm、JStorm、Spark Streaming以及Flink等都提供了反压机制,各自的侧重点和实现都不相同。

1、Storm如何处理反压问题

对于开启了acker机制的Storm程序,可以通过设置conf.setMaxSpoutPending参数来实现反压效果,如果下游bolt处理速度跟不上导致spout发送的tuple没有及时确认的数量超过了参数设定的值,spout就会停止发送数据。

这种简单粗暴的方式主要有以下几个缺点:

  • 很难调优conf.setMaxSpoutPending参数的设置来达到最好的反压效果,如果参数小则会导致吞吐上不去;如果参数设置大了则会导致work进程OOM
  • 下游bolt出现阻塞,上游停止发送,下游消除阻塞后,上游又进行开闸放水,过一会儿,下游又阻塞,上游又限流,如此反复,会导致整个系统数据流一直处在一个颠簸的状态
  • 对于关闭acker机制的Storm程序无效

Storm1.0中版本中使用了新的自动反压机制,社区解决方案如下:

反压过程:

  • worker executor的接收队列大于高水位,通知反压线程
  • worker反压线程通知zookeeper,将反压信息写入到zookeeper节点
  • zookeeper通知该topo上所有的worker进入反压状态
  • spout降低发送tuple的速率

2、JStorm如何处理反压问题

JStorm的限流机制,当下游bolt发生阻塞的时候,并且阻塞task比例超过某个比例的时候,会触发 启动反压限流。

其中判断bolt是否发生阻塞是通过连续n次采样周其中,队列超过某个阈值,就认为该task处于阻塞状态。

根据阻塞的component,进行反向DAG推算,直到推算到源头spout,将topology的一个状态位设置为 “限流状态”。

task出现阻塞时,将自己的执行线程时间传递给TM(topology master),当启动反向限流后,TM把这个执行时间传递给spout。这样spout每次发送一个tuple,就会等待这个执行时间。而当spout降速之后,发送过阻塞命令的task检查队列水位是否连续n次低于某个阈值,如果是,就会发送解除限流命令给TM,TM然后发送提速命令给所有的spout,这样spout每次发送一个tuple就会减少等待时间,当spout的等待时间降为0,spout就会不断地向TM发送解除限速给TM,当所有降速的spout都发了解除限速命令,那么就会将topology的状态设置为正常,标志真正解除限速。

配置示例如下所示:

## 反压总开关
topology.backpressure.enable: true
## 高水位 -- 当队列使用量超过这个值时,认为阻塞
topology.backpressure.water.mark.high: 0.8
## 低水位 -- 当队列使用量低于这个量时, 认为可以解除阻塞
topology.backpressure.water.mark.low: 0.05
## 阻塞比例 -- 当阻塞task数/这个component并发 的比例高于这值时,触发反压
topology.backpressure.coordinator.trigger.ratio: 0.1

## 反压采样周期, 单位ms
topology.backpressure.check.interval: 1000
## 采样次数和采样比例, 即在连续4次采样中, 超过(不包含)(4 * 0.75)次阻塞才能认为真正阻塞, 超过(不包含)(4 * 0.75)次解除阻塞才能认为是真正解除阻塞
topology.backpressure.trigger.sample.rate: 0.75
topology.backpressure.trigger.sample.number: 4

3、Spark Streaming中如何处理反压问题

Spark Streaming程序中当计算过程中出现batch processing time 大于 batch interval的情况时,(其中batch processing time为实际计算一个批次花费时间,batch interval为Streaming应用设置的批处理间隔),意味着处理数据的速度小于接收数据的速度,如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟),可以通过设置参数spark.streaming.receiver.maxRate来限制Receiver的数据接收速率,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。

Spark Streaming反压过程主要是根据JobSchedule反馈作业的执行信息来估算当前的最大处理速度(rate),然后动态地调整Receiver数据接收率。

反压执行过程主要分为两部分:BatchCompleted事件触发 以及 BatchCompleted事件处理

BatchCompleted事件触发:

  • 每当一个Job执行完成时会向eventLoop发送一个JobCompleted事件
  • EventLoop事件处理器接收到JobCompleted事件之后将调用handleJobCompletion来处理Job完成事件
  • handleJobCompletion使用Job执行的信息(delay)创建StreamingListenerBatchCompleted事件并通过StreamingListenerBus向监听器发送

BatchCompleted事件处理:

  • SteamingListenerBus将时间转交给具体的SteamingListener,即RateControlle
  • RateController接收到BatchCompleted事件之后调用onBatchCompleted进行处理
  • onBatchCompleted从Job的完成信息中抽取任务的执行延迟和调度延迟,然后断地给RateEstimator计算出一个新的rate并发布
  • 通过ReceiverTracker将新生成的rate包装成UpdateReceiverRateLimit事件转交给ReceiverTrackerEndpoint
  • ReceiverTrackerEndpoint接收到消息后,查询注册时的ReceiverSupvisorImpl,再将rate包装成UpdateLimit发送到endpoint
  • endpoint接收到消息后,使用updateRate更新BlockGenerators,同时计算出一个固定的令牌间隔

以上两个过程便将反压机制中最重要的rate调整完成。

当Receiver开始接收数据的时候,需要获取令牌才能够将数据存放入currentBuffer,否则的话将被阻塞,进而阻塞Receiver从数据源拉取数据。其中令牌投放采用令牌桶机制(参考下图),固定大小的令牌桶根据rate源源不断地产生令牌,如果令牌不消耗,或消耗的速度小于产生的速度,令牌就会不断的增多,直到把桶撑满。后面再产生的令牌就会被丢弃。

4、Flink中如何处理反压问题

Flink 在运行时主要由 operators 和 streams 两大组件构成。每个 operator 会消费中间态的流,并在流上进行转换,然后生成新的流。在 Flink 中,这些逻辑流就好比是分布式阻塞队列,而队列容量是通过缓冲池(LocalBufferPool)来实现的。每个被生产和被消费的流都会被分配一个缓冲池。缓冲池管理着一组缓冲(Buffer),缓冲在被消费后可以被回收循环利用。

上图展示的是两个task之间的数据传输:

  1. 记录"A"进入了Flink并且被Task 1处理(省略中间的一些反序列化、Netty接收过程)
  2. 记录别序列化到buffer中(LocalBufferPool1中有空间存储的buffer)
  3. buffer被送到Task 2中从这个buffer中读出记录(LocalBufferLocal2中有空间接收的buffer)

数据传输有两个场景:

本地传输:如果Task 1和Task 2在同一个worker即诶但,buffer可以直接传递给下一个Task。Task 2消费了该buffer,那么就会被LocalBufferPool1回收。如果Task 2消费的速度比Task 1取buffer的速度,导师LocalBufferPool1无可用的buffer,Task1等待在可用的buffer上。最终导致Task1的降速。

网络传输:如果 Task 1 和 Task 2 运行在不同的 worker 节点上,那么 buffer 会在发送到网络(TCP Channel)后被回收。在接收端,会从 LocalBufferPool 中申请 buffer,然后拷贝网络中的数据到 buffer 中。如果没有可用的 buffer,会停止从 TCP 连接中读取数据。在输出端,通过 Netty 的水位值机制来保证不往网络中写入太多数据。如果网络中的数据(Netty输出缓冲中的字节数)超过了高水位值,我们会等到其降到低水位值以下才继续写入数据。这保证了网络中不会有太多的数据。如果接收端停止消费网络中的数据(由于接收端缓冲池没有可用 buffer),网络中的缓冲数据就会堆积,那么发送端也会暂停发送。另外,这会使得发送端的缓冲池得不到回收,writer 阻塞在向 LocalBufferPool 请求 buffer,阻塞了 writer 往 ResultSubPartition 写数据。

通过固定大小的缓冲池,保证了Flink有一套健壮的反压机制,使得Task生产数据的速度不会快于消费的速度。

参考

http://blog.csdn.net/cm_chenmin/article/details/52936575

https://github.com/alibaba/jstorm/wiki/Backpressure

http://www.jianshu.com/p/906db0d86653

http://www.cnblogs.com/barrenlake/p/5349949.html

http://data-artisans.com/how-flink-handles-backpressure/

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java工程师日常干货

分布式利器Zookeeper(二):分布式锁原生API操作ZK Watch机制 分布式锁思路

在《分布式利器Zookeeper(一)》中对ZK进行了初步的介绍以及搭建ZK集群环境,本篇博客将涉及的话题是:基于原生API方式操作ZK,Watch机制,分布式...

993
来自专栏Java学习网

大型网站架构系列:消息队列

大型网站架构系列:消息队列 一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最...

3349
来自专栏linux驱动个人学习

CPUFreq驱动

CPUFreq子系统位于 drivers/cpufreq目录下,负责进行运行过程中CPU频率和电压的动态调整,即DvFS( Dynamic Voltage Fr...

923
来自专栏MessageQueue

Broker模块划分

Broker需要和NameServer及Client通信,包括Broker之间也需要通信(主从结构),所以Broker会有一个模块(Net&PacketHand...

982
来自专栏即时通讯技术

一套高可用、易伸缩、高并发的IM群聊架构方案设计实践

本文原题为“一套高可用群聊消息系统实现”,由作者“于雨氏”授权整理和发布,内容有些许改动,作者博客地址:alexstocks.github.io。应作者要求,...

882
来自专栏Spark学习技巧

Kafka源码系列之源码分析zookeeper在kafka的作用

浪尖的kafka源码系列以kafka0.8.2.2源码为例给大家进行讲解的。纯属个人爱好,希望大家对不足之处批评指正。 一,zookeeper在分布式集群的作...

29810
来自专栏PHP技术

Redis 和 Memcached 的区别

说到redis就会联想到memcached,反之亦然。了解过两者的同学有那么个大致的印象:redis与memcached相比,比仅支持简单的key-value数...

3326
来自专栏Linyb极客之路

消息队列使用的四种场景介绍

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题

1042
来自专栏北京马哥教育

大型网站架构系列:消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka)。 【第二篇...

2915
来自专栏云霄雨霁

概念题知识点总结

1640

扫码关注云+社区