如何快速入门Kafka消息队列?

  • 回答 (9)
  • 关注 (0)
  • 查看 (1397)

最近经常听到这个名词,但是不知道如何入门,我看到腾讯云也有相关的产品Ckafka产品,所以来问问~

婷槟沃婷槟沃提问于
天使的炫翼回答于

那么,我来讲讲Kafka 常用命令

修改topic 配置参数 —— 改保持时间

./kafka-topics.sh --zookeeper 1.1.1.1:2181/kafka --alter --topic Barad_Comm --config retention.ms=43200000

修改topic 配置参数 —— max.message.bytes

./kafka-topics.sh --zookeeper 1.1.1.1:2181/kafka --alter --topic Barad_Comm --config max.message.bytes=5242880

创建topic

./kafka-topics.sh --zookeeper 1.1.1.1:2181/kafka --create --topic Barad_Comm --partitions 16 --replication-factor 2

查看topic

./kafka-topics.sh --zookeeper  1.1.1.1:2181/kafka --describe --topic Barad_Comm

kafka leader 调整

kafka 自带调整工具kafka-preferred-replica-election.sh

编辑需要调整的topic 及partition ,保存为json文件。内容如下:

{
 "partitions":
  [
        {"topic":"Barad_Comm","partition": 4 },
        {"topic":"Barad_Comm","partition": 5},
        {"topic":"Barad_Comm","partition": 6},
        {"topic":"Barad_Comm","partition": 7}
   
  ]
}

执行如下命令:

bin/kafka-preferred-replica-election.sh --zookeeper 1.1.1.1:2181/kafka --path-to-json-file topicPartitionList.json

【备注】注意修改替换zookeeper 参数,以上命令适合于0.8.1 ,其余版本未做验证

最后通过describe 命令查看调整结果。

事情来得太突然资深菜鸟回答于

1. Kafka介绍

Kafka最早是由LinkedIn使用Java和Scala语言开发的,并在2011年开源,2012年成为Apache软件基金会的顶级项目。2014年,Kafka的几个创建人,成立了一家新的公司,叫做Confluent,专门从事Kafka相关的工作。

Kafka项目的目标是提供一个 统一的、高吞吐、低延迟的,用来处理实时数据的系统平台。按照官方的定义,Kafka有下面三个主要作用:

  1. 发布&订阅:和其他消息系统一样,发布订阅流式数据。
  2. 处理:编写流处理应用程序,对实时事件进行响应。
  3. 存储:在一个分布式、容错的集群中安全地存储流式数据。

1.1 消息系统

上面的三个作用,第一条就讲到,kafka是一个消息系统。那么什么是消息系统?它解决了什么样的问题? 我们以时下流行的微服务为例,假设Web端有Web1、Web2、Web3三个面向终端(微信公众号、手机App、浏览器)的Web服务(Http协议),内部有App1、App2、App3三个应用服务(远程过程调用,例如WCF、gRPC等),如果没有消息系统,采用直连的方式,它们之间的通信方式可能是这样的:

图1. 以直连方式进行通信的系统结构

采用这种方式,主要有下面几个问题:

  1. 服务之间紧耦合,设想我们修改一下应用服务2的外部接口,那么所有调用了它的组件均需要修改,在上图这种极端情况下(所有组件都调用它,这种情况在实际中并不多见),所有的其他Web服务和应用服务都需要做相应修改。
  2. 服务之间的这种紧耦合,有时候还会造成接口无法修改的问题:如果有不受控的第三方调用了接口,那么修改接口将造成第三方应用不可用。设想微信公众号的某个接口改动一下,将会造成成千上网的应用故障。
  3. 为了解决上面问题,接口往往以版本的方式发行,访问形式如 web/v1/interface、web/v1.1/interface、app/v2.0/interface。接口在小版本号之间兼容,大版本号之间不兼容。
  4. 上面这种接口规划方式虽然一定程度解决了紧耦合的问题,但又带来了新问题:更新需要改动多个版本序列,版本过多的时候将难于维护。
  5. 增减客户端繁琐,假如现在新加入一个应用服务4,提供某个Web服务所必须的功能,那么就要把Web服务1、Web服务2、Web服务3全都改一遍;同样,如果应用服务2不再需要,那么同样要在Web服务端去掉调用它的代码。
  6. 性能受限,不易扩展,例如要做负载均衡,需要借助第三方的工具,例如Zookeeper或者Consul等。或者需要改写代码或者加入特定的配置。

而引入消息系统时,结构将变成下面这样:

图2. 引入消息系统后的系统结构

引入消息系统后,上面的问题将会得到有效解决:

  1. 所有的组件,Web服务和应用服务,都不再关心彼此的接口定义,而仅关心数据结构(Json结构)。
  2. 仅需要知道与Kafka的通信协议就可以了,而Kafka的结构已经高度标准化,相对稳定和成熟。
  3. 提升了性能,Kafka针对大数据传输设计,吞吐率足以应付绝大多数企业需求。
  4. 易于扩展,Kafka本身就可以通过集群的方式进行扩展。除此以为,其独特的模式为负载均衡等常见需求提供了支持。

1.2 消息系统的两种模式

生产者/消费者 模式:

Producer(生产者):在数据管道一端 生产消息 的应用程序。

Consumer(消费者):在数据管道一端 消费消息 的应用程序。

  1. 生产者将消息发送至队列,如果此时没有任何消费者连接队列、消费消息,那么消息将会保存在队列中,直到队列满或者有消费者上线。
  2. 生产者将消息发送至队列,如果此时有多个消费者连接队列,那么对于同一条消息而言,仅会发送至其中的某一个消费者。因此,当有多个消费者时,实际上就是一个天然的负载均衡。

发布者/订阅者 模式:

Publisher(发布者):在数据管道一端 生成事件 的应用程序。

Subscriber(订阅者):在数据管道一端 响应事件 的应用程序。

当使用 发布者/订阅者 模式时,发往队列的数据不叫消息,叫事件。对于数据的处理也不叫消费消息,叫事件订阅。

  1. 发布者发布事件,如果此时队列上没有连接任何订阅者,则此事件丢失,即没有任何应用程序对该事件作出响应。将来如果有订阅者上线,也不会重新收到该事件。
  2. 发布者发布事件,如果此时队列上连接了多个订阅者,则此事件会广播至所有的订阅者,每个订阅者都会收到完全相同的事件。所以不存在负载均衡

1.3 流处理应用程序

区分批处理程序和流处理程序。

批处理和流处理的最大区别就是数据是否有明显的边界。如果有边界,就叫做批处理,例如:客户端每小时采集一次数据,发送到服务端进行统计,然后将统计结果保存到统计数据库。

如果没有边界,就叫做流式数据(流处理)。典型的流处理,例如大型网站的日志和订单,因为日志、订单是源源不断的产生,就像一个数据流一样。如果每条日志和订单,在产生后的几百毫秒或者几秒内被处理,则为流式程序。如果每小时采集一次,再统一发送,则将本来的流式数据,转换为了“批数据”。

流式处理有时候是必须的:比如天猫双11的订单和销售额,马云需要实时显示在大屏幕上,如果数据中心说:我们是T+1的,双11的数据,要12号才能得到,我想马云baba是不会同意的。

处理流数据和处理批数据的方法不同,Kafka提供了专门的组件Kafka Streaming来处理流数据;对于其他的Hadoop生态系统项目,各自提供了不同的组件,例如,Spark也包括了Spark Streming来处理流数据。而流数据处理的鼻祖Storm则是专门开发用来处理流式数据的。

除了使用数据边界来区分流处理和批处理以外,还有一个方法就是处理时间。批处理的处理周期通常是小时或者天,流处理的处理周期是秒。对应的,批处理也叫做离线数据处理,而流处理叫做实时数据处理。还有一种以分钟为单位的,叫做近线数据处理,但是这种方式讨论的比较少,其是离线处理的套路,只是缩短了处理周期而已。

1.4 存储:在一个分布式、容错的集群中安全地存储流式数据

默认情况下,Kafka中的数据可以保存一周。同时,Kafka天然支持集群,可以方便地增减机器,同时可以指定数据的副本数,保证在集群内个别服务器宕机的情况下,整个集群依然可以稳定提供服务。

在我们的数据中心的项目应用中,主要是用作数据传输。为了看清楚它解决了一个什么问题,先简单介绍一下这个项目的场景:

这个项目的前端是各种应用,应用的数量未来可能有几十上百个,但目前只有10个。这些前端的应用要将数据发送到后端的数据中心(一个我们称为数据采集器的程序,简称采集器),很明显,采集器与应用是一对多的关系。这样就会出现这样一种情况:大部分的时间采集器器空闲,但是当多个应用同时发数据时,采集器又处理不过来。此时就需要一个缓冲机制,使得采集器不会太闲也不会太忙。这时就可以采用Kafka作为这个数据缓冲池。

在这个应用范例中,选择Kafka而没有选择传统的成熟消息队列组件,例如RabbitMQ,是因为Kafka天生是为了应对大批量数据的,所以性能更好一些。

除了起到数据缓冲的作用以外,Kafka在数据中心的应用中,还起到了“平滑升级”的作用,如下图:

图3. 平滑升级

需求是这样的:之前的前端应用、数据采集、数据清洗程序都是采用.Net开发的,并存入到MS SQL Server数据库中。为了应对日益膨胀的数据量,决定采用大数据技术,将数据存储在HDFS上,并使用Spark进行数据统计。

因为引入了Kafka,所以不管是老版的前端应用、数据采集、还是清洗程序,都不需要做任何的改动。就可以接入新版的采集/清洗程序,因为只要从Kafka中取数据就好了。

当新版本的程序测试通过后,只需要简单地停掉老版程序,就可以平滑地切换到新系统。

1.5 引入消息队列带来的挑战

所有事物都不可能只有优点没有缺点,引入Kafka带来的挑战主要有下面几个:

  1. Kafka依赖,虽然系统中的应用彼此之间不依赖了,但是都重度依赖Kafka,此时Kafka的稳定性就非常重要(类似MSSQL Server一样的基础设施)。
  2. 微服务的实践中,出现了另一种服务独立化的呼声,即各个服务不需要其他的组件就可以独立对外提供服务,也就是去中心化。两种方式的选用时机就显得非常重要。
  3. 消息队列天然都是异步的,虽然提升了性能,但是增大了代码复杂性。本来使用RPC同步调用返回结果的简单操作,采用异步后加大了代码编写的复杂度和调试的复杂度。

2. Broker、Topic和Partition

2.1 Broker

Broker(服务进程):Broker直译为代理。我觉得这个称谓不好理解,其实通俗讲就是运行kafka的服务器,再具体一点就是运行Kafka的服务进程。

  • 当你连接到集群中的任意一个Broker时,就可以访问整个集群了。
  • 集群内的Broker根据Id进行区分,Id为纯数字。

2.2 Topic、Partition和Offset

Topic(主题):可以理解为一个数据管道,在这个管道的一端生产消息/发布事件,另一端消费消息/响应事件。管道本身进行消息/事件的存储、路由、发送。主题由它的名称(Name)所标识。

主题中的数据,不论是不是被消费,都会保存指定的一段时间,默认是一周。

Topic可以被分割成多个Partitions(分区)。

  • 分区内的数据是有序的。当一个主题只有一个分区时,那么这个主题的消息也是有序的;但如果一个主题有多个分区,那么消息是无序的。
  • 分区越多,并行处理数就越多。通常的建议是主机数x2,例如如果集群中有3台服务器,则对每个主题可以创建6个分区。
  • 当消息被写入分区后,就不可变了,无法再进行修改。除非重建主题,修改数据后重新发送。
  • 当没有key时,数据会被发往主题的任意一个分区;当有key时,相同key的数据会被发往同一个分区。

发往Partition的每条消息将获得一个递增id,称为offset(偏移量)。整体上看,结构如下图所示:

图4. Kafka Topic、Partition、Offset

2.3 Broker、Topic、Partition的分布

对不同的Topic,可以设置不同的Partition数目,当集群中有多个节点时,将会随机分布在不同的节点上。如下图:Topic1拥有3个Partition,Topic2则只有2个Partition。

图5. Broker、Topic、Partition的分布

2.4 Topic 副本数(replication factor)

通常会将Topic的副本数设置为2或者3,此时当某个节点故障下线时,该Topic依然可用,集群内的其他节点将会提供服务。

图6. Topic 2副本

显然,并不是副本数越多越好。副本数越多,同步数据需要花的时间越久,磁盘的使用率会越低。

注意:不管是Kafka集群还是Hadoop集群,并不是说节点越多容错就越高,容错是一样的;只不过是恰巧相关节点同时发生故障的概率小一些。比如说,Hadoop集群中有100个节点,当你的副本数设置为2时,恰巧保存这两个副本的节点故障了,相关的数据一样无法访问。而100个节点相对于5个节点或者3个节点,恰好保存相同副本的节点同时故障的概率低一些。

2.5 Partition Leader和ISR

对于多个Partition的Topic来说,只有一个Leader Partition,一个或多个ISR(in-sync replica,同步副本)。leader进行读写,而ISR仅作为备份。

图7. Topic 2副本(实际图)

3. Producer 生产者

3.1 Producer向Topic中写入数据

Producer只需要指定Topic的名称(Name),然后连接到集群中的任意一个节点,Kafka会自动进行负载均衡,并将对写入操作进行路由,从而写入到正确的Partition当中(多个Partition将位于集群中的不同节点)。

图8. Producer 用于写入数据

需要注意的是:上图没有加入ISR Partition,这么做事为了制图更简单一些。

3.2 Producer Acks

Producer可以选择用下面三种方式来获得数据写入的通知:

  1. Acks=0,速度最快,Producer不去等待写入通知,有可能存在数据丢失。
  2. Acks=1,速度较快,Producer等待Leader通知,但不会等待ISR通知,有可能ISR存在数据丢失。
  3. Acks=all,速度最慢,Producer等待Leader和ISR的通知,不存在数据丢失。

3.3 Producer Keys

Producer在发送数据时,可以指定一个Key,这个Key通常基于发送的数据。

举个例子,如果要发送一笔电商的订单数据(OrderNo 单号、Retailer 卖家、Customer 买家)。

如果:

  • 数据的接收端(Consumer),不关心订单的发送顺序,那么key可以为空,也可以为OrderNo。
  • 数据的接收端,要求卖家的订单要按顺序发送,那么Key设为Retaier。
  • 数据的接收端,要求买家的订单要按顺序发送,那么Key设为Customer。

这里比较容易晕的是:当Key为Retailer时,并不是说每次发送Key都填一个字符串,“Retailer”,而是Retailer的具体值。以下面的表格为例:

OrderNo

Customer

OrderAmount

OrderDate

Retailer

001

Jimmy

5200

2017-10-01 00:00:00

Apple

002

Jack

3180

2017-11-01 00:00:00

Apple

003

Jimmy

2010

2017-12-01 00:00:00

XiaoMi

004

Alice

980

2018-10-01 00:00:00

XiaoMi

005

Eva

1080

2018-10-20 00:00:00

XiaoMi

006

Alice

680

2018-11-01 00:00:00

XiaoMi

007

Alice

920

2018-12-01 00:00:00

Apple

那么对于订单001~007,将Retailer作为Key,则Key的值分别为:Apple(001)、Apple(002)、XiaoMi(003)、XiaoMi(004)、XiaoMi(005)、XiaoMi(006)、Apple(007)。

这样,所有Apple的订单会按次序发往同一个Partition,而所有XiaoMi的订单会按次序发往同一个Partition。这两个Partion可能是同一个,也可能不同。如下图所示:

图9. Producer Key用于路由数据

4. Consumer 消费者

4.1 Consumer基本概念

Consume用于从Topic中读取数据。和Producer类似,只需要连接到集群中的任意一个节点,并指定Topic的名称,Kafka会自动处理从正确的Broker和Partition中提取数据发给Consumer。

对于每个Partition而言,数据是有序的,如下图所示:

图10. Consumer 用于读取数据

4.2 Consumer Group

Kafka使用群组(Group)的概念巧妙地实现了 生产者/消费者、发布者/订阅者 模式的二合一。

一个Topic可以有多个Group,一个Group内可以包含多个Consumer。对于群组内的Consumer来说,它们是生产者/消费者模式,一个消息只能被Group内的一个Consumer消费;对于不同的群组来说,它们是发布者/订阅者模式,同一个消息会被发送给所有的群组。下图很好地描述了这样的关系:

图11. Consumer Groups

注意:一个Partition只会分配给同一个Group中的一个Consumer。如果只有3个Partition,但是一个Group中有4个Consumer,那么就会有一个Consumer是多余的,无法收到任何数据。

4.3 Consumer Offsets

首先要注意的是:这里的Conumser Offsets和前面Topic中的Offsets是两个完全不同的概念。这里的Offsets是Consumer相关的,前面的Offsets是Topic相关的(具体来说是Partition)。有下面几点需要注意:

  • offset记录了每个Group下每个Consumer读取到的位置。
  • Kafka使用了一个特殊的Topic用来保存Consumer Offsets,这个Topic的名称是__consumer_offsets。
  • 当Consumer离线后重新上线,会从之前offsets记录的位置开始读取数据。

Offsets的提交时机

  1. At most once(至多一次):Consumer只要一收到消息,就提交offsets。这种效率最高,但潜在的可能是当消息处理失败,例如程序异常,那么这条消息就无法再次获得了。
  2. At least once(至少一次):当Consumer处理完消息再提交offsets。这种可能会重复读,因为如果处理时异常了,那么这个消息会再读一次。这个是默认值。
  3. Exactly Once:还在试验阶段。

通常的做法是选择at least once,然后在应用上做处理,保证可以重复操作,但不会影响最终结果(即所谓的幂等操作)。比如说导入数据,在导入前要判断下是否已经导入过了。或者不判断先导入,然后用一个外挂程序将导重复的数据清理掉。

扩展知识:CAP理论:一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三项中的两项。

4.4 Zookeeper

Zookeeper是一个分布式服务注册、发现、治理的组件,大数据生态系统中的很多组件都有用到Zookeeper,例如HDFS等。Kafka强依赖于Zookeeper,实际上,在Kafka的安装包里就直接包含了其兼容的Zookeeper版本。

在Kafka中,Zookeeper主要有下面几个作用:

  • 管理集群中的节点,维护节点列表。
  • 管理所有的主题,维护主题列表。
  • 执行partition的leader选举。
  • 当集群变化时通知Kafka,这些变化包括新建Topic、Broker上线/下线、删除Topic

5. 总结

这是一篇很长的文章,我们讨论了Kafka中的主要概念和机制,相信通过这篇文章,你已经对Kafka有了一个初步的认识。在接下来的章节中,我们将会进行实际操作,看Kafka是如何工作的。个人使用过程中感到Kafka非常的稳定和健壮,希望你会和我一样喜欢它。

感谢阅读,希望这篇文章能给你带来帮助!

骑牛看晨曦love&peace~回答于

我先讲我遇到的问题及解决方案吧,几天前,我不得不设计一个基于海量写入的扇出架构。

对于这个学派的新手来说,我会尝试用非常简单的方式去解释。基于海量写入的扇出架构尝试在写入时使用所有业务逻辑。初衷是为了给每个用户及用例准备好视图;当有人想要读取数据时,他们不必应用复杂的逻辑。于是读取就会变得轻松简单且通常可以保证恒定的读取时间。Twitter就基于海量写入的扇出架构。

不必深入了解这些要求的细节,我在此处列出了简单的摘要:

  • 高写入容量
  • 读取时间几乎恒定
  • 必须具有容错能力并可以在商品硬件上扩展
  • 同样需要自由文本搜索和社交图遍历
  • 实时分析

我们设计的架构涉及三个数据库。MongoDB用于存储传入数据、Redis用于存储专为每个用户设计的数据集、ElasticSearch用于存储需要自由文本或部分文本搜索的文本结果。

对于每个传入的数据集都有业务逻辑决定在Redis中填充哪些数据集(基于社交图连接)以及决定在ElasticSearch中提取和存储哪些东西进行自由文本搜索。

听起来很简单!

鉴于此,我决定使用快速可靠的Apache Kafka作为消息代理,然后使用Storm处理数据并实现基于海量写入的扇出架构。

细节决定成败。这就是我打算在这里分享的内容。在使用Kafka和Storm之前,您应该了解一些关于每个应用的知识。

卡夫卡是一个优雅的消息队列。您可以将其用作发布 - 订阅或广播。它是如何完成它的工作的?

下面是解释相同信息的官方文档:

“消息传统上有两种模式:队列发布 - 订阅。在一个队列中,消费者池可以从服务器中读取消息且每条消息都发送到其中一个服务器上;在发布 - 订阅模型中,消息被广播给所有消费者。Kafka提供了概括了这两个模型的单一消费者抽象——消费群体

消费者用消费者组名称标记自己,并且发布到主题的每条消息都被传递至在每个订阅消费者组内的一个消费者实例。消费者实例可以在单一进程中或单一机器上。

若所有消费者实例具有相同的消费者组,那么这就像传统的消费者队列负载均衡一样工作。

若所有消费者实例具有不同的消费者群体,那么它就像发布 - 订阅一样工作,并且将所有消息广播给所有消费者。“

快速总结Kafka的显着特点

  • 消息被分为多个分区
  • 仅在分区内保证消息顺序
  • 生产者可以决定将数据发送给哪个分区

了解了这么多信息,我们就可以根据分类来创建主题。对于每种新型数据,我们都将新建主题。例如,如果我们使用Twitter,我们可以创建一个名为“推文”的主题。我们会将所有推文创建数据推送到这个主题中。但是跟随用户是完全不同的用例。根据分类理论,我们将为此创造一个新的主题,称之为“跟随”。所有与用户行为相关的数据都将发送到这个新的“跟随”主题中。

现在让我们看看排序。排序仅在主题的分区内被保证且每个主题可以有多个分区。消息只能转到主题中的一个分区。

鉴于此,我们如何实现持续的排序呢?打个比方,让我们以Twitter为例。如果您有10条推文,而您希望按照相同的时间顺序查看它们。

所以现在给出了两个选项。一个选项是每个主题仅包含一个分区并拥有很多主题。例如,为每个用户提供一个主题。只有这样使用一个分区,您才可以始终保持消息的顺序。但这将产生数以亿计的主题(每个用户一个主题)。

另一种选择是为每个用户分配一个主题和一个分区。通过这种方式您也可以确定顺序,但这意味着一个主题和数亿个分区。

现在我们了解到,这两种方法都不是最佳答案。太多主题或分区导致了性能问题。若您阅读架构的话,很显而易见的是它们都会造成开销进而降低性能。我不会去讨论为什么会发生这种情况,而是告诉您我们是如何解决它的。

每个生产者都可决定使用主题中的哪个分区发送数据。这让我们得以选择固定数量的分区并将用户均匀分配到这些分区上。我们发现平均商品硬件和3节点集群及15000分区是最佳选择。这是经过诸多性能测试和优化的结果。所以我们将用户输入内容均匀分配到15000个分区之中。我们没有为每个用户分配一个分区,而是将固定的一组用户分配到了一个分区。这使我们能确保在没有数百万个分区的情况下进行用户排序。

演化史记回答于

Kafka是分布式流平台。

一个流平台有3个主要特征:

  • 发布和订阅消息流,这一点与传统的消息队列相似。
  • 以容灾持久化方式的消息流存储。
  • 在消息流发生时处理消息流。

Kafka通常使用在两大类应用中:

  • 在系统或应用之间,构建实时、可靠的消息流管道。
  • 构建实时流应用程序,用于转换或响应数据流

Kafka的几个基本概念:

  • Kafka可以作为一个集群运行在跨越多个数据中心的多个服务上。
  • Kafka集群按照分类存储的消息流叫做topic
  • 每一个消息由一个主键、一个值、和一个时间戳组成。

Kafka有4个核心的API:

  • Producer API允许应用向一个或多个topic发送信息流。
  • Consumer API允许应用订阅一个或多个topic并处理产生的信息流。
  • Streams API允许应用扮演一个流处理器,从一个或多个topic消费输入流,并向一个或多个topic生产输出流。 实际上是转换输入流到输出流。
  • Connector API构建和运行连接Kafka的可复用的生产者或消费者,到已存在的应用或数据系统。例如:连接一个关系型数据库捕获表中的每一次变化。

在Kafka中,客户端和服务器之间的通信是通过一种简单的,高性能的,语言不可知的TCP协议完成的。

image1

Topics 和 Logs

我们了解一下Kafka为消息流提供的核心抽象——topic

一个topic是一个消息发布时的分类。Kafka中的topic总是有0个、1个、或多个消费者订阅写入其中的数据。

对于每一个topic,Kafka集群保存着分区日志:

每一个partition是一个有序的不可改变的消息队列, 它可以持续的追加——结构化的提交日志。partitions中的每一个记录都会分配

一个有序的id,这个id叫做偏移量——每一个partition中的一个消息的唯一标识符。

Kafka集群通过配置保留时间持久化所有发布的消息,不管它是否被消费。例如:设置保留时间为2天,一个消息发布后的2天内,它可以被消费,超过2天,它将被丢弃以释放空间。

实际上,保存在每一个消费者基础上的唯一元数据是偏移量(offset)或者说是日志中消费者的位置。偏移量(offset)时候被消费者控制的:

正常情况下,一个消费者在读取数据时,线性增加它的偏移量,但实际上,消费者控制位置,它可以按照任何顺序处理和消费消息。例如:消费者可以重置一个老的偏移量,重新

处理过去的数据,也可以跳到最新的数据,从“现在时刻”起,消费数据。

这些特性意味着消费者是十分廉价的,他们可以来去自如,不会和集群中的其他消费者冲突。例如:你可以到任何topic的消息末尾,而不影响正在消费这个topic的其他消费者。

日志中的partitions服务着几个目的:首先,它们允许日志的大小超出适合单个服务器的大小。每一个单独的partition必须适合于自己的服务器。但是一个topic可以有许多个partition

,所以它可以处理任意数量规模的数据。其次它扮演着平行的单位。

分布式

日志的partitions分布在Kafka集群中的服务上,每一个服务处理partitions中的一份。每一个partition可以通过配置服务的数量进行复制,以达到容灾的目的。

每一个partition都有一个服务扮演着"leader"的角色,0个或多个服务扮演着"followers"的角色。"leader"处理partition所有的读写请求,"followers"通过"leader"进行数据备份。

如果"leader"失败了,"followers"中的一个会自动变成"leader"。

异地同步

Kafka的MirrorMaker为集群提供异地同步支持,使用MirrorMaker,消息可以跨越多个数据中心或云区域进行复制。你可以用主-被模式进行数复制和恢复,也可以用主-主模式

把数据置于离用户更新的地方。

生产者

生产者发布数据到他们选择的topic,生产者负责选择哪一个消息分配到topic中的哪一个partition。它可以通过轮询的方式简单的实现负载均衡,或者通过消息主键进行语义分区。

消费者

消费者用消费组名称标志着他们自己。发布到topic的每一个消息都会传送到每一个订阅的消费组中的一个消费实例上。消费实例可以按照进程分割,也可以按照机器分割。

如果所有的消费实例在一个消费组下,消息实际上是在消费实例上进行负载均衡。

如果所有的实例在不同的消费组下,每一个消息都会广播到每一个消费实例。

两个服务器Kafka集群托管四个分区(P0-P3)和两个消费者组。消费者组A有两个消费者实例,而组B有四个消费者实例。

通常情况下,我们发现topic都有一个小量的消费组,每一个“逻辑订阅者”都有一个。每一个消费组都由许多消费实例组成,为了扩展和容灾。

这仅仅在生产-订阅语义上,订阅者由一个消费集群代替了单一的进程。

Kafka消费的实现方式是通过消费实例分割日志中的partition,所以,在任何时间点,每一个实例都是partition合理份额中的专一消费者。

组内保持关系的进程被Kafka协议动态的处理。如果一个新的实例加入了组,它会从组内的其他成员分配一些partition。如果一个实例死掉了, partitions会分配到剩余的实例中。

Kafka仅提供partition内的消息排序,不是topic内不同partition之间的。按分区排序与按键分区数据的能力相结合,足以满足大多数应用程序的需求。可是,

如果你需要消息的整体排序,它可以用一个topic只有一个partition来完成,这意味着一个消费组中,只有一个消费实例处理数据。

多租户

你可以用多租户方案部署Kafka集群。多租户可以通过配置启用哪些topic可以生产或消费数据。还有配额操作的支持。管理员可以根据请求定义和执行配额以控制客户端使用的代理资源。

担保

作为高级别的Kafka,给出了一下的担保:

  • 被发送到topic partition的消息会按照他们发送的顺序追加。如果M1被相同的生产者作为M2发送,M1先发送,M1有一个较低的offset,并且在日志中先与M2出现。
  • 消费者按照日志中的顺序发现消息。
  • 对于具有复制因子N的主题,我们将容忍多达N-1个服务器故障,而不会丢失任何提交给日志的记录。

Kafka作为消息系统

Kafka的流概念与传统企业消息系统如何比较?

传统的消息有连个模型:队列和发布-订阅。在队列中,每一个消息会分配到消费者中的一个,在发布-订阅模式下,每一个消息会广播到所有的消费者。

这两者中的每一个都有优点和缺点。队列的优点是可以通过多个消费者实例分割数据的处理,这可以扩展你的处理进程。不幸的是,队列不能有多个订阅者,一旦一个进程

读取了数据,它就消失了。发布-订阅允许你广播数据到多个进程,消息去了每一个消费者,你没有方式去扩展它。

Kafka消费组的概念整合了这两个概念。作为队列,消费组可以通过进程集合(消费组中的成员)分割处理。作为发布-订阅,Kafka允许你发布消息到所有的消费组。

Kafka模型的优点是每一个topic都有这两个属性,它可以扩展处理和有多个订阅者,不需要选择其中的一种。

Kafka比传统的消息系统有更强的排序保障。

传统的队列在服务端保存消息的顺序,如果多个消费者从队列中消费数据,服务按照存储的顺序分发消息。可是,虽然服务按照顺序分发数据,数据时异步的传递给消费者,

所以他们到达不同的消费者时是不能保证顺序的。这实际上意味着消息的顺序在平行消费面前是丢失的。消息系统为了解决这样的问题,通常有一个“专用消费者”的概念,

它只允许一个消费者从队列消费数据,这意味着没有平行处理。

Kafka可以更好的解决这个问题。通过有一个在topic内的平行partition的概念,Kafka既可以提供消息顺序的保障,又可以通过消费处理池进行负载均衡。

这是通过将topic中的partition分配给消费组中的消费者来实现的,以便每一个分区被组中的一个确定的消费者消费。通过这样做,我们确保了一个消费者

是partition的唯一读取者,并按照顺序消费数据。由于有多个partition,仍然可以通过多个消费者均衡负载。记住,组中消费者的数量不能大于partition的数量。

Kafka作为存储系统

任何允许发布消息并解耦消费的消息队列实际上都扮演着一个消息的存储系统。卡夫卡的不同之处在于它是一个非常好的存储系统。

写入Kafka的数据写入磁盘并进行复制以实现容错。Kafka允许生产者等待确认,以便写入在完全复制之前不会被认为是完成的,并且即使写入的服务器失败也能保证持续。

Kafka磁盘结构使用的规模很大 - 无论您在服务器上有50 KB还是50 TB的持久性数据,Kafka都会执行相同的操作。作为认真考虑存储并允许客户端控制其读取位置的结果,您可以将Kafka视为一种专用于高性能,低延迟提交日志存储,复制和传播的专用分布式文件系统。

Kafka作为流处理

仅读取,写入和存储数据流是不够的,目标是启用流的实时处理。

在Kafka中,流处理器是指从输入主题获取连续数据流,对该输入执行一些处理并生成连续数据流以输出主题的任何内容。

例如,零售应用程序可能会接受销售和装运的输入流,并输出一系列重新排序和对这些数据计算出的价格调整。

可以直接使用生产者API和消费者API进行简单的处理。然而,对于更复杂的转换,Kafka提供完全集成的Streams API。这允许构建应用程序进行非平凡的处理,从而计算聚合关闭流或将流连接在一起。

这个工具有助于解决这类应用程序面临的难题:处理无序数据,重新处理代码更改的输入,执行有状态的计算等。

流API基于Kafka提供的核心原语构建:它使用生产者API和消费者API输入,使用Kafka进行有状态存储,并在流处理器实例之间使用相同的组机制来实现容错。

你想要的cccJava/Python/IOS爱好者,骑行+跑步。。。哦,对了,还养了两只猫。回答于

来点干货~Kafka 的详细设计及其生态系统

Apache Kafka 的核心要素有中介者,订阅主题,日志,分区还有集群,还包括像 MirrorMaker 这样的有关工具。

Kafka 生态系统由 Kafka Core,Kafka Streams,Kafka Connect,Kafka REST Proxy 和 Schema Registry 组成。Kafka 生态系统的其他组件多数都来自 Confluent,它们并不属于 Apache。

Kafka Stream 是一套用于转换,聚集并处理来自数据流的记录并生成衍生的数据流的一套 API,Kafka Connect 是一套用于创建可复用的生产者和消费者(例如,来自 DynamoDB 的更改数据流

)的连接器的 API,Kafka REST Proxy 则用于通过 REST(HTTP)生产者和消费者,Schema Registry 则用于管理那些使用 Avro 来记录 Kafka 数据的模式,而 Kafka MirrorMaker 用于将集群的数据复制到另一个集群里去。

Jean-Paul Azar 在 Cloudurable 工作。后者提供了 Kafka 培训,Kafka 咨询,Kafka 支持 并为在 AWS 中设置 Kafka 集群提供了支持服务。

Kafka 生态系统:Connect Source,Connect Sink 和 Kafka Streams 的图表

Kafka Connect Sources 是 Kafka 记录的来源,而 Kafka Connect Sinks 则是这一记录的目的地。

Kafka 生态系统:Kafka REST Proxy 和 Confluent Schema Registry

Kafka Streams —— Kafka 的数据流处理

Kafka Stream API 基于核心的 Kafka 原语而构建,并有着自己的生命周期。Kafka Streams 能够实时地处理数据流,并为实现数据流处理器提供了支持。数据流处理器会从输入的主题中获取连续的数据流,并对输入执行一些处理,转换和聚合操作,并最终生成一个或多个输出流。例如,视频播放器应用程序会接收观看视频和视频暂停事件的输入流,并输出用户偏好数据流,然后基于最近的用户活动或许多用户的共同活动来产生新的视频推荐单,以查看最近有哪些新的热门视频。Kafka Stream API 还解决了无序记录,跨数据流聚合,连接来自多个流的数据,有状态计算等等难题。

Kafka 生态系统:Kafka Stream 和 Kafka Connect

Kafka 生态系统总结

什么是 Kafka Streams?

Kafka Streams 支持数据流的实时处理。它可以在聚合多个数据流,连接来自多个流的数据,进行有状态的计算等等。

什么是 Kafka Connect?

Kafka Connect 是一套连接器的 API,用于创建可复用的生产者和消费者(例如,来自 DynamoDB 的更改数据流)。Kafka Connect Sources 是 Kafka 记录数据的来源。Kafka Connect Sinks 是这一记录的目的地。

什么是 Schema Registry?

Schema Registry 用于管理那些使用 Avro 来处理 Kafka 记录的模式。

什么是 Kafka Mirror Maker?

Kafka MirrorMaker 用于将集群里的数据复制到另一个群集。

什么时候会用到 Kafka REST Proxy?

Kafka REST Proxy 用于通过 REST(HTTP)生产者和消费者。你可以使用它来轻松集成现有的代码。

如果还不知道 Kafka 是什么,那么请参阅 ”什么是 Kafka?”。

Kafka 架构:底层设计

不得不说,这篇文章实际上就是我们关于 Kafka 架构的系列文章的一个摘要,这个系列包括 Kafka 订阅主题架构Kafka 生产者架构Kafka 消费者架构还有 Kafka 生态系统架构这些文章。

本文也在很大程度上受了 Kafka 设计这篇文章的启发。不妨把本文看成一个精选版。

Kafka 的设计动机

最初 LinkedIn 的工程师造出 Kafka 的缘由是为了给实时分析提供支持,它也因此被设计成了一个对数据流进行实时处理的分析系统提供支持的一个工具。同时 Linkedln 也将 Kafka 开发成了一个统一的实时处理数据流输入数据的平台。Kafka 的目标是建立一个高吞吐量的数据流平台,为日志聚合,用户活动这样的大容量事件流提供支持。

LinkedIn 对 Kafka 有着分布式化,支持分片和负载均衡这些方面的扩展需求,而这种扩展需求促成了 Kafka 的分区和消费者模型。与此同时,Kafka 还通过使用分区分布式提交日志扩展了其读写功能。其中 Kafka 的分片(sharding)被称为分区(Kinesis,类似于 Kafka,将它称为分区 “碎片”)。

根据维基百科所说,“数据库碎片(shard)是数据库或搜索引擎中的数据的一个水平分区。每个单独的分区都会被看作一个碎片或数据库碎片。为了分摊负载,每个碎片又会被保存在一个单独的数据库服务器实例里面。

Kafka 旨在处理来自离线系统的,周期性的大批量数据加载,以及传统的低延迟消息传递场景。

这里提一下 MOM(面向消息的中间件),其例子有 IBM MQSeries,JMS,ActiveMQ 还有 RabbitMQ。像许多 MOM 一样,Kafka 会通过复制副本和主导权选举这些方式保持对节点故障的宽容性。但是 Kafka 的设计更像是一个分布式数据库事务日志,而不是传统的消息传递系统。与许多 MOM 不同,Kafka 的复制机制是内置在底层设计中的,并不是一个衍生出来的想法。

持久化存储:依靠文件系统

Kafka 依靠文件系统来缓存或保存记录。

采用顺序写入方式的硬盘驱动器的磁盘性能很快(非常快)。这里需要介绍一下 JBOD(磁盘簇,Just a Bunch Of Disks)。带有 6 个 7200rpm SATA RAID-5 阵列的 JBOD 配置约为 600MB / 秒。跟 Cassandra 表一样,Kafka log(日志)也是只写(write-only)结构。这意味着数据会被附加到 log 的末尾。在使用硬盘的时候,顺序读写操作会更快速,并且可以预测,还可以通过操作系统进行重点优化。在使用机械硬盘(HDD)时,顺序磁盘访问可能还会快过随机内存访问,甚至是固态硬盘(SSD)。

虽然 JVM 垃圾回收的开销可能会很高,不过 Kafka 在很大程度上是凭借操作系统来进行缓存的,而这是一个大容量、快速且稳定的缓存。现代的操作系统会用上所有可用的内存来进行磁盘缓存,而且基于操作系统的文件缓存几乎没有在操作系统层面上的开销。尽管缓存一致性很难实现,但 Kafka 还是靠着稳定的操作系统实现了高速缓存一致性。凭借操作系统进行缓存还可以减少缓冲区副本的数量。由于 Kafka 倾向于进行连续的磁盘读取,使用操作系统的预读缓存可以让读取操作快得令人难忘。

Cassandra,Netty 和 Varnish 都使用了类似的技术。这些东西在 Kafka 文档中都有很好的说明,并且在 Varnish 有一个更有趣的解释。

大容量快速机械硬盘及长时间连续存取

Kafka 倾向于用长时间的顺序磁盘访问来执行读写操作。就跟 Cassandra,LevelDB,RocksDB 还有其他项目一样,Kafka 会使用一种对日志进行结构化存储和压缩的方式,而不是磁盘上随时可变的 BTree。另外,Kafka 会给应删除的记录标记一个墓碑,而不是立即删除记录,这也跟 Cassandra 一样。

现在的硬盘已经在某种程度上具有了无穷无尽的存储空间,并且读写速度还挺快,因此 Kafka 就能提供一些在消息传递系统里不常有的功能,例如持久保留旧消息。这一灵活性使得 Kafka 能有很多有趣的应用。

Kafka 生产者的负载均衡

生产者会向任一 Kafka 中介者查询 ”哪一个中介者具有订阅主题数据的分区的主导权” 这一元数据,这样就能省掉转发消息的路由层了。这一主导权信息能让生产者直接向相应分区的主导者发送记录。

生产者的客户端会控制生产者将消息发布到哪个分区,并且可以根据某些应用程序逻辑指定所发送的分区。生产者可以通过索引,轮询调度(round - robin)或使用自己在应用程序里定义的分区逻辑来为记录分区。

Kafka 生产者的记录分批

Kafka 生产者也支持给记录分为一个个批次。批次的大小可以通过设置每个批次里面记录的总字节数上限来配置。在记录凑不够一批的时候,Kafka 的生产者也能自动地在一定时间后将记录发送出去。

分批发送非常有利于有效利用网络 IO 性能,并可以大幅提高吞吐量。

为了权衡取得高吞吐量和高延迟这两者的影响,我们也可以按需配置等待凑够一批的缓冲时间。不过,在系统有着大量使用需求的情况下,有的时候也可以达成平均吞吐量又高总体延迟又低的效果。

分批发送能让我们把更多字节的数据累积在一起再发送,能让 Kafka 中介者省掉不少次规模较大的 I/O 操作,还能提高压缩的效率。为了取得更高的吞吐量,Kafka 生产者也能设置基于时间和大小进行的缓冲。生产者会把多条记录合成一批发送。相比于逐一发送记录,这种方式还能减少发送网络请求的数量。

Kafka 生产者的分批

private static Producer<Long, String> createProducer() { 
    Properties props = new Properties(); 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer"); 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(); 
    
    // 按记录内容的总字节数计算的批次大小,设为 0 就相当于不分批
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); 
    
    // 在当前记录凑不满一个批次来发送的时候,至多等待多少毫秒来凑满 
    props.put(ProducerConfig.LINGER_MS_CONFIG, 20); 
     
    // 生产者能用来缓存等待分批发送给 Kafka 中介者的记录的存储空间的字节数
    // 如果数据发送的速度超出了中介者的处理速度,那么生产者会阻塞分区和发送的过程
    // 这段空间会用于存放压缩还有正在发送的记录
    props.put(ProducerConfig.BUFFER_NENORY_CONFI6, 67_108_864); 
    
    // 控制生产者在缓冲区空间耗尽,抛出 BufferExhaustedException 之前
    // 阻塞分区和发送过程多少毫秒
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); 
    
    // 以下略
}

Kafka 压缩

大型数据流平台的瓶颈并不总是在于 CPU 或磁盘,而通常在于网络带宽。云端以及很多集成化和虚拟化的环境甚至还有更多的跟网络带宽有关的问题,毕竟在这些情景中,多个服务可以共享一张网卡。此外,在数据中心和别的数据中心还有整个 WAN (广域网)进行交互的时候,网络带宽的问题还会更加突出。

分批处理会有利于进行高效的压缩,并进一步达成更高的网络 IO 吞吐量。

Kafka 提供了端到端的分批压缩,而不只是一次压缩一个记录。这样 Kafka 便有效地压缩了整批记录。对同一个消息批次可以只压缩并发送到 Kafka 中介者或服务器一次,并以压缩的形式写入日志分区。你还能通过设置压缩的方式,让 Kafka 中介者在将压缩的记录发送给消费者之前不进行解压。

Kafka 支持使用 GZIP,Snappy 和 LZ4 压缩协议。

拉取 vs. 推送/流式传输

Kafka 消费者能向中介者拉取数据。其他系统的中介者也能将数据或流数据推送给消费者。消息的传递系统通常是一个基于消息拉取的系统(像 SQS,以及大多数 MOM 都在使用拉取方式)。在使用拉取式的系统时,如果消费者处理速度赶不上消息增加的速度,它也可以在能赶上来之后再拉取新消息。

由于 Kafka 采用了拉取方式,因此它积极地实行了数据的分批处理。Kafka 像许多基于拉取的系统一样会实施长时间的轮询(SQS,Kafka 都这样做)。这一方式意味着在消费者会向中介者发出请求,并在这之后一直保持连接并等待响应。

一个基于拉取方式的系统必须由消费者进行数据的拉取并对其进行处理,并且在拉取和真正获取数据之间总是有一段延迟。

在另一方面,基于推送方式系统会主动将数据发送给消费者(像 scribe,flume,reactive streams,RxJava 还有 Akka)。基于推送或数据列的系统在应对处理速度赶不上或断开了连接的消费者方面有些问题。当数据的消费速度低于生产速度时,推送系统中的消费者很可能会被数据压垮。有些基于推送的系统会使用基于背压策略的退避协议,让消费者能发出 “处理速度跟不上“ 的信号。具体可以参考 reactive stream。在有着等待消费者发送对消息的确认的需求时,如何避免压垮消费者,以及消费者如何从处理速度赶不上的状态恢复过来这一问题会变得十分棘手。

基于推送或流式传输的系统可以立即发送请求或者累积请求并按批发送(或者是这两者的结合,并引入背压策略),而基于推送的系统总是会立即推送数据。消费者也可以把已经发送给它的数据累积在一起进行处理,这也有利于减少消息处理的延迟。不过,如果消费者在处理数据之后断开了连接,那么中介者该怎么知道消费者是不是依然在线,以及什么时候能再给消费者发送数据呢?这一问题的解决在推送或流式传输系统里面并不简单,而 Kafka 则通过使用拉取式系统解决了这些复杂问题。

传统的面向消息的中间件对消费者消息的状态追踪

对大多数 MOM(面向消息的中间件)而言,中介者会负责保持对发给消费者的消息的追踪,特别是要找出那些被标记为已处理的消息。这也不是一件容易的事情。

大多数 MOM 系统的目标是让中介者在消息得到了消费之后能快速删除数据。注意,这里的大多数 MOM 是在磁盘体积很小、功能不强、价格还贵的时候设计编写的。

这种类型的消息追踪(或者叫确认功能)说起来容易但做起来很难,因为中介者必须维护大量状态才能跟踪每条消息的发送还有确认状态,然后才能从中决定何时删除或重发消息。

Kafka 对发给消费者的消息的状态追踪

注意,Kafka 的订阅主题的内容会被分为若干个有序的分区。每条消息在这个有序分区中都有一个相对于某个原点的偏移量。每当进行消费的时候,对每个订阅主题的分区,一个消费者分组里面只会有一个消费者会来进行消费。

这一分区的布局意味着中介者会追踪数据在分区中的偏移量,而不是跟别的 MOM 一样去追踪每一条消息的状态,而且 Kafka 只需要追踪每个消费者组所消费的最后一组消息的数据的偏移量,还有其存储分区本身的偏移量。这种追踪偏移量的方式所要维护的数据量相比传统方式要少很多。

消费者会定期地向 Kafka 中介者发送偏移量的定位数据(消费者分组,还有分区偏移),中介者则会将此偏移数据存储到一个存放偏移量的主题中。

偏移式的消息确认与其他一些 MOM 相比成本要低得多。此外,消费者也能在这种模式下更加灵活地进行消费,甚至可以回退到更早的偏移量的消息(也就是重放)。如果消费者出故障了,那么就能在修复并重启消费者之后把消息重放一遍,毕竟 Kafka 可以持久地保存主题的日志数据。这种回退功能正是 Kafka 的优良特色。

消息传递语义

有三种消息传递语义:至多一次、至少一次、只有一次。传递最多一次的消息可能会丢失,但永远不会收到重复消息。传递至少一次的消息是永远不会丢失的,但可能会收到重复消息。只传递一次的消息则即确保了消息不会丢失,又确保了不会收到重复消息。只有一次这种方式的传递效果最好,但其开销较大,并且需要生产者和消费者记录更多的状态。

Kafka 消费者和消息传递语义

回想一下,所有副本都具有每一项的偏移量都相同的日志分区,并且每个消费者组都会在维护它们在每个订阅主题分区里面的日志中所处的位置。

就实现层面上来说,“最多一次” 意味着消费者会在读取消息之后将它在分区中的偏移量发送给中介者,让后者把偏移量保存起来,然后再处理消息。这一模式的问题在于消费者在从把偏移量发给了中介者到开始处理消息这段时间内可能会出故障,然后在消费者重启之后会继续从这一偏移量开始接收新消息,但它出故障之前读取的那条消息却不会得到处理。

“至少一次” 意味着消费者在读取并处理消息之后才会向中介者发送偏移量。这一模式的问题在于消费者在从处理完消息到发送偏移量之间这段时间也可能会出故障。然后在消费者重启之后,消费者就会收到并处理它在出故障之前已经处理过的消息。实际上,“至少一次” 是最常用的消息传递模式,不过它需要开发者确保消息具有幂等性(idempotent),即接收相同消息两次或以上的效果跟接收一次消息没有区别。

“只有一次“ 意味着消费者需要把消息处理和提交偏移量这两步并为一个原子性的事务,即如果偏移量发送失败,那么消息处理步骤也要回退。这一事务的实现需要在读取消息和保存处理消息的输出结果这两步之间进行一个二阶段的提交。或者,消费者也可以把偏移量和处理消息的输出存放在同一个地方,这样就可以通过查看这一位置存放的是偏移量还是处理的输出来判断偏移量有没有发送成功了。

Kafka 提供了前两种模式的实现,至于第三种模式的实现则要我们对消费者做一些修改。

Kafka 生产者的耐久度及消息确认

Kafka 也为耐久度提供了可操作的可预测的语义。在发布消息时,消息会被 “提交” 到日志中,这意味着所有 ISR(In-Sync Replicas,处于同步状态的副本)都会接受消息。只要至少有一个这样的副本在,这种提交策略就能很好地工作,这也有利于确保系统的耐久性。

生产者在收到对消息的确认之前会一直重发消息,而不管它所发送的消息有没有经过中介者。这就违反了 “只有一次” 还有 “至多一次” 的消息传递语义。

生产者的连接可能会在发送完毕等待确认的过程中断开,然后生产者在重新连接之后会无法确定它所发送的消息有没有被消费者成功处理,然后它就会把消息再发一遍。这种重新发送的逻辑使得引入消息序列号还有幂等消息处理(收到重复消息也没问题)成为了一个重要事项。直到 2017 年 6 月,Kafka 才实现了生产者因故障重启之后不会重复发送消息的一套机制。

生产者耐久度

生产者可以标明它的耐久度级别,而这一耐久度级别是由生产者对它发出的消息需要多大程度的确认来区分的。设置高程度的确认水平可以确保所有副本都保存了它发出的消息。

耐久度级别有:

生产者发送消息后不用等待确认(0);

生产者发送消息后需要等待来自一个分区主导者的确认(1);

生产者发送消息后需要等待来自所有副本的确认(-1,默认设置)。

对生产者的改进(2017 年 6 月发布的更新之一)

Kafka 现在为生产者端的 “只有一次” 交付、性能改善以及对多个分区的原子写操作提供了支持。他们通过让生产者随消息发送一个序列的 ID 实现了这一点。中介者会持续检查生产者是否已经发送了这个序列。如果生产者把这个序列又发送了一遍,它就会得到对这一重复的消息的确认,但这一消息不会保存到日志里面。采用这一改进功能不需要更改目前所用的 API。

Kafka 生产者对日志的原子性写操作(2017 年 6 月发布的更新之一)

Kafka 的另一个改进是 Kafka 生产者能进行对多个分区的原子写操作。原子写操作意味着 Kafka 消费者只能看到生产者已经写进去并完成了提交的日志(可以设置一些具体属性)。Kafka 会有一个协调者来为订阅主题日志记上一个标记,以表示已被成功处理的事务。事务协调者和事务日志会共同维护原子写操作的状态。

原子写操作需要用上新的生产者 API 来产生事务。这有一个使用新的生产者 API 的例子:

用来产生事务的新版生产者 API

producer.initTransaction();
try {
  producer.beginTransaction();
  producer.send(debitAccountMessage);
  producer.send(creditOtherAccountMessage);
  producer.sentOffsetsToTxn(...);
  producer.commitTransaction();
} catch (ProducerFencedTransactionException pfte) {
  // ...
  producer.close();
} catch (KafkaException ke) {
  // ...
  producer.abortTransaction();
}

Kafka 副本机制(Replication,或称复制机制)

在 Kafka 中, 对每个订阅主题的分区,都会有一些 Kafka 节点(数量可以自行设置)持有其副本。Kafka 的副本模型是一个自带的功能,而不像多数消息中间件那样把它视作一个插件式的功能,因为 Kafka 从一开始就引入了分区还有多节点特性。每个订阅主题的分区都会对应一个主导者以及若干从属者(也可能没有从属者)。

主导者还有从属者都被称为副本。主导者还有从属者节点的数量被称为副本因子。分区的主导权会在 Kafka 中介者之间平均地分摊。消费者只能从主导者那里读取消息。生产者也只能把消息发给主导者。

从属者的订阅主题日志分区会与主导者的日志分区保持同步,它会像一个普通的 Kafka 消费者一样从它们的主导者那里按批拉取记录。

Kafka 中介者出现故障的情景

Kafka 会持续追踪 Kafka 中介者的在线状态。为了表明自己处于这一状态,Kafka 中介者必须使用 ZooKeeper 的心跳机制来维护一个 ZooKeeper 会话,并且必须让所有的从属者的日志记录与主导者保持同步,而不会落后过多。

就中介者的在线状态这一概念来说,ZooKeeper 会话还有记录的同步都是它的必要条件,毕竟这一状态本身就表明了记录应该保持同步。跟主导者保持了同步的副本被称为 ISR。每个主导者都会持续记录跟主导者保持了同步的一组副本,即 ISR 的集合。

如果某个本来是 ISR 的从属者出了问题,使得它存储的进度落后了,那么主导者就会从它的 ISR 集合里除掉这个从属者 。在这里,副本的记录的进度落后的定义是这一副本在 replica.lag.time.max.ms 毫秒的时间之内没能与主导者达成同步。

当所有 ISR 都将消息存进它们的日志之后,该消息就会被看作是 “已提交” 的,而消费者只能看到已提交的消息。Kafka 确保了只要有一个 ISR,已提交的消息就绝不会丢失。

日志分区副本

一个 Kafka 分区就是一个日志副本(replicated log)。日志副本是分布式数据系统的基础。日志副本对基于状态机来实现其他分布式系统很有用。 整个日志副本模型会需要对一系列有序值 “达成共识”。

在主导者在线的时候,所有的从属者只需要按序复制它们的主导者的值即可。如果主导者掉线了,Kafka 会从与主导者同步的从属者中选择一位新的主导者。在生产者已经收到了对其发送的一个消息的提交确认,然后主导者掉线了的情况下,新当选的主导者肯定得持有这个已提交的消息。

若能有更多的 ISR,在主导者出故障的时候能顶替的从属者就越多。

Kafka 和 Quorum 机制

Quorum 机制在这里指的是通过对产生一个新的主导者所需要确认的副本数,还有应该与新主导者比较日志版本的副本数的选择,使得这两种副本的集合会有所重叠,以此来确保可用性。大多数系统都使用了投票流程,并把主导权交给得票多数者。不过,Kafka 为了提高可用性,并没有套用简单的多数票选机制。

在 Kafka 里,新主导者的选择是以持有完整的日志为基础的。如果我们有一个副本因子 3,那么肯定至少有两个 ISR 在主导者声明生产者发送的消息完成了提交之前达成了同步。如果要选出一个新主导者,那么新主导者必须能确保持有所有已经提交的消息,并且只能至多有 2 个副本同时掉线。

在一群从属者里面,必须至少有一个副本持有所有已提交的消息。多数票选的 Quorum 机制有个问题在于只要集群里面有一部分节点同时掉线了,那么就会凑不够满足这一条件的可进行投票操作的集群(副本因子为 2n+1 的情况下只能至多允许 n 个副本同时掉线)。

Kafka ISR 间的 Quorum 机制

Kafka 里的每个主导者都会维护一个 ISR 集合。只有这一集合中的成员才有资格参选新主导者。只有在所有 ISR 都写入新的记录之后,生产者发往分区的记录才能完成提交。ISR 集合被保存在 ZooKeeper 里面,在 ISR 集合产生变化时它会进行相应的操作。

这种类似于 Quorum 的 ISR 机制能让生产者发送消息时不用接收全部节点的多数确认,只需要接收 ISR 的多数确认即可。这种机制还能让副本重新加入到 ISR 集合里面,并且也能在加入之后发送确认。不过副本在重新加入到 ISR 集合之前需要重新达成记录的完全同步。

节点全掉线了,怎么办?

Kafka 只确保了在至少有一个从属者和主导者达成了同步的时候能避免数据的丢失。

如果所有的分区主导者的从属者全都同时掉线了,那么 Kafka 也便无法保证数据不会丢失了。如果分区的所有副本都出了问题,那么在默认情况下,Kafka 会选择第一个重新回到在线状态的副本(不一定是 ISR)来担任新的主导者(前提是要设置 unclean.leader.election.enable = true,不过这是默认值)。相比于一致性,这一选择更重视可用性。

如果在你的使用场景里一致性比可用性更重要,那么可以设置 unclean.leader.election.enable = false。在这样设置之后,如果某个分区的所有副本都掉线了,Kafka 会等待第一个 ISR 成员重新上线(不一定是第一个重新上线的副本)来担任新的主导者。

对生产者耐久度的选择

生产者可以通过将接收确认的需求 ack 设置为:none(0);the leader only(1);all replicas(-1)。

acks = all replicas 是默认值。总而言之,当所有当前的同步复制品(ISR)都收到该消息时,便会发生这种情况。

你可以在一致性和可用性之间进行权衡。如果一致性比可用性更重要,那么可以设置 unclean.leader.election.enable = false 并指定最小的 ISR 集合大小。

最小 ISR 集合大小越大,一致性就能得到越好的保证,但可用性就会越低,因为在 ISR 集合达到最小阈值之前是不能往相应的分区里面写入任何东西的。

配额控制

Kafka 为生产者和消费者设定了它们所能使用的网络带宽的配额。这些配额阻止了生产者和消费者一直占用 Kafka 中介者资源,其中配额限制的对象为客户端 ID 或用户。配额的数据会保存在 ZooKeeper 里面,因此更改配额不需要重启 Kafka 中介者。

Kafka 底层设计和架构总结

如何防止来自消费者的拒绝服务攻击?

用配额来限制消费者的带宽。

默认生产者耐久度(acks)水平是什么?

全部。这意味着所有 ISR 必须将消息写入其日志分区。

如果 Kafka 节点全都掉线了,在默认情况下会发生什么?

Kafka 会选择第一个重新上线的副本(不一定在 ISR集合中)作为新的主导者。因为在默认情况下 Kafka 会更重视可用性,会设置 unclean.leader.election.enable = true

为什么 Kafka 记录分批重要?

这能优化网络和磁盘的 IO 吞吐量。它还能通过压缩整个分批来提高压缩效率。

Kafka 的设计目标是什么?

成为一个高吞吐量,可扩展的数据流数据平台,用于对日志聚合,用户活动等大容量事件流进行实时分析。

截至 2017 年 6 月,Kafka 中的一些新功能有哪些?

生产者的原子性写操作,性能改进以及确保生产者不重复发送消息的机制。

消息传递语义是什么?

有三种消息传递语义:最多一次,至少一次,只有一次。

Jean-Paul Azar 工作于 Cloudurable。Cloudurable 提供的服务有 Kafka 培训,Kafka 咨询,Kafka 支持,以及为在 AWS 中设置 Kafka 集群提供帮助。

叮当叮当スターバーストするには回答于

有个问题,腾讯云也有Ckafka,优势在哪里?

艾迦号游戏程序员、《游戏引擎架构》译者回答于
或有或无习惯成就一切回答于
萌萌哒小昕玥一直积极治疗的神经病~回答于

回答都厉害了,学了不少知识,都是干货!!!

可能回答问题的人

  • 腾讯云技术服务团队

    腾讯云 · 技术服务团队 (已认证)

    16 粉丝0 提问3 回答
  • 宝哥@devops运维

    腾讯 · 高级云计算工程师 (已认证)

    45 粉丝0 提问0 回答
  • elliswu

    腾讯计算机系统有限公司 · 高级工程师 (已认证)

    3 粉丝0 提问0 回答
  • 腾讯云中间件团队

    30 粉丝0 提问0 回答
  • 小翔

    0 粉丝0 提问1 回答
  • 1076485026

    0 粉丝0 提问0 回答

扫码关注云+社区

领取腾讯云代金券