1) 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
1) 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
1) 冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
1) 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
1) 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
深入阅读:终于知道Kafka为什么这么快了!
在这里插入图片描述
启动 Kafka时候自动开启的两个定时任务,“isr-expiration"和”isr-change-propagation"。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下,如果leader挂掉,会丢失数据,kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。
深入阅读:Apache Kafka中Follower如何与Leader同步数据 7、Zookeeper 在 Kafka 中的作用(早期) zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,
但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。
1. Broker注册 Broker是分布式部署并且互相独立,此时需要有一个注册系统能够将整个集群中的Broker管理起来,此时就用到的Zookeeper。 在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:/brokes/ids
2.Topic注册 在kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息以及与Broker的对应关系也都是邮件Zookeeper维护,由专门的节点记录:/brokers/topics
3.消费者注册 消费者服务器在初始化启动时加入消费者分组的步骤如下: 注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumer/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。
4.生产者负载均衡 由于同一个Topic消息会被分区并将其分布在多个Broker上,因此生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
5.消费者负载均衡 与生产者相似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
6.分区与消费者的关系 消费组consumer group下有多个Consumer(消费者)。 对于每个消费者组(consumer group),Kafka都会为其分配一个全局唯一的Group ID,Group内部的所有消费者共享该ID。订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group) 同时,kafka为每个消费者分配一个Consumer ID,通常采用“Hostname:UUID”形式表示。 在kafka中,规定了每个消息分区只能被同组的一个消费者进行消费,因此,需要在zookeeper上记录消息分区与Consumer之间的关系,每个消费者一旦确定了对一个消费分区的消费权利,需要将其Consumer ID写入到平Zookeeper对应消息分区的临时节点上,例如:/consumers/[group_id]/owners/topic/[broker_id-partition_id] 其中,[broker_id-partition_id]就是一个消息分区的表示,节点内容就是该消息分区上消费者的Consumer ID。
7.补充 早期版本的 kafka 用 zk 做 meta 信息存储,consumer 的消费状态,group 的管理以及 offse t的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖
Kafka存在丢消息的问题,主要发生在Broker、Producer、Consumer三种。 1. Broker broker写数据时首先写到PageCache中,pageCache的数据通过linux的flusher程序异步批量存储至磁盘中,此过程称为刷盘。而pageCache位于内存。这部分数据会在断电后丢失。 刷盘触发条件有三:
kafka没有提供同步刷盘的方式,也就是说理论上要完全让kafka保证单个broker不丢失消息是做不到的,只能通过调整刷盘机制的参数缓解该情况,比如:
减少刷盘间隔log.flush.interval.ms(在刷新到磁盘之前,任何topic中的消息保留在内存中的最长时间) 减少刷盘数据量大小log.flush.interval.messages(在将消息刷新到磁盘之前,在日志分区上累积的消息数量)。
时间越短,数据量越小,性能越差,但是丢失的数据会变少,可靠性越好。这是一个选择题。
同时,Kafka通过producer和broker协同处理消息丢失的情况,一旦producer发现broker消息丢失,即可自动进行retry。retry次数可根据参数retries进行配置,超过指定次数会,此条消息才会被判断丢失。 producer和broker之间,通过ack机制来判断消息是否丢失。
2. Producer
producer在发送数据时可以将多个请求进行合并后异步发送,合并后的请求首先缓存在本地buffer中,正常情况下,producer客户端的异步调用可以通过callback回调函数来处理消息发送失败或者超时的情况,但是当出现以下情况,将会出现数据丢失
针对以上情况,可以有以下解决思路。
3.Consumer Consumer消费消息有以下几个步骤:
消费方式主要分为两种
Consumer自动提交机制是根据一定的时间间隔,将收到的消息进行commit,具体配置为:auto.commit.interval.ms。commit和消费的过程是异步的,也就是说可能存在消费过程未成功,commit消息就已经提交,此时就会出现消息丢失。 我们可将提交类型改为手动提交,在消费完成后再进行提交,这样可以保证消息“至少被消费一次”(at least once),但如果消费完成后在提交过程中出现故障,则会出现重复消费的情况,本章不讨论,下章讲解。
什么是 Rebalance
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。 例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic。正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance。
触发 Rebalance 的时机
Rebalance 的触发条件有3个。
Rebalance 发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。
Rebalance 过程
Rebalance 过程分为两步:JoinGroup 请求和 SyncGroup 请求。 JoinGroup :JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。 SyncGroup:SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。 深入阅读:Kafka Rebalance机制分析 线上Kafka突发rebalance异常,如何快速解决? 10、如何保证消息不被重复消费(保证消息的幂等性) 幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
出现原因:
解决思路:
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:
深入阅读:为什么Kafka不支持读写分离?
Kafka选举主要分为以下三种:
分区副本选举机制 发生副本选举的情况:
分区leader副本的选举由Kafka控制器负责具体实施。主要过程如下:
分区副本分为ISR(同步副本)和OSR(非同步副本),当leader发生故障时,只有“同步副本”才可以被选举为leader。选举时按照集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。 同时kafka支持OSR(非同步副本)也参加选举,Kafka broker端提供了一个参数unclean.leader.election.enable,用于控制是否允许非同步副本参与leader选举;如果开启,则当 ISR为空时就会从这些副本中选举新的leader,这个过程称为 Unclean leader选举。 可以根据实际的业务场景选择是否开启Unclean leader选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。一般建议是关闭Unclean leader选举,因为通常数据的一致性要比可用性重要。
消费组(Consumer Group)选主 在Kafka的消费端,会有一个消费者协调器以及消费组,组协调器(Group Coordinator)需要为消费组内的消费者选举出一个消费组的leader。 如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果某一个时刻leader消费者由于某些原因退出了消费组,那么就会重新选举leader,选举源码如下:
private val members = new mutable.HashMap[String, MemberMetadata]
leaderId = members.keys.headOption
在组协调器中消费者的信息是以HashMap的形式存储的,其中key为消费者的member_id,而value是消费者相关的元数据信息。而leader的取值为HashMap中的第一个键值对的key(这种选举方式等同于随机)。
消费组的Leader和Coordinator没有关联。消费组的leader负责Rebalance过程中消费分配方案的制定。
controller挂掉后,Kafka集群会重新选举一个新的controller。这里面存在一个问题,很难确定之前的controller节点是挂掉还是只是短暂性的故障。如果之前挂掉的controller又正常了,他并不知道自己已经被取代了,那么此时集群中会出现两台controller。
其实这种情况是很容易发生。比如,某个controller由于GC而被认为已经挂掉,并选择了一个新的controller。在GC的情况下,在最初的controller眼中,并没有改变任何东西,该Broker甚至不知道它已经暂停了。因此,它将继续充当当前controller,这是分布式系统中的常见情况,称为脑裂。
假如,处于活跃状态的controller进入了长时间的GC暂停。它的ZooKeeper会话过期了,之前注册的/controller节点被删除。集群中其他Broker会收到zookeeper的这一通知。
在这里插入图片描述由于集群中必须存在一个controller Broker,所以现在每个Broker都试图尝试成为新的controller。假设Broker 2速度比较快,成为了最新的controller Broker。此时,每个Broker会收到Broker2成为新的controller的通知,由于Broker3正在进行"stop the world"的GC,可能不会收到Broker2成为最新的controller的通知。
在这里插入图片描述 等到Broker3的GC完成之后,仍会认为自己是集群的controller,在Broker3的眼中好像什么都没有发生一样。
在这里插入图片描述 现在,集群中出现了两个controller,它们可能一起发出具有冲突的命令,就会出现脑裂的现象。如果对这种情况不加以处理,可能会导致严重的不一致。所以需要一种方法来区分谁是集群当前最新的Controller。
Kafka是通过使用epoch number(纪元编号,也称为隔离令牌)来完成的。epoch number只是单调递增的数字,第一次选出Controller时,epoch number值为1,如果再次选出新的Controller,则epoch number将为2,依次单调递增。
每个新选出的controller通过Zookeeper 的条件递增操作获得一个全新的、数值更大的epoch number 。其他Broker 在知道当前epoch number 后,如果收到由controller发出的包含较旧(较小)epoch number的消息,就会忽略它们,即Broker根据最大的epoch number来区分当前最新的controller。
在这里插入图片描述 上图,Broker3向Broker1发出命令:让Broker1上的某个分区副本成为leader,该消息的epoch number值为1。于此同时,Broker2也向Broker1发送了相同的命令,不同的是,该消息的epoch number值为2,此时Broker1只听从Broker2的命令(由于其epoch number较大),会忽略Broker3的命令,从而避免脑裂的发生。
在kafka中,单个patition是kafka并行操作的最小单元。在producer和broker端,向每一个分区写入数据是可以完全并行化的,此时,可以通过加大硬件资源的利用率来提升系统的吞吐量,例如对数据进行压缩。在consumer段,kafka只允许单个partition的数据被一个consumer线程消费。因此,在consumer端,每一个Consumer Group内部的consumer并行度完全依赖于被消费的分区数量。综上所述,通常情况下,在一个Kafka集群中,partition的数量越多,意味着可以到达的吞吐量越大。
我们可以粗略地通过吞吐量来计算kafka集群的分区数量。假设对于单个partition,producer端的可达吞吐量为p,Consumer端的可达吞吐量为c,期望的目标吞吐量为t,那么集群所需要的partition数量至少为max(t/p,t/c)。在producer端,单个分区的吞吐量大小会受到批量大小、数据压缩方法、 确认类型(同步/异步)、复制因子等配置参数的影响。经过测试,在producer端,单个partition的吞吐量通常是在10MB/s左右。在consumer端,单个partition的吞吐量依赖于consumer端每个消息的应用逻辑处理速度。因此,我们需要对consumer端的吞吐量进行测量。
kafka支持分区数增加 例如我们可以使用 bin/kafka-topics.sh -alter --topic --topic topic-name --partitions 3 命令将原本分区数为1得topic-name设置为3。 当主题中的消息包含有key时(即key不为null),根据key来计算分区的行为就会有所影响。当topic-config的分区数为1时,不管消息的key为何值,消息都会发往这一个分区中;当分区数增加到3时,那么就会根据消息的key来计算分区号,原本发往分区0的消息现在有可能会发往分区1或者分区2中。如此还会影响既定消息的顺序,所以在增加分区数时一定要三思而后行。对于基于key计算的主题而言,建议在一开始就设置好分区数量,避免以后对其进行调整。
Kafka 不支持减少分区数。 按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。 实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?如果随着分区一起消失则消息的可靠性得不到保障; 如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响; 如果分散插入到现有的分区中,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障? 与此同时,顺序性问题、事务性问题、以及分区和副本的状态机切换问题都是不得不面对的。反观这个功能的收益点却是很低,如果真的需要实现此类的功能,完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。
Kafka幂等性主要针对生产者而言。避免生产者数据重复提交至Kafka broker中并落盘。 在正常情况下,Producer向Broker发送消息,Broker将消息追加写到对应的流(即某一Topic的某一Partition)中并落盘,并向Producer返回ACK信号,表示确认收到。 但是Producer和Broker之间的通信总有可能出现异常,如果消息已经写入,但ACK在半途丢失了,Producer就会进行retry操作再次发送该消息,造成重复写入。
为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
在这里插入图片描述
在这里插入图片描述
Producer使用幂等性的示例非常简单,与正常情况下Producer使用相比变化不大,只需要 把Producer的配置enable.idempotence设置为true即可,如下所示:
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
//当enable.idempotence为true时acks默认为 all
// props.put("acks", "all");
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");
Prodcuer 幂等性对外保留的接口非常简单,其底层的实现对上层应用做了很好的封装,应用层并不需要去关心具体的实现细节,对用户非常友好
Kafka的幂等性实现了对于单个Producer会话、单个TopicPartition级别的不重不漏,也就是最细粒度的保证。如果Producer重启(PID发生变化),或者写入是跨Topic、跨Partition的,单纯的幂等性就会失效,需要更高级别的事务性来解决了。当然事务性的原理更加复杂
幂等性可以保证单个Producer会话、单个TopicPartition、单个会话session的不重不漏,如果Producer重启,或者是写入跨Topic、跨Partition的消息,幂等性无法保证。此时需要用到Kafka事务。 Kafka 的事务处理,主要是允许应用可以把消费和生产的 batch 处理(涉及多个 Partition)在一个原子单元内完成,操作要么全部完成、要么全部失败。为了实现这种机制,我们需要应用能提供一个唯一 id,即使故障恢复后也不会改变,这个 id 就是 TransactionnalId(也叫 txn.id),txn.id 可以跟内部的 PID 1:1 分配,它们不同的是 txn.id 是用户提供的,而 PID 是 Producer 内部自动生成的(并且故障恢复后这个 PID 会变化),有了 txn.id 这个机制,就可以实现多 partition、跨会话的 EOS 语义。 当用户使用 Kafka 的事务性时,Kafka 可以做到的保证:
事务性示例 Kafka 事务性的使用方法也非常简单,用户只需要在 Producer 的配置中配置 transactional.id,通过 initTransactions() 初始化事务状态信息,再通过 beginTransaction() 标识一个事务的开始,然后通过 commitTransaction() 或 abortTransaction() 对事务进行 commit 或 abort,示例如下所示: 生产者:
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
try {
String msg = "matt test";
producer.beginTransaction();
producer.send(new ProducerRecord(topic, "0", msg.toString()));
producer.send(new ProducerRecord(topic, "1", msg.toString()));
producer.send(new ProducerRecord(topic, "2", msg.toString()));
producer.commitTransaction();
} catch (ProducerFencedException e1) {
e1.printStackTrace();
producer.close();
} catch (KafkaException e2) {
e2.printStackTrace();
producer.abortTransaction();
}
producer.close();
消费者: 消费者应该设置提交事务的隔离级别
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
Kafka中只有两种事务隔离级别:read_committed、read_uncommitted 设置为read_committed时候是生产者事务已提交的数据才能读取到。在执行 commitTransaction() 或 abortTransaction() 方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在 KafkaConsumer 内部会缓存这些消息,直到生产者执行 commitTransaction() 方法之后它才能将这些消息推送给消费端应用。同时KafkaConsumer会根据分区对数据进行整合,推送时按照分区顺序进行推送。而不是按照数据发送顺序。 反之,如果生产者执行了 abortTransaction() 方法,那么 KafkaConsumer 会将这些缓存的消息丢弃而不推送给消费端应用。 设置为read_uncommitted时候可以读取到未提交的数据(报错终止前的数据)