Apache Kafka作为Apache基金会的顶级开源项目,是时下最热的消息中间件,其被广泛应用在互联网大厂的后台系统以及各种机器学习推理平台中(Nvidia的DeepStream就是用Kafka做消息中间件的)。作为本专题第四篇文章,本文将会介绍Apache Kafka的特性与原理,教读者如何安装Kafka,并给出一套基于go语言的完整(包括comsumer和producer)客户端调用样例。
当一个系统出现明显的生产者/消费者(producer/consumer)分界时,消息队列将成为一个非常合适的系统组件,其有着削峰填谷,解耦系统的作用。笔者将用以下例子来解释消息队列的重要性。
在基于深度学习的监控系统中,摄像头负责收集实时画面,而各种GPU工作站负责用各种CNN模型进行一帧帧的处理,并将处理结果储存在数据库中。因为深度学习算法的复杂性,处理每一帧视频是非常耗时的操作,因此常常需要多台机器来处理一个摄像头的数据,如下图所示,有四台装有GPU的机器在用同一种算法处理来自一台摄像头的数据。
此时系统已经出现了producer/consumer分界:连接摄像头的机器作为producer,从摄像头处获得帧,并将帧均匀发送给四台机器,而四台机器则是consumer,从producer处拿到请求并进行消费(算法处理)。但是到目前为止我们还没有引入消息队列的概念,于是我们来看看这给我们的系统带来了怎样的问题。
producer从摄像头处获取到帧之后给producer同步发送RPC请求,因为有多台机器,producer需要自己实现负载均衡,并且一旦机器数量发生变化,producer需要相应地改变自己的负载均衡算法,这就给系统的producer和consumer带来了很大的耦合性。同时,由于producer的生产速率和consumer的消费速率不一定能match,一旦consumer跟不上了,会有大量的数据和请求积压在producer与consumer处,严重时会导致内存溢出机器宕机。
此时就是引入消息队列的时候了,如下图所示,producer将帧发送至消息队列,因此不需要关心负载均衡,也不会有请求的堆积,consumer按需从消息队列获得消息进行处理,因此也不会有被海量请求压到宕机的情况出现。至此我们消除了在上文中提到的种种问题。
此时我们的消息队列采取的是所谓的负载均衡( load balancing
)模式,也就是说,一旦一个consumer从消息队列中拿走一帧,这一帧在队列中就不存在了。然而,如果每一帧需要通过多个算法,比方说我们的监控系统要同时支持人脸识别与物体检测算法,每台机器运行一个算法,那么这种负载均衡模式将不再适用。因为当负责人脸识别算法的consumer从对列出拿出帧时,这一帧将不复存在,于是负责物体检测算法的机器会错过这一帧。当多个不同的consumer需要消费相同数据的情况出现时,load balancing模式将不再适用,此时我们需要切换至 fan-out
模式。如下图所示,每个consumer在队列中维护自己在消息队列中的 offset
,每当消费完一帧之后,将自己的offset加一并用新offset从队列中拿到新的帧。
下图取自《Designing Data Intensive Applications》,给出了 load-balancing
模式与 fan-out
模式的详细区别,笔者这里就不重复造轮子了。
此时读者应该发现了,在 fan-out
模式中我们完全失去了负载均衡与并行处理的能力,也就是说每个算法只能有一个consumer在运行,因为如果有多个consumer负责同一算法,那么每一帧将会被处理多次。读者也许会想,我们完全是在拆东墙补西墙嘛,难道就没有一个两全其美的方案吗?其实是有的,我们此时将引进一个全新的策略,叫做 partitioned logs
。
如下图所示,我们将消息队列更名为 message broker
,因为此时我们维护的已经不是一条队列了,而是一个个的队列分片,每一个分片叫做一个 partition
。从producer的角度看,我们将原先的一条队列分成两条队列,当producer进行入队操作时,message broker内置的负载均衡器会自动将消息放入一个partition(可以选择随机负载均衡或者通过传入的key进行哈希负载均衡)。同时,对于 consumer
来说,因为有了两个partition,对于同一个算法我们可以启动两台机器也就是两个consumer,我们给每一个算法的consumer集群起名叫 consumer group
,不同的consumer group之间互不干扰,consumer group内部的每个consumer读取不同的partition,可以实现并行处理,鹅妹子嘤!
值得注意的是,每个consumer group的大小不能超过partition的数量,因为这会导致我们之前提到过的问题:此时会有两个来自同一个consumer group的consumer读同一个partition,于是同一个算法会处理同一帧多次!如下图所示, face recognition1
与 face recognition3
在同时消费 partition2
。这是任何message broker都不会允许的。
然而,consumer group的大小是可以小于partition的数量的。如下图所示, face recognition1
同时处理来自两个partition的数据,并维护两套offset。
我们终于开始讲Kafka了,其实讲到这里,笔者已经用生动的例子将Apache Kafka的特性基本介绍完毕。现在只剩下一些Kafka的名词笔者还没有一一对应到刚才我们谈到的技术细节。
Apache Kafka是一个基于磁盘的,持久化分布式的消息系统(message broker),每个 topic
下的 partition
可以分布在多台机器上,同时每个 partition
又有着多个副本( replica
)来保证主节点宕机情况下的高可用。每个 consumergroup
消费一个 topic
下所有的 partition
。每个 topic
可以同时被多个 consumergroup
消费。
在上一节的例子中,我们提到的帧队列其实就是一个topic。上下图所示,一个topic可以看成是一个partitioned and replicated message queue。首先每个topic被分片成了多个partition,然后每个parition拥有多个副本(replica),replica 1被称为master replica,producer和consumer都只能读写master replica,同时master replica异步地给slave relica同步数据。master replica宕机时,Kafka通过ZooKeeper这个强一致的分布式协调者来从剩下的slave replica中选举出一个replica成为新的master(选主算法请参考本专题第三篇文章:分布式共识与Raft协议)。因为Kafka本身没有实现共识协议,因此需要一个高可用,强一致的第三方协调者来进行选主。
一个Kafka集群可以有多个topic,在上一章节的例子中,每个摄像头的数据流可以看成是一个topic,每个topic的消费是彼此独立的,也就是说一个consumer其实也可以消费多个topic。
在上图中,producer向一个特定的topic中发送数据,一个topic可以有多个producer,每个producer可以选择自己的消息是随机负载均衡到partition中还是通过给定的key明确哈希到特定的partition中。因为kafka无法保证跨partition的有序性,这个功能可以使特定的message被分配到同一个partition上从而保证顺序消费。比如在电商公司中,商品数据库的改动都会记录在Kafka中供各个业务方使用(比如搜索团队需要建立动态变化的索引),而如果商品数据库有进行分库分表,每张表都会接上一个producer,将写数据导入Kafka中。商品改动的顺序非常重要,如果搜索引擎先拿到新改动,再拿到老改动,那么索引保存的信息将会是错误的!因此我们必须保证同一商品的新增与改动都保存在同一个partition上。在Kafka中,producer会给每个消息附上商品的id作为key, 负载均衡器拿到消息时根据key做哈希来决定消息进入哪个partition,因为商品的id不会变,因此同一商品的所有数据都会按顺序保存在一个partition中。
message = new Message()
message.data = message_data
message.key = item_id
我们已经在上一章节中详细谈过consumer group的作用了,这里就不做赘述。这里再强调一遍,consumer group是对应partition的一个概念,因此consumer group的个数不能超过partition的个数。
自此笔者已经详细地介绍了Apache Kafka,希望读者不仅了解了Kafka的各种特性,更重要的是明白了比起Redis message queue等其他消息队列,哪些场景Kafka有着不可替代的作用。
如果读者想自己实现一套Kafka producer和consumer group,其实并不难,在macOS上安装并启动kafka只需以下四步:
brew cask install homebrew/cask-versions/adoptopenjdk8
brew install kafka
brew services start zookeeper
brew services start kafka
当安装并启动Kafka完毕后,可以根据基于Go语言的Sarama库的教学来自己实现一下producer和consumer group (https://github.com/Shopify/sarama/tree/master/examples),笔者这里就不重复造轮子了。因为笔者靠go吃饭,平时也是用go写kafka客户端,因此这里给出go的客户端实现,读者可以根据自己擅长的语言选择不同的客户端library。
[1] Martin Kleppmann, Designing Data-Intensive Applications.
[2] Sarama, a Go library for Apache Kafka 0.8, and above.