首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >流处理框架中的反压(back pressure)机制

流处理框架中的反压(back pressure)机制

原创
作者头像
chain
发布2018-06-12 23:27:22
3.8K0
发布2018-06-12 23:27:22
举报

流处理系统通常需要优雅地处理反压(back pressure)问题。反压通常产生是由于短时间内负载高峰导致系统接收数据的速率远高于它处理数据的速率。比如,垃圾回收停顿可能导致流入的数据快速堆积,后者双十一等造成流量陡增。反压如果不能够得到很好地处理,可能会导致资源好近甚至系统崩溃。

目前主流的流处理框架Storm、JStorm、Spark Streaming以及Flink等都提供了反压机制,各自的侧重点和实现都不相同。

1、Storm如何处理反压问题

对于开启了acker机制的Storm程序,可以通过设置conf.setMaxSpoutPending参数来实现反压效果,如果下游bolt处理速度跟不上导致spout发送的tuple没有及时确认的数量超过了参数设定的值,spout就会停止发送数据。

这种简单粗暴的方式主要有以下几个缺点:

  • 很难调优conf.setMaxSpoutPending参数的设置来达到最好的反压效果,如果参数小则会导致吞吐上不去;如果参数设置大了则会导致work进程OOM
  • 下游bolt出现阻塞,上游停止发送,下游消除阻塞后,上游又进行开闸放水,过一会儿,下游又阻塞,上游又限流,如此反复,会导致整个系统数据流一直处在一个颠簸的状态
  • 对于关闭acker机制的Storm程序无效

Storm1.0中版本中使用了新的自动反压机制,社区解决方案如下:

反压过程:

  • worker executor的接收队列大于高水位,通知反压线程
  • worker反压线程通知zookeeper,将反压信息写入到zookeeper节点
  • zookeeper通知该topo上所有的worker进入反压状态
  • spout降低发送tuple的速率

2、JStorm如何处理反压问题

JStorm的限流机制,当下游bolt发生阻塞的时候,并且阻塞task比例超过某个比例的时候,会触发 启动反压限流。

其中判断bolt是否发生阻塞是通过连续n次采样周其中,队列超过某个阈值,就认为该task处于阻塞状态。

根据阻塞的component,进行反向DAG推算,直到推算到源头spout,将topology的一个状态位设置为 “限流状态”。

task出现阻塞时,将自己的执行线程时间传递给TM(topology master),当启动反向限流后,TM把这个执行时间传递给spout。这样spout每次发送一个tuple,就会等待这个执行时间。而当spout降速之后,发送过阻塞命令的task检查队列水位是否连续n次低于某个阈值,如果是,就会发送解除限流命令给TM,TM然后发送提速命令给所有的spout,这样spout每次发送一个tuple就会减少等待时间,当spout的等待时间降为0,spout就会不断地向TM发送解除限速给TM,当所有降速的spout都发了解除限速命令,那么就会将topology的状态设置为正常,标志真正解除限速。

配置示例如下所示:

## 反压总开关
topology.backpressure.enable: true
## 高水位 -- 当队列使用量超过这个值时,认为阻塞
topology.backpressure.water.mark.high: 0.8
## 低水位 -- 当队列使用量低于这个量时, 认为可以解除阻塞
topology.backpressure.water.mark.low: 0.05
## 阻塞比例 -- 当阻塞task数/这个component并发 的比例高于这值时,触发反压
topology.backpressure.coordinator.trigger.ratio: 0.1

## 反压采样周期, 单位ms
topology.backpressure.check.interval: 1000
## 采样次数和采样比例, 即在连续4次采样中, 超过(不包含)(4 * 0.75)次阻塞才能认为真正阻塞, 超过(不包含)(4 * 0.75)次解除阻塞才能认为是真正解除阻塞
topology.backpressure.trigger.sample.rate: 0.75
topology.backpressure.trigger.sample.number: 4

3、Spark Streaming中如何处理反压问题

Spark Streaming程序中当计算过程中出现batch processing time 大于 batch interval的情况时,(其中batch processing time为实际计算一个批次花费时间,batch interval为Streaming应用设置的批处理间隔),意味着处理数据的速度小于接收数据的速度,如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟),可以通过设置参数spark.streaming.receiver.maxRate来限制Receiver的数据接收速率,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。

Spark Streaming反压过程主要是根据JobSchedule反馈作业的执行信息来估算当前的最大处理速度(rate),然后动态地调整Receiver数据接收率。

反压执行过程主要分为两部分:BatchCompleted事件触发 以及 BatchCompleted事件处理

BatchCompleted事件触发:

  • 每当一个Job执行完成时会向eventLoop发送一个JobCompleted事件
  • EventLoop事件处理器接收到JobCompleted事件之后将调用handleJobCompletion来处理Job完成事件
  • handleJobCompletion使用Job执行的信息(delay)创建StreamingListenerBatchCompleted事件并通过StreamingListenerBus向监听器发送

BatchCompleted事件处理:

  • SteamingListenerBus将时间转交给具体的SteamingListener,即RateControlle
  • RateController接收到BatchCompleted事件之后调用onBatchCompleted进行处理
  • onBatchCompleted从Job的完成信息中抽取任务的执行延迟和调度延迟,然后断地给RateEstimator计算出一个新的rate并发布
  • 通过ReceiverTracker将新生成的rate包装成UpdateReceiverRateLimit事件转交给ReceiverTrackerEndpoint
  • ReceiverTrackerEndpoint接收到消息后,查询注册时的ReceiverSupvisorImpl,再将rate包装成UpdateLimit发送到endpoint
  • endpoint接收到消息后,使用updateRate更新BlockGenerators,同时计算出一个固定的令牌间隔

以上两个过程便将反压机制中最重要的rate调整完成。

当Receiver开始接收数据的时候,需要获取令牌才能够将数据存放入currentBuffer,否则的话将被阻塞,进而阻塞Receiver从数据源拉取数据。其中令牌投放采用令牌桶机制(参考下图),固定大小的令牌桶根据rate源源不断地产生令牌,如果令牌不消耗,或消耗的速度小于产生的速度,令牌就会不断的增多,直到把桶撑满。后面再产生的令牌就会被丢弃。

4、Flink中如何处理反压问题

Flink 在运行时主要由 operators 和 streams 两大组件构成。每个 operator 会消费中间态的流,并在流上进行转换,然后生成新的流。在 Flink 中,这些逻辑流就好比是分布式阻塞队列,而队列容量是通过缓冲池(LocalBufferPool)来实现的。每个被生产和被消费的流都会被分配一个缓冲池。缓冲池管理着一组缓冲(Buffer),缓冲在被消费后可以被回收循环利用。

上图展示的是两个task之间的数据传输:

  1. 记录"A"进入了Flink并且被Task 1处理(省略中间的一些反序列化、Netty接收过程)
  2. 记录别序列化到buffer中(LocalBufferPool1中有空间存储的buffer)
  3. buffer被送到Task 2中从这个buffer中读出记录(LocalBufferLocal2中有空间接收的buffer)

数据传输有两个场景:

本地传输:如果Task 1和Task 2在同一个worker即诶但,buffer可以直接传递给下一个Task。Task 2消费了该buffer,那么就会被LocalBufferPool1回收。如果Task 2消费的速度比Task 1取buffer的速度,导师LocalBufferPool1无可用的buffer,Task1等待在可用的buffer上。最终导致Task1的降速。

网络传输:如果 Task 1 和 Task 2 运行在不同的 worker 节点上,那么 buffer 会在发送到网络(TCP Channel)后被回收。在接收端,会从 LocalBufferPool 中申请 buffer,然后拷贝网络中的数据到 buffer 中。如果没有可用的 buffer,会停止从 TCP 连接中读取数据。在输出端,通过 Netty 的水位值机制来保证不往网络中写入太多数据。如果网络中的数据(Netty输出缓冲中的字节数)超过了高水位值,我们会等到其降到低水位值以下才继续写入数据。这保证了网络中不会有太多的数据。如果接收端停止消费网络中的数据(由于接收端缓冲池没有可用 buffer),网络中的缓冲数据就会堆积,那么发送端也会暂停发送。另外,这会使得发送端的缓冲池得不到回收,writer 阻塞在向 LocalBufferPool 请求 buffer,阻塞了 writer 往 ResultSubPartition 写数据。

通过固定大小的缓冲池,保证了Flink有一套健壮的反压机制,使得Task生产数据的速度不会快于消费的速度。

参考

http://blog.csdn.net/cm_chenmin/article/details/52936575

https://github.com/alibaba/jstorm/wiki/Backpressure

http://www.jianshu.com/p/906db0d86653

http://www.cnblogs.com/barrenlake/p/5349949.html

http://data-artisans.com/how-flink-handles-backpressure/

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、Storm如何处理反压问题
  • 2、JStorm如何处理反压问题
  • 3、Spark Streaming中如何处理反压问题
  • 4、Flink中如何处理反压问题
  • 参考
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档