首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Kafka解惑之Old Producer(2)——Sync Analysis

上接:[Kafka解惑之Old Producer(1)—— Beginning]

上篇结尾一下子扩展的有点多,我们还是先回到DefaultEventHandler上来,当调用producer.send方法发送消息的时候,紧接着就是调用DefaultEventHandler的handle方法。下面是handle方法的主要内容,虽然行数有点多,但是这是Producer中最最核心的一块,需要反复研磨,方能一探究竟:

注意handle方法的参数是个Seq[KeyedMessage]类型的,而不是KeyedMessage。虽然Demo中用的只是单个KeyedMessage,最后调用底层的handle方法都是转换为Seq类型,你可以把Seq看成是java中的List,在Scala中表示序列,指的是一类具有一定长度的可迭代访问的对象,其中每个元素均带有一个从0开始计数的固定索引位置。

这个handle方法中首先是调用serialize(events)方法对消息进行序列化操作,这个容易理解,就是通过serializer.class参数指定的序列化类进行序列化。

其次获取所发送消息对应的元数据信息,然后将一坨消息(也有可能是一条)转换为HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]格式,其中key:Int表示broker的id,value是TopicAndPartition与消息集的Map,对应的方法为dispatchSerializedData()。因为客户端发消息是发到对应的broker上,所以要对每个消息找出对应的leader副本所在的broker的位置,然后将要发送的消息集分类,每个broker对应其各自所要接收的消息。而TopicAndPartition是针对broker上的存储层的,每个TopicAndPartition对应特定的当前的存储文件(Segment文件),将消息写入到存储文件中。

This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.

接下去所要做的工作就是查看是否需要压缩,如果客户端设置了压缩,则根据compression.type参数配置的压缩方式对消息进行压缩处理。0.8.2.x版本支持gzip和snappy的压缩方式,1.0.0版本还支持lz4的压缩方式。compression.type参数的默认值值none,即不需要压缩。

最后根据brokerId分组发送消息。这个分组发送的过程就与ProducerPool有关了,我们前面提到在实例化Producer的时候引入了DefaultEventHandler和ProducerPool。这个ProducerPool保存的是生产者和broker的连接,每个连接对应一个SyncProducer对象。SyncProducer包装了NIO网络层的操作,每个SyncProducer都是一个与对应broker的socket连接,是真正发送消息至broker中的执行者。

当调用最上层的send方法发送消息的时候,下面的执行顺序为DefaultEventHandler.handle()->DefaultEventHandler.dispatchSerializedData()->DefaultEventHandler.send()。在底层的DefaultEventHandler.send方法定义为:

会Java的读者看这段代码的时候应该能看出来个90%以上,解释下这段代码:首先是找到更新的元数据中所有的brorker(更具体的来说是broker的id、主机地址host和端口号port三元组信息);之后在查到原有的ProducerPool中是否有相应的SyncProducer,如果有则关闭之后再重新建立;如果没有则新建。SyncProducer底层是阻塞式的NIO,所以关闭再建立会有一定程度上的开销,相关细节如下:

玩过NIO的读者对这段代码相比很是熟络,虽然是scala版的。如果没有接触过NIO,那么可以先看看这一篇:攻破JAVA NIO技术壁垒(百度“攻破JAVA NIO技术壁垒”即可,认准:朱小厮)。

说道这里我们用一副结构图来说明下Old Producer的大致脉络(注:图中的所有操作都是在一个线程中执行的):

篇幅限制,更多内容将在下一篇中进行介绍,关注本微信公众号,了解更多细节。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180129G0378100?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券