首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Kafka技术内幕之Producer Sender

通过上一篇的分析我们知道,主线程通过KafkaProducer.send()方法将消息放入RecordAccumulator中缓存,并没有实际的网络I/O操作。网络操作是由Sender统一进行的。

Sender线程发送消息的整个流程:首先,它根据RecordAccumulator的缓存情况,筛选出可以向哪些Node节点发送消息;然后,根据生产者与各个节点的连接情况(由NetworkClient管理),过滤Node节点;之后,生成相应的请求,每个Node节点只生成一个请求;最后,调用NetworkClient将请求发送出去。

Sender实现了 Runnable接口,并运行在单独的ioThread中。Sender的run()方法调用了重载方法run(long),这才是Sender的核心方法,也是发送消息的关键流程。

我们先看看类图,然后再通过代码逐一进行分析。

ender

KafkaClient client:KafkaClient是接口,其实现类是NetworkClient,保存了每个节点的连接状态

RecordAccumulator accumulator:记录收集器,用来拉取消息

Map

> inflightBatches:键为TopicPartition,值为ProducerBatch集合的队列,用来保存已经发送但未收到响应的请求

整体处理流程:

1. 从Metadata获取Kafka集群数据

2. 调用RecordAccumulator.ready()方法,根据RecordAccumulator的缓存情况,选出可以向哪些Node节点发送消息,返回ReadyCheckResult对象。

3. 如果ReadyCheckResult中标识有unknownLeaderTopics,则调用Metadata的requestUpdate方法,标记需要更新Kafka的集群信息。

4. 针对ReadyCheckResult中readyNodes集合,循环调用NetworkClient.ready()方法,目的是检查网络方面是否符合发送消息的条件,不符合条件的Node将从readyNodes中移除。

5. 通过以上步骤处理后的readyNodes集合,调用RecordAccumulator.drain()方法,获取待发送的消息集合。

6. 调用RecorAccumulator的drain()方法,将队列记录收集器中的记录转变为

>集合。

6. getExpiredInflightBatches()方法处理已发送未收到响应的消息。代码逻辑是,遍历RecordAccumulator,调用RecordAccumulator.getDeliveryTimeoutMs()方法获取发送时间和当前时间,判断已经超时的消息。接着调用expiredBatches(),遍历ProducerBatch,查询出已超时的消息,如果已超时,将所有超时的消息添加到expiredBatches中,再调用failBatch()方法,调用ProdcuerBatch的done()方法释放空间。

7. 调用Sender.sendProduceRequest()方法将待发送消息封装成ClientRequest

8.调用NetwoekClient.send()将ClientRequestx写入KafkaChannel的send字段。

9.调用NetworkClient.poll()方法,将KafkaChannel.send字段中保存的ClientRequest发送出去,并且还会处理服务端发回的响应、处理超时的请求、调用用户自定义的CallBack。

// RecordAccumulator

// 返回已经准备好的可以接受数据的分区的节点

以上就是Kafka Sender的介绍。下一篇我们将介绍Kafka Producer服务端处理消息。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181010G1DM7X00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券