前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka Producer 为了极致性能,100 多行能写出多感人的代码,设计思路非常值得学习

Kafka Producer 为了极致性能,100 多行能写出多感人的代码,设计思路非常值得学习

作者头像
kk大数据
发布2021-04-22 14:57:25
6710
发布2021-04-22 14:57:25
举报
文章被收录于专栏:kk大数据kk大数据

本文大纲

一、开篇

之前我们已经大致阅读完了大数据存储领域的一个著名框架:HDFS。

今天我们继续阅读另一个大数据存储领域的框架:Kafka。

做大数据不可能不知道 Kafka,在日志采集、实时计算等领域,都有它的身影。而且 Kafka 的源码是众多开源项目中,代码质量比较高的一个,也比较有观赏性。

Kafka 的高可用和高性能,是各大公司偏爱它的一个非常重要的理由。能做到如此高性能,作者肯定是花费了一番心力的,不能放过任何一个细节。

本次,我们来聊一下 Kafka Producer 客户端发送数据的精彩代码。

二、主流程展示

首先,需要先对 Kafka Producer 端发送数据的流程有一个大致的印象,高屋建瓴。

下面就是 Producer 发送数据的主要流程图。这张图中并没有展示过多的细节,而是对主要流程做了说明。

1、 首先把要发送的消息封装为 ProducerRecord;

2、把消息的 key(如果设置了的话) 和 value 序列化;

3、使用分区器,计算需要把这个消息发送到哪个分区上;

4、把消息放入 RecordAccumulator (32M 的内存中),然后分批次发送;

5、Sender 线程取出消息发送到 Kafka 集群上。

三、Kafka 集群元数据信息

在右边的 Kafka 集群的示意图上,可以看到 每个 Broker 上有元数据信息,那么元数据到底有哪些信息呢?

如上图,MetaData 就是对于 Kafka 集群元数据信息的封装。

MetaData 里有对元数据的描述信息,包括更新时间、版本、更新频率等,还包括集群的真正的元数据 Cluster。

Cluster 对象中有主题、分区,节点,副本,正在同步的副本,以及对于这些信息结合在一起的各种数据结构。

四、Kafka 源码环境搭建

1、选择 Kafka 版本

本次我们使用 Kafka 0.10.1.0 版本的代码来阅读,为什么选择这么低的版本呢?现在(2021-04-18) Kafka 的最新版本已经是 2.7.0 版本了。

因为老一点版本的代码结构比较清晰。这样著名的开源项目,很多人都会去提交一些 patch,但是提交 patch 的开发人员代码质量参差不齐,会给源码阅读带来很多困扰。

2、环境搭建

(1)把源代码从 gitee 上 git clone 到本地,并切换到 0.10.0.1 分支

代码语言:javascript
复制
git clone git@gitee.com:apache/kafka.git
git checkout 0.10.0.1

(2)下载 gradle 3.0 ,解压缩到本地某个目录

代码语言:javascript
复制
https://gradle.org/releases/

(3)打开 IDEA,直接 open kakfa 源码的文件夹

修改 gradle 为下载好的 3.0

等待依赖下载完成就可以了。

五、Kafka 源码走读

1、从 example 下的 Demo 类开始(当前位置:KafkaConsumerProducerDemo)

2、打开 Producer 类,看里面的 run 方法,看到异步发送消息的方法(当前位置:Producer)

3、一直点到 doSend 方法里(当前位置:KafkaProducer)

4、可以看到主流程代码不是很多,主要是分为下面几步,也就是上面我们的图上描述的

(1)第一步,等待获取集群元数据

这里的源码估计需要单独开个文章来写,还是有点复杂的。

点进去可以看到主线程唤醒了 Sender 线程去拉取元数据

代码语言:javascript
复制
sender.wakeup();

然后自己在那等待元数据拉取完毕

代码语言:javascript
复制
metadata.awaitUpdate(version, remainingWaitMs);

(2)第二步,对消息进行序列化

(3)第三步,使用分区器对消息进行分区

分区之后,就知道每条消息要发往哪个分区了,但是在分区之前,要获得集群的元数据才能知道该发送到哪个分区中。

(4)第四步:确认消息是否超过了大小

这里的大小,一个是每条消息不能超过 Kafka 配置的每条消息的大小;

另外一个是不能超过 accumulator 的大小(32M)

(5)第五步:根据元数据信息,封装分区对象

(6)第六步:给每一条消息绑定回调函数

(7)第七步:把消息放入accumulator 中(32M的一个内存),然后把消息封装为一个个批次发送

(8)第八步:唤醒 Sender 线程,他才是真正发送数据的线程

代码语言:javascript
复制
this.sender.wakeup();

可以看到,主流程非常清晰,只有上面的八个步骤。

六、重点分析往缓存池写消息的过程

1、缓存的结构和设计思想

我们重点来分析一下这个缓存结构:

为什么要设计这样的一个缓存呢?

原因是如果每一条消息都触发一次发送请求,由于网络 I/O 会很慢,速度会上不去,吞吐量也就成为一个问题。

所以 Kafka 在这里设计了这样的一个缓存结构。它的设计思想是:有两个线程,一个是 main 线程,直接把消息发送到缓存中就直接返回;一个线程是 Sender 线程,在消息积累到一定大小时,通过 I/O 和 kafka server 交互发送消息。

这个设计思想,值得我们学习 !!!

2、缓存结构分析

看一下这个缓存的结构:

当消息序列化之后,经过分区器,会计算出来这个消息应该发送到哪个 topic 的哪个分区里面。

batches 这个 Map,会记录每个分区对应的 Deque,每个 Deque 里面放的是消息的批次。

这就是这个缓存的结构。

下面思考一个问题,在高并发每秒百万条消息的情况下,如何在保证最佳性能的情况下,还能维持数据在每个分区有序发送?

3、消息写入缓存源码分析

下面我们来看 RecordAccumulator 类的 append 方法(158 行)

首先要注意 batches 这个 结构:

代码语言:javascript
复制
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

是一个线程安全的Map,key 是每个 topic 的分区,值是一个队列,队列的类型是 RecordBatch。

(1)假设线程1往缓存结构中写入第一条消息

首先尝试获取这个分区对应的队列:

一开始分区对应的队列肯定是空的,所以创建了一个 ArrayDeque,放入到 batches 里面。

此时 topic 这个分区里面就有了对应的队列了。

获取到队列之后,尝试把数据写入到队列对应的批次中:

第一次进来,肯定队列中是没有批次的,所以返回了空:

返回了空之后,继续往下执行。

然后根据批次的大小(默认是 16K)和消息的大小,取一个最大值。防止特殊情况下,一个消息比批次还要大。

然后申请了一个 16K 的内存。

然后继续尝试往队列的批次中写消息

这个时候,批次仍然没有,返回空:

于是线程1,创建了一个批次,并且把这条消息写入到批次里面,并且放到了队列里面

(2)假设线程 1 往缓存结构中写第二条消息

同样的,获取队列:

此时队列已经建好了,所以不为空,然后尝试写入消息:

此时,队列中已经放入了一个批次了,所以批次也不为空,直接把消息写进去了,返回值也不为空:

直接返回了:

(3)现在假设两个线程,并发往这个结构写消息,会不会有问题呢?我们还是从头来一遍:

假设2个线程都是要往同一个分区里写消息。

下面的代码由于是 synchronize 加锁的,所以只有线程1能进来:

线程1,执行完后,释放了锁,线程2也进来了,发现也没有队列,也要继续执行。

可以看到,线程1 在 synchronize 外,申请了一段 16K 的内存,然后获得锁,尝试写消息,由于队列是空,返回空,继续执行。

然后创建了一个批次,写入消息,并且把批次写到了队列里。释放了锁。

下面看线程2,进来会怎样。

线程2 ,首先申请内存。获得锁,尝试往队列中写消息,发现队列中已经有一个批次了,于是直接把消息写到批次里面,返回值不为空,释放掉了刚刚申请的内存。

下面再看如果线程3,此时进来会怎样。

线程3,从方法最开始执行。

可以看到线程3,直接就返回了。

线程1,线程2,线程3,在 synchronize 加锁下,有序执行。

七、精彩代码 - 分段加锁

回头再看这个 append 方法,难免会有点奇怪,为啥要搞这么多 tryAppend 方法?

正常思维是,3个线程有序写消息,直接在 append 方法上加一个 synchronize 不就行了。

这就是它设计高明的地方。

可以看它的代码结构:

代码语言:javascript
复制
public xxx append(xxx) {
    // 获得队列
    Deque<RecordBatch> dq = getOrCreateDeque(tp);
    
    // 加锁
    synchronize (dq) {
        xxxx
        tryAppend();
    }
    
    // 申请内存,未加锁
    申请内存(比较耗时,所以没加锁);
        
    // 加锁
    synchronize (dq) {
        xxxx;
        tryAppend();
        xxxx;
    } 
}

可以看到上面的这个结构中,对于两段代码分别加了锁,加了锁的代码是完全的内存操作,速度很快。

但是对于申请内存这种耗时的操作,就没加锁。

使用这种分段加锁的结构,就能大大提高执行效率,比直接在方法上加一个 synchronize 要好很多。

然后为什么要搞这么多 tryAppend 呢?就是因为分段加锁,要控制多个线程的执行逻辑合理有序,才这么设计的。

八、精彩代码 - 写时复制,线程安全

我们再来看这段代码:

代码语言:javascript
复制
public xxx append(xxx) {
    // 获得队列
    Deque<RecordBatch> dq = getOrCreateDeque(tp);
    
    // 加锁
    synchronize (dq) {
        xxxx
        tryAppend();
    }
    
    // 申请内存,未加锁
    申请内存(比较耗时,所以没加锁);
        
    // 加锁
    synchronize (dq) {
        xxxx;
        tryAppend();
        xxxx;
    } 
}

发现这行代码是没有锁的,那多个线程执行不会有问题吗?

代码语言:javascript
复制
Deque<RecordBatch> dq = getOrCreateDeque(tp);

此时要看一下这个 batches 的是什么结构。

代码语言:javascript
复制
this.batches = new CopyOnWriteMap<>();

可以看到,这个 Map 结构实质是一个写时复制的 Map 。

看一下它的实现类的 put 方法:

代码语言:javascript
复制
    @Override
    public synchronized V put(K k, V v) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        V prev = copy.put(k, v);
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }

可以看到在 put 的时候,把原来的 map 拷贝了一份,在拷贝的 map 上写,写完了之后,再把 map 复制给之前那个 map。

这样就没有并发问题了?确实没有了。

每一次写都是在一个独立的内存上写,当然没有了。

读的时候,是没有并发问题的。

这样不会有性能问题吗?

有的。但是!这个 batches 结构的 key 是 topic 的分区。相对于千千万万的消息来说,分区数是非常少的。就是说,put 的次数非常少,这点性能也就可以忽略不计了。

九、总结

本次,我们初步了解了 Kafka Producer 端的写数据流程,并且重点说明了为了提升 Kafka Producer 端的写性能,Kafka 特意设计了一个缓存结构,把消息封装成批次,再批量发送出去。

最后,我们还赏析了,为了极致的提升性能,Kafka 在往缓存结构写消息的时候,还使用了分段加锁、写时复制的代码技巧,极大的提升了性能。

Kafka 的代码值得我们认真学习,一定可以打开视野,提升代码水平!

觉得文章不错,记得关注我!加我微信,和我交流。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 本文大纲
  • 一、开篇
  • 二、主流程展示
  • 三、Kafka 集群元数据信息
  • 四、Kafka 源码环境搭建
    • 1、选择 Kafka 版本
      • 2、环境搭建
      • 五、Kafka 源码走读
      • 六、重点分析往缓存池写消息的过程
        • 1、缓存的结构和设计思想
          • 2、缓存结构分析
            • 3、消息写入缓存源码分析
            • 七、精彩代码 - 分段加锁
            • 八、精彩代码 - 写时复制,线程安全
            • 九、总结
            相关产品与服务
            数据保险箱
            数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档