KafkaProducer

消息的流动

一个请求的发送分为下面几步:

其中,Sender把准备好的Batch取出,把要发往同一Node的Batch放在一起,发给NetworkClient

内存管理的实现

管理一组poolableSize的内存,以便减少gc,增快效率。

消息是如何累加到Batch的

在写模式的ByteBuffer上叠加输出流,输出完成后转为读模式。

ProducerBatch维护一个MemoryRecordsBuilder,向其中写入记录。MemoryRecordsBuilder对ByteBuffer写入多条记录,再赋给MemoryRecords供读取。 MemoryRecordsBuilder显然是建造者模式。接受一个ByteBuffer,在其上开启ByteBufferOutputStream,并叠加压缩流。用法: 构造函数->closeForRecordAppends->build, 先用hasRoomFor/isFull判断是否可写入

  • 构造函数中会开启流
  • build会调用close后,返回builtRecords
    • close将ByteBuffer转为读模式,赋给MemoryRecords,并赋给builtRecords

请求的发送和响应是如何实现的

请求在发送时,在组件链中一路向前传递,而调用方线程(如果是get调用)会阻塞等待调用完成。那么当NetworkClient收到响应后,需要释放Batch的内存、控制对应请求的调用方线程继续运行、调用拦截器的回调,如何做到呢?

回调InFligh机制[1]

回调

RequestCompletionHandler NetworkClient收到响应后会回调Response,处理后续工作。 Sender创建ClientRequest时,会传入回调函数RequestCompletionHandler,在其中定义了释放ByteBuffer的逻辑。

在Sender::sendProduceRequest中定义了回调函数,该回调再NetworkClient收到请求时会调用:

一路调用至此,调用batch.done让调用方线程继续、调用回调。 另外移除inFlightBatches,移除incomplete(与Inflight机制有关,见下文)、释放内存

ProduceFuture与InterceptorCallback ProducerBatch创建时,会创建ProduceRequestResult,后者维护了一个CountDownLatch,在ProduceRequestResult上调用的await和done都会转发到其上。用于控制客户端的流程

ProducerBatch添加记录时,会把InterceptorCallback,拦截器的回调也加入

在ProducerBatch完成时,Sender::completeBatch->ProducerBatch::done调用该方法,拦截器回调和ProduceRequestResult都会被调用

InFlight机制

InFlight机制是我临时发明的概念,代表发送后等待处理的请求/Batch,实现这样机制的可以叫做InFlight结构体。

NetworkClient中的InFlightRequests、Sender中的inFlightBatches变量和RecordAccumulator中的IncompleteBatches,都用到了这个机制。

请求在被传往下一个组件前,会先以某种标识存于这样的结构体,当响应收到时,又按标识把对应的请求取出,处理对应的逻辑。

InFlightRequests NetworkClient用到的InFlightRequests中维护了一个Map,代表等待处理的请求。

消息发送时,NetworkClient::doSend会调用该方法,把请求按目标节点存于队列

当收到响应后,NetworkClient::handleCompletedReceives会调用InflightRequests::completeNext,按发送节点将队首请求取出,生成ClientResponse,完成之后的逻辑。

注意到请求发送后会按节点存在队列,收到响应后直接取出对应节点的队首,这是因为服务端保障了一个机制: "请求一定按顺序被响应,先发送的请求一定先响应"。所以尽管发往同一个节点的操作可能应用于不同partition(多个partition的leader都在一个节点上是可能的),它们的响应一定是按顺序返回的。

inFlightBatches Sender维护了一个inFlightBatches,代表"等待完成的Batch"。所有发送出去的请求,在还没收到响应前都存于此。

// Sender.java
// A per-partition queue of batches ordered by creation time for tracking the in-flight batches
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;

当NetworkClient收到响应后,对应的Batch就完成处理了,Sender的该方法会被调用,将该Batch移除。

incomplete 在RecordAccumulator::append中,内存被申请,该Batch被添加到incomplete。 在响应收到后,RecordAccumulator::deallocate被调用,移除对应的Batch,释放内存


  1. Sender::sendProduceRequest中定义的RequestCompletionHandler

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • mybatis的由来

    SQL散落在程序四周,不利于维护。 可以将SQL以key-value的行式统一存储,以key索取SQL即可。可以把它们事先存在配置文件中,再加载到内存。这就涉...

    平凡的学生族
  • spring @EnableAutoConfiguration实现原理

    在refresh中调用了PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors。Po...

    平凡的学生族
  • KafkaConsumer 组件源码 ConsumerNetworkClient

    负责缓存要发送的请求、将加了回调的请求交给NetworkClient、触发NetworkClient的IO。

    平凡的学生族
  • bash 的 Test

    原文 bash 中的 test 确实是一个让初学者迷糊的概念,但是理解了之后,发现它并没有深奥的地方。 实际场景 export NVM_DIR="/Users/...

    IMWeb前端团队
  • bash 的 Test

    bash 中的 test 确实是一个让初学者迷糊的概念,但是理解了之后,发现它并没有深奥的地方。

    IMWeb前端团队
  • shell学习三参数传递 原

    echo "Shell 传递参数实例" echo "执行的文件名:$0" echo "第一个参数为:$1" echo "第二个参数为:$2" echo ...

    用户2603479
  • Linux常用命令13 - echo

    echo 命令是 Linux 中最基本和最常用的命令之一。 传递给 echo 的参数被打印到标准输出中。

    叉叉敌
  • 用CMD批处理脚本来守护进程

    Inkedus
  • windows nginx 管理脚本

    用户2657851
  • windows下代码分支批量处理脚本

    IT云清

扫码关注云+社区

领取腾讯云代金券