前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka源码系列之0.10版本的Producer源码解析及性能点讲解

Kafka源码系列之0.10版本的Producer源码解析及性能点讲解

作者头像
Spark学习技巧
发布2018-01-30 18:22:43
8380
发布2018-01-30 18:22:43
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

一,基础讲解

本文是基于kafka 0.10讲的,kafkaProducer模型和0.8的客户端模型大致是一样的,区别是0.8版本的会为每个Broker(有给定topic分区leader的Broker)创建一个SyncProducer,而0.10的Producer是用一个NioSelector实现实现了多链接的维护的。也是一个后台线程进行发送。基本步骤,也是定期获取元数据,将消息按照key进行分区后归类,每一类发送到正确的Broker上去。

再写kafka文章的原因是0.10版本后跟spark结合有了大的变动,后面会讲解多版本的sparkStreaming和StructuredStreaming 与kafka的各种结合。所以在这里会更新两篇kafka文章:一篇关于kafka 0.10版本的Producer,另一篇当然是kafka 0.10版本的Consumer了。为后面的文章打下基础。

二,重要类讲解

Cluster

代表一个当前kafka集群的nodes,topics和partitions子集

Selector

org.apache.kafka.common.network.Selector。一个nioSelector的接口,负责非阻塞多链接网络I/O操作。该类于NetworkSend和NetworkReceive协同工作,传输大小限制的网络请求和应答。一个新的链接可以被加入到该nioSelector,当然需要配上一个id,通过调用

代码语言:js
复制
connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)

内部维护了一个java NIOSelector

java.nio.channels.Selector nioSelector;

NetworkClient

一个针对异步请求/应答的网络IO 的网络客户端。这是一个内部类,用来实现用户层面的生产消费者客户端。非线程安全的。

Sender

一个后台线程,主要负责发送生产请求到kafka集群。该线程会更新kafka集群的metadata,将produce Request发送到正确的节点。

RecordAccumulator

该类扮演的是一个队列的角色,将records追加到MemoryRecords实例中,用于发送到server端。

RecordBatch

一批准备发送的消息。该类是线程不安全的,需要加入外部同步加入需要修改的话。

MemoryRecords

用一个byteBuffer支撑的Records的实现。

RecordAccumulator维护了一个ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

RecordBatch维护了一个MemoryRecords。

三,源码过程

1,构建必要对象的过程

用户代码里会构建一个KafkaProducer对象。

producer = new KafkaProducer<>(props);

在构造函数里活做三个重要的的事情

A),new Selector传递给NetworkClient

B),new NetworkClient

C),new Sender

D),new KafkaThread并将构建的send对象,当做该线程的runnable。并启动该线程。

E),构建了分区器和一个Metatdata。

F),构建了一个RecordAccumulator。此时需要关注的两个配置是

batch.size:批量发送的大小。

linger.ms:超时发送的时间。

合理配置两个值,有利于我们提升kafkaProducer的性能。

2,消息加入发送队列的过程

1),用户程序里调用KafkaProducer.send发送消息

代码语言:js
复制
producer.send(new ProducerRecord<>(topic,
 messageNo,
 messageStr), new DemoCallBack(startTime, messageNo, messageStr));

2),对消息按照partition策略进行分区。

代码语言:js
复制
//获取分区号
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());

3),将消息追加到RecordAccumulator。

代码语言:js
复制
//将消息追加到RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

具体追加的细节,先根据topic和partition信息获取一个recordBatch,然后在获取MemoryRecords,将消息加入其中

代码语言:js
复制
//根据topic和partition信息获取该partition的队列
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {//RecordBatch类非安全,需要加外部同步
 if (closed)
 throw new IllegalStateException("Cannot send after the producer is closed.");

 RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    if (appendResult != null)
 return appendResult;
}

tryAppend内部

代码语言:js
复制
//        获取最后一个RecordBatch
 RecordBatch last = deque.peekLast();
        if (last != null) {
//            将消息追加到该RecordBatch里面
 FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());

Last.TryAppend

代码语言:js
复制
//        首先会判断是否有充足的空间
 if (!this.records.hasRoomFor(key, value)) {
 return null;
 } else {
//            将消息加入memoryRecords
 long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
 FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
 timestamp, checksum,
 key == null ? -1 : key.length,
 value == null ? -1 : value.length);
            if (callback != null)
 thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
 }

3,消息发送的过程

1),获取Cluster

代码语言:js
复制
//获取当前cluster信息,
Cluster cluster = metadata.fetch();

2),获取那些有数据待发送的分区,依据是batch.size和linger.ms

代码语言:js
复制
//获取当前准备好发送的有数据的分区
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

3),更新那些leader未知的分区信息

代码语言:js
复制
if (result.unknownLeadersExist)
 this.metadata.requestUpdate();

4),移除不能发送Request的node

代码语言:js
复制
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
 //判断连接是否能发消息
 if (!this.client.ready(node, now)) {
        iter.remove();
 notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
 }
}

5),转化为list格式,以node为基准,清空那些给定的node数据

清空所有给定node的数据,然后将它们放到给定适合大小的list,以单个node为基准

代码语言:js
复制
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
 result.readyNodes,
                                                                 this.maxRequestSize,
 now);

6),以node为基准,将batches转化为ProducerRequests

代码语言:js
复制
//以单个node为基准,将batches数据转化为ProducerRequests
List<ClientRequest> requests = createProduceRequests(batches, now);

7),发送数据

代码语言:js
复制
for (ClientRequest request : requests)
 client.send(request, now);

8),做真正的网络读写的动作,之前会更新元数据 this.client.poll(pollTimeout, now);

四,总结

写本文的原因是为StructuredStreaming的系列文章之kafkaSink做准备。

1,具体调优请参考kafka系列文章。

2,性能调优的参数重要的就两个

batch.size:批量发送的大小。

linger.ms:超时发送的时间。

3,具体跟0.8.2.2区别,请参考:Kafka源码系列之通过源码分析Producer性能瓶颈

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-07-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档