接上篇《Kafka技术知识总结之四——Kafka 再均衡》
Kafka 实现高可用性的方式是进行 replication。对于 kafka,如果没有提供高可用性机制,一旦一个或多个 Broker 宕机,则宕机期间其上所有 Partition 都无法继续提供服务。若该 Broker永远不能再恢复,那么所有的数据也就将丢失,这是不可容忍的。所以 kafka 高可用性的设计也是进行 Replication。 Replica 的分布:为了尽量做好负载均衡和容错能力,需要将同一个 Partition 的 Replica 尽量分散到不同的机器。 Replica 的同步:当有很多 Replica 的时候,一般来说,对于这种情况有两个处理方法:
而 kafka 选取了一个折中的方式:ISR (in-sync replicas)。producer 每次发送消息,将消息发送给 leader,leader 将消息同步给他“信任”的“小弟们”就算成功,巧妙的均衡了确保数据不丢失以及吞吐率。具体步骤:
参考地址: 《Kafka消息投递语义-消息不丢失,不重复,不丢不重》 《消息队列面试题要点》
上述问题换一种问法,可以翻译为**如何保证消息队列的幂等性?**这个问题可以认为是消息队列领域的基本问题。这个问题的回答可以根据具体的业务场景来答,没有固定的答案。
无论是哪种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发出的确认消息形式不同(例如 RabbitMQ 是发送一个 ACK 确认消息,RocketMQ 是返回一个 CONSUME_SUCCESS 成功标志),kafka 是通过**提交 offset 的方式**让消息队列知道自己已经消费过了。
造成重复消费的原因,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。
注:关于重复消费,还有部分与第七章 Kafka 再均衡相关的内容。 参考地址:《记一次线上kafka一直rebalance故障》
如何解决?这个问题针对业务场景来答,分以下三种情况:
kafka 有三种消息投递语义:
整体的消息投递语义由生产者、消费者两端同时保证。
Producer 端保证消息投递重复性,是通过 Producer 的 acks 参数与 Broker 端的 min.insync.replicas 参数决定的。
Producer 端的 acks 参数值信息如下:
前面 Producer 的 acks = 1 可以保证写入 Leader 副本,对大部分情况没有问题。但如果刚刚一条消息写入 Leader,还没有把这条消息同步给其他 Replica,Leader 就挂了,那么这条消息也就丢失了。所以如果保证消息的完全投递,还是需要令 acks = all;
首先上面说到,为了配合 Producer acks 参数为 all,需要令 Broker 端参数 min.insync.replicas = 2; min.insync.replicas 参数是用来配合 Producer acks 参数的。因为如果 acks 设置为 all,但某个 Topic 只有 leader 一个 Replica(或者某个 Kafka 集群中由于同步很慢,导致所有 follower 全部被剔除 ISR 集合),这样 acks = -1 就演变成了 acks = 1。 所以需要 Broker 端设置 min.insync.replicas 参数:当参数值为 2 时,如果副本数小于 2 个,会抛出异常。
注:然而在笔者的使用环境中,订阅是 Kafka 主要的使用场景之一,方式是对于想要订阅的某个 Topic,每个用户创建并独享一个不会重复的消费组。所以这样的情况下,环境下的 min.insync.replicas 只能等于 1;
除此之外,broker 端还有一个需要注意的参数 unclean.leader.election.enable。该参数为 true 的时候,表示在 leader 下线的时候,可以从非 ISR 集合中选举出新的 Leader。这样的话可能会造成数据的丢失。所以如果需要在 Broker 端的 unclean.leader.election.enable 设置为 false。
Consumer 端比较麻烦,原因是需要考虑到某个 Consumer 宕机后,同 Consumer Group 会发生负载均衡,同 Group 其他的 Consumer 会重新接管并继续消费。
假设两种场景:
第一个场景,Consumer 先提交 offset,再处理消息。代码如下:
List<String> messages = consumer.poll();
consumer.commitOffset();
processMsg(messages);
这种情形下,提交 offset 成功,但处理消息失败,同时当前 Consumer 宕机,这时候发生负载均衡,其他 Consumer 从已经提交的 offset 之后继续消费。这样的情况保证了 at most once 的消费语义,当然也可能会丢消息。
第二个场景,Consumer 先处理消息,再提交 offset。代码如下:
List<String> messages = consumer.poll();
processMsg(messages);
consumer.commitOffset();
这种情形下,消息处理成功,提交 offset 失败,同时当前 Consumer 宕机,这时候发生负载均衡,其他 Consumer 依旧从同样的 offset 拉取消息消费。这样的情况保证了 at least once 的消费语义,可能会重复消费消息。
上述机制的保证都不是直接一个配置可以解决的,而是 Consumer 端代码的处理先后顺序问题完成的。
注:关于 Kafka 解耦作用的思考: 注册中心可以将服务于服务之间解耦,但 Kafka 也可以通过相同的 topic 的消息传递实现业务的解耦。这两种形式都可以实现解耦,但笔者个人理解: