通过上一篇的分析我们知道,主线程通过KafkaProducer.send()方法将消息放入RecordAccumulator中缓存,并没有实际的网络I/O操作。网络操作是由Sender统一进行的。
Sender线程发送消息的整个流程:首先,它根据RecordAccumulator的缓存情况,筛选出可以向哪些Node节点发送消息;然后,根据生产者与各个节点的连接情况(由NetworkClient管理),过滤Node节点;之后,生成相应的请求,每个Node节点只生成一个请求;最后,调用NetworkClient将请求发送出去。
Sender实现了 Runnable接口,并运行在单独的ioThread中。Sender的run()方法调用了重载方法run(long),这才是Sender的核心方法,也是发送消息的关键流程。
我们先看看类图,然后再通过代码逐一进行分析。
ender
KafkaClient client:KafkaClient是接口,其实现类是NetworkClient,保存了每个节点的连接状态
RecordAccumulator accumulator:记录收集器,用来拉取消息
Map
> inflightBatches:键为TopicPartition,值为ProducerBatch集合的队列,用来保存已经发送但未收到响应的请求
整体处理流程:
1. 从Metadata获取Kafka集群数据
2. 调用RecordAccumulator.ready()方法,根据RecordAccumulator的缓存情况,选出可以向哪些Node节点发送消息,返回ReadyCheckResult对象。
3. 如果ReadyCheckResult中标识有unknownLeaderTopics,则调用Metadata的requestUpdate方法,标记需要更新Kafka的集群信息。
4. 针对ReadyCheckResult中readyNodes集合,循环调用NetworkClient.ready()方法,目的是检查网络方面是否符合发送消息的条件,不符合条件的Node将从readyNodes中移除。
5. 通过以上步骤处理后的readyNodes集合,调用RecordAccumulator.drain()方法,获取待发送的消息集合。
6. 调用RecorAccumulator的drain()方法,将队列记录收集器中的记录转变为
>集合。
6. getExpiredInflightBatches()方法处理已发送未收到响应的消息。代码逻辑是,遍历RecordAccumulator,调用RecordAccumulator.getDeliveryTimeoutMs()方法获取发送时间和当前时间,判断已经超时的消息。接着调用expiredBatches(),遍历ProducerBatch,查询出已超时的消息,如果已超时,将所有超时的消息添加到expiredBatches中,再调用failBatch()方法,调用ProdcuerBatch的done()方法释放空间。
7. 调用Sender.sendProduceRequest()方法将待发送消息封装成ClientRequest
8.调用NetwoekClient.send()将ClientRequestx写入KafkaChannel的send字段。
9.调用NetworkClient.poll()方法,将KafkaChannel.send字段中保存的ClientRequest发送出去,并且还会处理服务端发回的响应、处理超时的请求、调用用户自定义的CallBack。
// RecordAccumulator
// 返回已经准备好的可以接受数据的分区的节点
以上就是Kafka Sender的介绍。下一篇我们将介绍Kafka Producer服务端处理消息。
领取专属 10元无门槛券
私享最新 技术干货