前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink源码解读系列 | Flink中接收端反压以及Credit机制

Flink源码解读系列 | Flink中接收端反压以及Credit机制

作者头像
大数据真好玩
发布2020-09-22 10:18:28
8060
发布2020-09-22 10:18:28
举报
文章被收录于专栏:暴走大数据暴走大数据

可以看到每个task都会有自己对应的IG(inputgate)对接上游发送过来的数据和RS(resultPatation)对接往下游发送数据, 整个反压机制通过inputgate,resultPatation公用一个一定大小的memorySegmentPool来实现(Flink中memorySegment作为内存使用的抽象,类比bytebuffer), 公用一个pool当接收上游数据时Decoder,往下游发送数据时Encoder,都会向pool中请求内存memorySegment 。因为是公共pool,也就是说运行时,当接受的数据占用的内存多了,往下游发送的数据就少了,这样是个什么样的情况呢?

比如说你sink端堵塞了,背压了写不进去,那这个task的resultPatation无法发送数据了,也就无法释放memorySegment了,相应的用于接收数据的memorySegment就会越来越少,直到接收数据端拿不到memorySegment了,也就无法接收上游数据了,既然这个task无法接收数据了,自然引起这个task的上一个task数据发送端无法发送,那上一个task又反压了,所以这个反压从发生反压的地方,依次的往上游扩散直到source,这个就是flink的天然反压。

从源码来看一下flink是如何实现的

来到数据接收的地方StreamInputProcessor.java中processInput()方法中

这里通过通过handler的getNextNonBlocked()方法获取到了bufferOrEvent后面就会将这个bufferOrEvent解析成record数据然后使用用户的代码处理了

其实这里的handler分为两种

  1. BarrierBuffer
  2. BarrierTracker

区别主要是barrierbuffer实现了barrier对齐的数据缓存,用于实现一次语义,这里以后随缘更新到容错机制的时候讲

来看一下getNextNonBlocked()方法

这个看到了通过会通过上游inputGate获取数据,具体看一下getNextBufferOrEvent()其中有两个比较重要的调用

先看requestPartitions()

先遍历了所有的inputchannel然后调用了requestSubpartition()在其中

先看一下1处,这里返回了一个Netty的Client来看一下createPartitionRequestClient是怎么创建的

可以看到源码的描述,这里其实就是创建与上游发送数据端的tcp连接的client端,用来接收上游数据的

接着

这里如果已经建立TCP连接就直接拿,与上游还没有建立tcp连接的话就会先初始化Client端,通过这个connect()方法

来看一下第一次是如何初始化连接的

看到这个应该熟悉Netty的同学一眼就了解了,在1处就是Client的具体逻辑了,然后与上游端口建立连接

来看一下具体的Client端具体的逻辑,这里最好对netty有一定的认识

1处是一个用于Encoder 的ChannelOutboundHandler常规的编码器没有什么好说的

2处是用于Decoder的ChannelinboundHandler常规的解码器没有什么好说的

3处 这里分为两种Handler,区别主要是在notifyCreditAvailable()方法

PartitionRequestClientHandler:不带信任机制的

这里取出了所有的带有信任的上游inputChannel并且向其响应发送了一个Credit对象

那带Credit机制的handler何时触发userEventTriggered()来触发向上游发送Credit呢?

先不慌,先来看下client接收到数据后做了什么,看下Nettyclient端的channelRead()方法(这里只看credit机制的)

decodeMsg()方法中

decodeBufferOrEvent()方法

在没有Credit机制的PartitionRequestClientHandler中

requestBuffer()方法就是请求memorySegmentPool中的memorySegment

这里不能确保能获取到,所以会用一个while(true)一直挂着

在Credit机制的CreditBasedPartitionRequestClientHandler中

请求requestBuffer()方法就是请求memorySegmentPool中的memorySegment因为信任机制在请求前就已经保证有足够的memorySegment所以不会请求不到,这里请求不到直接就抛异常了

然后OnBuffer( )方法

1处将将这个buffer加入到了这个receivedBuffers的ArrayDeque中,这里要注意receivedBuffers,这个queue后面会用到(后面处理数据就是循环的从这个queue中poll拉数据出来)

这里还要注意onBuffer方法还传入了backlog参数,这里是一个积压的数据量(既发送端还没有发送的且没有获取到Credit的数据量(buffer为单位)其实就是subpartation中的数据量,发送端会把这个积压量往接收端发,接收端会用这个积压量来判断是否可以发送Credit给上游 )

接着会根据积压的数据量

当可用的buffer数 <(挤压的数据量 + 已经分配给信任Credit的buffer量) 时,就会向Pool中继续请求buffer,这里请求不到也会一直while形成柱塞反压

然后通过notifyCreditAvailable()方法发送Credit,具体来看一下

可用看到这里就触发了前面说到的向上游发送Credit的方法了

到这里,Nettyclient端的初始化以及Netty的处理逻辑就讲完了

现在回到最最开始的地方

requestPartition()那里创建nettyclient后

currentChannel.getNextBuffer()方法中

前面我们说到的NettyClient端channelRead读取数据后会把数据放到一个recivedBuffers的queue中,这里就是去那个queue中取数据然后返回到我们的 数据接收的地方StreamInputProcessor.java中processInput()方法中的得到上游数据以后,就是开始执行我们用户的代码了调用processElement方法了。

然后while(true)开始了下一轮拉取数据然后处理的过程。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档