本来打算给这篇文章起名叫“搭建Kafka消息队列集群(基础概念篇)”,然而,和RabbitMQ不同,Kafka并没有实现消息队列的协议(例如AMQP,Advanced Message Queuing Protocol,提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计),所以尽管在使用方式上像极了队列,但并不算是严格意义上的消息队列。所以我还是折中一下,将标题取名为了“Kafka分布式消息系统”。
Kafka最早是由LinkedIn使用Java和Scala语言开发的,并在2011年开源,2012年成为Apache软件基金会的顶级项目。2014年,Kafka的几个创建人,成立了一家新的公司,叫做Confluent,专门从事Kafka相关的工作。
Kafka项目的目标是提供一个 统一的、高吞吐、低延迟的,用来处理实时数据的系统平台。按照官方的定义,Kafka有下面三个主要作用:
上面的三个作用,第一条就讲到,kafka是一个消息系统。那么什么是消息系统?它解决了什么样的问题? 我们以时下流行的微服务为例,假设Web端有Web1、Web2、Web3三个面向终端(微信公众号、手机App、浏览器)的Web服务(Http协议),内部有App1、App2、App3三个应用服务(远程过程调用,例如WCF、gRPC等),如果没有消息系统,采用直连的方式,它们之间的通信方式可能是这样的:
图1. 以直连方式进行通信的系统结构
采用这种方式,主要有下面几个问题:
而引入消息系统时,结构将变成下面这样:
图2. 引入消息系统后的系统结构
引入消息系统后,上面的问题将会得到有效解决:
生产者/消费者 模式:
Producer(生产者):在数据管道一端 生产消息 的应用程序。
Consumer(消费者):在数据管道一端 消费消息 的应用程序。
发布者/订阅者 模式:
Publisher(发布者):在数据管道一端 生成事件 的应用程序。
Subscriber(订阅者):在数据管道一端 响应事件 的应用程序。
当使用 发布者/订阅者 模式时,发往队列的数据不叫消息,叫事件。对于数据的处理也不叫消费消息,叫事件订阅。
区分批处理程序和流处理程序。
批处理和流处理的最大区别就是数据是否有明显的边界。如果有边界,就叫做批处理,例如:客户端每小时采集一次数据,发送到服务端进行统计,然后将统计结果保存到统计数据库。
如果没有边界,就叫做流式数据(流处理)。典型的流处理,例如大型网站的日志和订单,因为日志、订单是源源不断的产生,就像一个数据流一样。如果每条日志和订单,在产生后的几百毫秒或者几秒内被处理,则为流式程序。如果每小时采集一次,再统一发送,则将本来的流式数据,转换为了“批数据”。
流式处理有时候是必须的:比如天猫双11的订单和销售额,马云需要实时显示在大屏幕上,如果数据中心说:我们是T+1的,双11的数据,要12号才能得到,我想马云baba是不会同意的。
处理流数据和处理批数据的方法不同,Kafka提供了专门的组件Kafka Streaming来处理流数据;对于其他的Hadoop生态系统项目,各自提供了不同的组件,例如,Spark也包括了Spark Streming来处理流数据。而流数据处理的鼻祖Storm则是专门开发用来处理流式数据的。
除了使用数据边界来区分流处理和批处理以外,还有一个方法就是处理时间。批处理的处理周期通常是小时或者天,流处理的处理周期是秒。对应的,批处理也叫做离线数据处理,而流处理叫做实时数据处理。还有一种以分钟为单位的,叫做近线数据处理,但是这种方式讨论的比较少,其是离线处理的套路,只是缩短了处理周期而已。
默认情况下,Kafka中的数据可以保存一周。同时,Kafka天然支持集群,可以方便地增减机器,同时可以指定数据的副本数,保证在集群内个别服务器宕机的情况下,整个集群依然可以稳定提供服务。
在我们的数据中心的项目应用中,主要是用作数据传输。为了看清楚它解决了一个什么问题,先简单介绍一下这个项目的场景:
这个项目的前端是各种应用,应用的数量未来可能有几十上百个,但目前只有10个。这些前端的应用要将数据发送到后端的数据中心(一个我们称为数据采集器的程序,简称采集器),很明显,采集器与应用是一对多的关系。这样就会出现这样一种情况:大部分的时间采集器器空闲,但是当多个应用同时发数据时,采集器又处理不过来。此时就需要一个缓冲机制,使得采集器不会太闲也不会太忙。这时就可以采用Kafka作为这个数据缓冲池。
在这个应用范例中,选择Kafka而没有选择传统的成熟消息队列组件,例如RabbitMQ,是因为Kafka天生是为了应对大批量数据的,所以性能更好一些。
除了起到数据缓冲的作用以外,Kafka在数据中心的应用中,还起到了“平滑升级”的作用,如下图:
图3. 平滑升级
需求是这样的:之前的前端应用、数据采集、数据清洗程序都是采用.Net开发的,并存入到MS SQL Server数据库中。为了应对日益膨胀的数据量,决定采用大数据技术,将数据存储在HDFS上,并使用Spark进行数据统计。
因为引入了Kafka,所以不管是老版的前端应用、数据采集、还是清洗程序,都不需要做任何的改动。就可以接入新版的采集/清洗程序,因为只要从Kafka中取数据就好了。
当新版本的程序测试通过后,只需要简单地停掉老版程序,就可以平滑地切换到新系统。
所有事物都不可能只有优点没有缺点,引入Kafka带来的挑战主要有下面几个:
Broker(服务进程):Broker直译为代理。我觉得这个称谓不好理解,其实通俗讲就是运行kafka的服务器,再具体一点就是运行Kafka的服务进程。
Topic(主题):可以理解为一个数据管道,在这个管道的一端生产消息/发布事件,另一端消费消息/响应事件。管道本身进行消息/事件的存储、路由、发送。主题由它的名称(Name)所标识。
主题中的数据,不论是不是被消费,都会保存指定的一段时间,默认是一周。
Topic可以被分割成多个Partitions(分区)。
发往Partition的每条消息将获得一个递增id,称为offset(偏移量)。整体上看,结构如下图所示:
图4. Kafka Topic、Partition、Offset
对不同的Topic,可以设置不同的Partition数目,当集群中有多个节点时,将会随机分布在不同的节点上。如下图:Topic1拥有3个Partition,Topic2则只有2个Partition。
图5. Broker、Topic、Partition的分布
通常会将Topic的副本数设置为2或者3,此时当某个节点故障下线时,该Topic依然可用,集群内的其他节点将会提供服务。
图6. Topic 2副本
显然,并不是副本数越多越好。副本数越多,同步数据需要花的时间越久,磁盘的使用率会越低。
注意:不管是Kafka集群还是Hadoop集群,并不是说节点越多容错就越高,容错是一样的;只不过是恰巧相关节点同时发生故障的概率小一些。比如说,Hadoop集群中有100个节点,当你的副本数设置为2时,恰巧保存这两个副本的节点故障了,相关的数据一样无法访问。而100个节点相对于5个节点或者3个节点,恰好保存相同副本的节点同时故障的概率低一些。
对于多个Partition的Topic来说,只有一个Leader Partition,一个或多个ISR(in-sync replica,同步副本)。leader进行读写,而ISR仅作为备份。
图7. Topic 2副本(实际图)
Producer只需要指定Topic的名称(Name),然后连接到集群中的任意一个节点,Kafka会自动进行负载均衡,并将对写入操作进行路由,从而写入到正确的Partition当中(多个Partition将位于集群中的不同节点)。
图8. Producer 用于写入数据
需要注意的是:上图没有加入ISR Partition,这么做事为了制图更简单一些。
Producer可以选择用下面三种方式来获得数据写入的通知:
Producer在发送数据时,可以指定一个Key,这个Key通常基于发送的数据。
举个例子,如果要发送一笔电商的订单数据(OrderNo 单号、Retailer 卖家、Customer 买家)。
如果:
这里比较容易晕的是:当Key为Retailer时,并不是说每次发送Key都填一个字符串,“Retailer”,而是Retailer的具体值。以下面的表格为例:
OrderNo | Customer | OrderAmount | OrderDate | Retailer |
---|---|---|---|---|
001 | Jimmy | 5200 | 2017-10-01 00:00:00 | Apple |
002 | Jack | 3180 | 2017-11-01 00:00:00 | Apple |
003 | Jimmy | 2010 | 2017-12-01 00:00:00 | XiaoMi |
004 | Alice | 980 | 2018-10-01 00:00:00 | XiaoMi |
005 | Eva | 1080 | 2018-10-20 00:00:00 | XiaoMi |
006 | Alice | 680 | 2018-11-01 00:00:00 | XiaoMi |
007 | Alice | 920 | 2018-12-01 00:00:00 | Apple |
那么对于订单001~007,将Retailer作为Key,则Key的值分别为:Apple(001)、Apple(002)、XiaoMi(003)、XiaoMi(004)、XiaoMi(005)、XiaoMi(006)、Apple(007)。
这样,所有Apple的订单会按次序发往同一个Partition,而所有XiaoMi的订单会按次序发往同一个Partition。这两个Partion可能是同一个,也可能不同。如下图所示:
图9. Producer Key用于路由数据
Consume用于从Topic中读取数据。和Producer类似,只需要连接到集群中的任意一个节点,并指定Topic的名称,Kafka会自动处理从正确的Broker和Partition中提取数据发给Consumer。
对于每个Partition而言,数据是有序的,如下图所示:
图10. Consumer 用于读取数据
Kafka使用群组(Group)的概念巧妙地实现了 生产者/消费者、发布者/订阅者 模式的二合一。
一个Topic可以有多个Group,一个Group内可以包含多个Consumer。对于群组内的Consumer来说,它们是生产者/消费者模式,一个消息只能被Group内的一个Consumer消费;对于不同的群组来说,它们是发布者/订阅者模式,同一个消息会被发送给所有的群组。下图很好地描述了这样的关系:
图11. Consumer Groups
注意:一个Partition只会分配给同一个Group中的一个Consumer。如果只有3个Partition,但是一个Group中有4个Consumer,那么就会有一个Consumer是多余的,无法收到任何数据。
首先要注意的是:这里的Conumser Offsets和前面Topic中的Offsets是两个完全不同的概念。这里的Offsets是Consumer相关的,前面的Offsets是Topic相关的(具体来说是Partition)。有下面几点需要注意:
Offsets的提交时机
通常的做法是选择at least once,然后在应用上做处理,保证可以重复操作,但不会影响最终结果(即所谓的幂等操作)。比如说导入数据,在导入前要判断下是否已经导入过了。或者不判断先导入,然后用一个外挂程序将导重复的数据清理掉。
扩展知识:CAP理论:一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三项中的两项。
Zookeeper是一个分布式服务注册、发现、治理的组件,大数据生态系统中的很多组件都有用到Zookeeper,例如HDFS等。Kafka强依赖于Zookeeper,实际上,在Kafka的安装包里就直接包含了其兼容的Zookeeper版本。
在Kafka中,Zookeeper主要有下面几个作用:
这是一篇很长的文章,我们讨论了Kafka中的主要概念和机制,相信通过这篇文章,你已经对Kafka有了一个初步的认识。在接下来的章节中,我们将会进行实际操作,看Kafka是如何工作的。个人使用过程中感到Kafka非常的稳定和健壮,希望你会和我一样喜欢它。
感谢阅读,希望这篇文章能给你带来帮助!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。