前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >KafkaProducer

KafkaProducer

作者头像
平凡的学生族
发布2020-06-16 16:00:53
5340
发布2020-06-16 16:00:53
举报
文章被收录于专栏:后端技术后端技术

消息的流动

一个请求的发送分为下面几步:

其中,Sender把准备好的Batch取出,把要发往同一Node的Batch放在一起,发给NetworkClient

内存管理的实现

管理一组poolableSize的内存,以便减少gc,增快效率。

消息是如何累加到Batch的

在写模式的ByteBuffer上叠加输出流,输出完成后转为读模式。

ProducerBatch维护一个MemoryRecordsBuilder,向其中写入记录。MemoryRecordsBuilder对ByteBuffer写入多条记录,再赋给MemoryRecords供读取。 MemoryRecordsBuilder显然是建造者模式。接受一个ByteBuffer,在其上开启ByteBufferOutputStream,并叠加压缩流。用法: 构造函数->closeForRecordAppends->build, 先用hasRoomFor/isFull判断是否可写入

  • 构造函数中会开启流
  • build会调用close后,返回builtRecords
    • close将ByteBuffer转为读模式,赋给MemoryRecords,并赋给builtRecords

请求的发送和响应是如何实现的

请求在发送时,在组件链中一路向前传递,而调用方线程(如果是get调用)会阻塞等待调用完成。那么当NetworkClient收到响应后,需要释放Batch的内存、控制对应请求的调用方线程继续运行、调用拦截器的回调,如何做到呢?

回调InFligh机制[1]

回调

RequestCompletionHandler NetworkClient收到响应后会回调Response,处理后续工作。 Sender创建ClientRequest时,会传入回调函数RequestCompletionHandler,在其中定义了释放ByteBuffer的逻辑。

在Sender::sendProduceRequest中定义了回调函数,该回调再NetworkClient收到请求时会调用:

一路调用至此,调用batch.done让调用方线程继续、调用回调。 另外移除inFlightBatches,移除incomplete(与Inflight机制有关,见下文)、释放内存

ProduceFuture与InterceptorCallback ProducerBatch创建时,会创建ProduceRequestResult,后者维护了一个CountDownLatch,在ProduceRequestResult上调用的await和done都会转发到其上。用于控制客户端的流程

ProducerBatch添加记录时,会把InterceptorCallback,拦截器的回调也加入

在ProducerBatch完成时,Sender::completeBatch->ProducerBatch::done调用该方法,拦截器回调和ProduceRequestResult都会被调用

InFlight机制

InFlight机制是我临时发明的概念,代表发送后等待处理的请求/Batch,实现这样机制的可以叫做InFlight结构体。

NetworkClient中的InFlightRequests、Sender中的inFlightBatches变量和RecordAccumulator中的IncompleteBatches,都用到了这个机制。

请求在被传往下一个组件前,会先以某种标识存于这样的结构体,当响应收到时,又按标识把对应的请求取出,处理对应的逻辑。

InFlightRequests NetworkClient用到的InFlightRequests中维护了一个Map,代表等待处理的请求。

消息发送时,NetworkClient::doSend会调用该方法,把请求按目标节点存于队列

当收到响应后,NetworkClient::handleCompletedReceives会调用InflightRequests::completeNext,按发送节点将队首请求取出,生成ClientResponse,完成之后的逻辑。

注意到请求发送后会按节点存在队列,收到响应后直接取出对应节点的队首,这是因为服务端保障了一个机制: "请求一定按顺序被响应,先发送的请求一定先响应"。所以尽管发往同一个节点的操作可能应用于不同partition(多个partition的leader都在一个节点上是可能的),它们的响应一定是按顺序返回的。

inFlightBatches Sender维护了一个inFlightBatches,代表"等待完成的Batch"。所有发送出去的请求,在还没收到响应前都存于此。

// Sender.java
// A per-partition queue of batches ordered by creation time for tracking the in-flight batches
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;

当NetworkClient收到响应后,对应的Batch就完成处理了,Sender的该方法会被调用,将该Batch移除。

incomplete 在RecordAccumulator::append中,内存被申请,该Batch被添加到incomplete。 在响应收到后,RecordAccumulator::deallocate被调用,移除对应的Batch,释放内存


  1. Sender::sendProduceRequest中定义的RequestCompletionHandler
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消息的流动
  • 内存管理的实现
  • 消息是如何累加到Batch的
  • 请求的发送和响应是如何实现的
    • 回调
      • InFlight机制
      相关产品与服务
      批量计算
      批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档