前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >消息队列之Kafka

消息队列之Kafka

原创
作者头像
羽毛球初学者
发布2024-10-14 17:45:37
960
发布2024-10-14 17:45:37
举报
文章被收录于专栏:JAVA基础知识

简介

kafka是⼀个分布式、⽀持分区的(partition)、多副本的(replica),基于zookeeper协调 的分布式消息系统,最⼤的特性就是可以实时的处理⼤量数据以满⾜各种需求场景。 它有以下特性:

  • ⾼吞吐量、低延迟:kafka每秒可以处理⼏⼗万条消息,延迟最低只有⼏毫秒;
  • 可扩展性:kafka集群⽀持热扩展;
  • 持久性、可靠性:消息被持久化到本地磁盘,并且⽀持数据备份防⽌数据丢失;
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
  • ⾼并发:⽀持数千个客户端同时读写;

主要应用应⽤场景有:

  • ⽇志收集:可以⽤Kafka收集各种服务的log,通过kafka以统⼀接⼝服务的⽅式开放给各种consumer,例如hadoop、Hbase、Solr等;
  • 消息系统:解耦和⽣产者和消费者、缓存消息等;
  • ⽤户活动跟踪:Kafka经常被⽤来记录web⽤户或者app⽤户的各种活动,如浏览⽹⻚、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘;
  • 运营指标:Kafka也经常⽤来记录运营监控数据。包括收集各种分布式应⽤的数据,⽣产各种操作的集中反馈,⽐如报警和报告。

实现原理

角色分工

kafka主要由以下几个角色配合完成工作:

  • Producer:消息的产⽣者,是消息的⼊⼝。
  • Broker:是kafka实例,每个服务器上有⼀个或多个kafka的实例,我们姑且认为每个broker对应⼀台服务器。每个kafka集群内的broker都有⼀个不重复的编号,如图中的broker-0、broker-1等。
  • Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
  • Partition:Topic的分区,每个topic可以有多个分区,分区的作⽤是做负载,提⾼kafka的吞吐量。同⼀个topic在不同的分区的数据是不重复的,partition在磁盘上是以⼀个⼀个的⽂件夹的形式存在的。每个partition的⽂件夹下⾯会有多组segment⽂件,每组segment⽂件⼜包含.index⽂件、.log⽂件、.timeindex⽂件(早期版本中没有)三个⽂件, log⽂件就是存储message的地⽅,⽽index和timeindex⽂件为索引⽂件,⽤于检索消息。
  • Replication:每⼀个分区都有多个Replication(副本),副本的作⽤是做备胎。当主分区(Leader)故障的时候会选择⼀个备胎(Follower)上位,成为Leader。在kafka中默认副本的最⼤数量是10个,且副本的数量不能⼤于Broker的数量,follower和leader绝对是在不同的机器,同⼀机器对同⼀个分区也只可能存放⼀个副本(包括⾃⼰)。
  • Consumer/Consumer group:消费者或消费者组,在kafka的设计中同⼀个分区的数据只能被消费者组中的某⼀个消费者消费。同⼀个消费者组的消费者可以消费同⼀个topic的不同分区的数据,这也是为了提⾼kafka的吞吐量。
kafka底层示例图
kafka底层示例图

分区 Leader 选举

当 producer 往 kafka 写⼊数据时,先写⼊ leader,而后 leader 再发布到 broker 中,最后由 followers ⾃⾏拉取数据。为了保证 kafka 的高可用,当某一个主题某一个分区的 leader 故障时,就需要选举其他 follower 成为新的 leader。

Leader 失效检测

在Kafka集群中,控制器节点(由ZooKeeper选举产⽣)负责检测和管理Leader失效的情况。控制器节点会监控各个分区Leader的状态,如果检测到某个分区的Leader失效,它会开始触发Leader选举过程。

Leader 选举

选举分区Leader主要分为三步:

  1. 更新ISR(In-Sync Replicas)列表。控制器节点⾸先确保ISR列表是最新的,即只包含那些与旧Leader副本保持同步的Follower副本。
  2. 选择新leader。控制器节点从ISR列表中选择⼀个新的Leader副本。优先级上,⼀般会选择第⼀个进⼊ISR列表的副本作为新Leader。如果ISR列表为空,Kafka可能会选择⼀个⾮同步副本(⾮ISR列表中的副本)作为Leader,但这会带来数据⼀致性⻛险,因为该副本可能没有最新的数据。
  3. 通知其他副本和客户端。选举完新Leader后,控制器节点首先将新Leader的身份和位置信息通知所有相关副本,以便它们更新⾃身状态。 然后Kafka通过ZooKeeper或控制器节点将新Leader信息更新到集群元数据中。客户端通过定期刷新元数据缓存获知新的Leader位置,从⽽继续发送读写请求。

工作流程

kafka的工作流程主要可分为六步:

  1. producer向kafka集群获取topic对应分区的leader,将消息发送给leader。在发送消息时,如果有指定partition,则写⼊对应的partition;如果没有指定partition,但是设置了数据的key,则会根据key的值hash出⼀个partition;如果既没指定partition,⼜没有设置key,则会轮询选出⼀个partition。
  2. leader收到消息后,将消息写⼊本地⽂件。
  3. producer采⽤push模式将数据发布到broke,followers从leader pull消息,写⼊本地⽂件后,向leader发送ack确认。
  4. leader收到所有followers的ack后,向producer发送ack。
  5. consumer group主动去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是找leader去拉取。(同⼀个消费组者的消费者可以消费同⼀topic下不同分区的数据,但是不会组内多个消费者消费同⼀分区的数据)
  6. 消费成功后,消费组将offset上报给kafka集群的consumer_offsets这个topic。

如何定位消息

当消费者去消费消息时,kafka集群从consumer_offsets这个topic获取所要消费topic的offset,再根据offset去找到对应的消息。

消息存储文件
消息存储文件

以上图为例:

  1. 先找到offset为 368801 的message所在的segment⽂件(利⽤⼆分法查找),这⾥找到的就是在第⼆个segment⽂件。
  2. 打开找到的segment中的.index⽂件(也就是368796.index⽂件,该⽂件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这⾥要查找的相对offset为5)。由于该⽂件采⽤的是稀疏索引的⽅式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这⾥同样利⽤⼆分法查找相对offset⼩于或者等于指定的相对offset的索引条⽬中最⼤的那个相对offset,所以找到的是相对offset为4的这个索引。
  3. 根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据⽂件,从位置为256的那个地⽅开始顺序扫描直到找到offset为368801的那条Message。

这套机制是建⽴在offset为有序的基础上,利⽤segment+有序offset+稀疏索引+⼆分查找+顺序查找等多种⼿段来⾼效的查找数据。在早期的版本中,消费者将消费到的offset维护zookeeper中,consumer每间隔⼀段时间上报⼀次,这⾥容易导致重复消费,且性能不好。在新的版本中消费者消费到的offset已经直接维护在kafka集群的__consumer_offsets这个topic中。

如何保证消息有序性

实际应用中,可以使用以下几种方式保证消息的有序性:

  • 将相关的消息发送到同一个分区,在一个分区内,Kafka 可以保证消息的顺序。
  • 消费者采用单线程的方式从单个分区读取和处理消息,这样可以确保在一个分区内消息是按顺序处理的。
  • 生产者在向同一个分区发送消息时,按照发送的顺序写入,只要这些消息被正确地分配到同一个分区,就能保证顺序性。
  • 合理设计消息的分区策略,确保具有顺序性要求的消息被路由到同一个分区。
  • 通过控制并发消费的方式,避免多个消费者同时处理同一个分区的消息,从而保证顺序。

如何保证消息不被重复消费

消息重复消费的根本原因在于消息队列的可靠性保证机制,即确保消息至少被消费一次(At Least Once)。这种机制确保了消息不会因为网络问题或消费者崩溃而丢失,但也可能导致消息被多次投递给消费者,比如出现网络故障、消费端异常无返回等原因。

为了避免消息重复消费,需要在消息的生产者、消息队列本身和消费者等多个层面采取措施。以下是一些常见的策略:

  • 消息去重标识:在消息中添加唯一标识(如消息ID、序列号等),消费者在处理消息时,通过记录已处理的标识,避免重复处理相同标识的消息。这种策略简单易行,但需要在消费者端维护一个状态存储(如数据库、Redis等),以记录已处理的消息标识。
  • 幂等性控制:幂等性是指无论操作多少次,对系统状态的影响都与执行一次相同。通过设计幂等性的消息处理逻辑,可以确保即使消息被重复消费,也不会对系统状态产生副作用。例如,对于数据库操作,可以使用唯一键约束或幂等性的SQL语句来避免重复插入或更新数据。
  • 分布式锁:在分布式系统中,可以使用分布式锁来确保同一条消息只会被一个消费者处理。分布式锁可以通过ZooKeeper、Redis等实现,但在使用时需要注意性能开销和死锁问题。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 实现原理
    • 角色分工
      • 分区 Leader 选举
        • Leader 失效检测
        • Leader 选举
    • 工作流程
      • 如何定位消息
      • 如何保证消息有序性
      • 如何保证消息不被重复消费
      相关产品与服务
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档