前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解Kafka消费端分区分配策略

深入理解Kafka消费端分区分配策略

原创
作者头像
码农架构
修改2021-02-23 18:01:21
3.4K1
修改2021-02-23 18:01:21
举报
文章被收录于专栏:码农架构码农架构

Java-Bang

专注于系统架构、高可用、高性能、高并发类技术分享

Kafka 提供了消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为 org.apache.kafka.clients.consumer.RangeAssignor,即采用 RangeAssignor 分配策略。除此之外,Kafka 还提供了另外两种分配策略:RoundRobinAssignor 和 StickyAssignor。消费者客户端参数 partition.assignment.strategy 可以配置多个分配策略,彼此之间以逗号分隔。

RangeAssignor分配策略

RangeAssignor 分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个主题,RangeAssignor 策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

假设 n = 分区数/消费者数量,m = 分区数%消费者数量,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量- m)个消费者每个分配 n 个分区。

为了更加通俗地讲解 RangeAssignor 策略,我们不妨再举一些示例。假设消费组内有2个消费者 C0 和 C1,都订阅了主题 t0 和 t1,并且每个主题都有4个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。最终的分配结果为:

代码语言:javascript
复制
消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t0p3、t1p2、t1p3

这样分配得很均匀,那么这个分配策略能够一直保持这种良好的特性吗?我们不妨再来看另一种情况。假设上面例子中2个主题都只有3个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

代码语言:javascript
复制
消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t1p2

可以明显地看到这样的分配并不均匀,如果将类似的情形扩大,则有可能出现部分消费者过载的情况。对此我们再来看另一种 RoundRobinAssignor 策略的分配效果如何。

RoundRobinAssignor分配策略

RoundRobinAssignor 分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。RoundRobinAssignor 分配策略对应的 partition.assignment.strategy 参数值为 org.apache.kafka.clients.consumer.RoundRobinAssignor。

如果同一个消费组内所有的消费者的订阅信息都是相同的,那么 RoundRobinAssignor 分配策略的分区分配会是均匀的。举个例子,假设消费组中有2个消费者 C0 和 C1,都订阅了主题 t0 和 t1,并且每个主题都有3个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

代码语言:javascript
复制
消费者C0:t0p0、t0p2、t1p1
消费者C1:t0p1、t1p0、t1p2

如果同一个消费组内的消费者订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能导致分区分配得不均匀。如果某个消费者没有订阅消费组内的某个主题,那么在分配分区的时候此消费者将分配不到这个主题的任何分区。

举个例子,假设消费组内有3个消费者(C0、C1 和 C2),它们共订阅了3个主题(t0、t1、t2),这3个主题分别有1、2、3个分区,即整个消费组订阅了 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 这6个分区。具体而言,消费者 C0 订阅的是主题 t0,消费者 C1 订阅的是主题 t0 和 t1,消费者 C2 订阅的是主题 t0、t1 和 t2,那么最终的分配结果为:

代码语言:javascript
复制
消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2

可以看 到 RoundRobinAssignor 策略也不是十分完美,这样分配其实并不是最优解,因为完全可以将分区 t1p1 分配给消费者 C1。

StickyAssignor分配策略

我们再来看一下 StickyAssignor 分配策略,“sticky”这个单词可以翻译为“黏性的”,Kafka 从 0.11.x 版本开始引入这种分配策略,它主要有两个目的:

  1. 分区的分配要尽可能均匀。
  2. 分区的分配尽可能与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor 分配策略的具体实现要比 RangeAssignor 和 RoundRobinAssignor 这两种分配策略要复杂得多。我们举例来看一下 StickyAssignor 分配策略的实际效果。

假设消费组内有3个消费者(C0、C1 和 C2),它们都订阅了4个主题(t0、t1、t2、t3),并且每个主题有2个分区。也就是说,整个消费组订阅了 t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1 这8个分区。最终的分配结果如下:

代码语言:javascript
复制
消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1

这样初看上去似乎与采用 RoundRobinAssignor 分配策略所分配的结果相同,但事实是否真的如此呢?再假设此时消费者 C1 脱离了消费组,那么消费组就会执行再均衡操作,进而消费分区会重新分配。如果采用 RoundRobinAssignor 分配策略,那么此时的分配结果如下:

代码语言:javascript
复制
消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1

如分配结果所示,RoundRobinAssignor 分配策略会按照消费者 C0 和 C2 进行重新轮询分配。如果此时使用的是 StickyAssignor 分配策略,那么分配结果为:

代码语言:javascript
复制
消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t1p0、t2p1、t0p1、t3p1

可以看到分配结果中保留了上一次分配中对消费者 C0 和 C2 的所有分配结果,并将原来消费者 C1 的“负担”分配给了剩余的两个消费者 C0 和 C2,最终 C0 和 C2 的分配还保持了均衡。

如果发生分区重分配,那么对于同一个分区而言,有可能之前的消费者和新指派的消费者不是同一个,之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor 分配策略如同其名称中的“sticky”一样,让分配策略具备一定的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。

到目前为止,我们分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。

举个例子,同样消费组内有3个消费者(C0、C1 和 C2),集群中有3个主题(t0、t1 和 t2),这3个主题分别有1、2、3个分区。也就是说,集群中有 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 这6个分区。消费者 C0 订阅了主题 t0,消费者 C1 订阅了主题 t0 和 t1,消费者 C2 订阅了主题 t0、t1 和 t2。

如果此时采用 RoundRobinAssignor 分配策略,那么最终的分配结果如分配清单11-1所示(和讲述 RoundRobinAssignor 分配策略时的一样,这样不妨赘述一下):

代码语言:javascript
复制
分配清单11-1 RoundRobinAssignor分配策略的分配结果
消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2

如果此时采用的是 StickyAssignor 分配策略,那么最终的分配结果如分配清单11-2所示。

代码语言:javascript
复制
分配清单11-2 StickyAssignor分配策略的分配结果
消费者C0:t0p0
消费者C1:t1p0、t1p1
消费者C2:t2p0、t2p1、t2p2

可以看到这才是一个最优解(消费者 C0 没有订阅主题 t1 和 t2,所以不能分配主题 t1 和 t2 中的任何分区给它,对于消费者 C1 也可同理推断)。

假如此时消费者 C0 脱离了消费组,那么 RoundRobinAssignor 分配策略的分配结果为:

代码语言:javascript
复制
消费者C1:t0p0、t1p1
消费者C2:t1p0、t2p0、t2p1、t2p2

可以看到 RoundRobinAssignor 策略保留了消费者 C1 和 C2 中原有的3个分区的分配:t2p0、t2p1 和 t2p2(针对分配清单 11-1)。如果采用的是 StickyAssignor 分配策略,那么分配结果为:

代码语言:javascript
复制
消费者C1:t1p0、t1p1、t0p0
消费者C2:t2p0、t2p1、t2p2

可以看到 StickyAssignor 分配策略保留了消费者 C1 和 C2 中原有的5个分区的分配:t1p0、t1p1、t2p0、t2p1、t2p2。 对 ConsumerRebalanceListener 而言,StickyAssignor 分配策略可以提供一定程度上的优化:

代码语言:javascript
复制
public class TheOldRebalanceListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition topicPartition : partitions) {
            commitOffsets(partition);
            cleanupState(partition);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition topicPartition : partitions) {
            initializeState(partition);
            initializeOffset(partition);
        }
    }
}

如前所述,使用 StickyAssignor 分配策略的一个优点就是可以使分区重分配具备“黏性”,减少不必要的分区移动(即一个分区剥离之前的消费者,转而分配给另一个新的消费者)。

代码语言:javascript
复制
class TheNewRebalanceListener implements ConsumerRebalanceListener{
    Collection<TopicPartition> lastAssignment = Collections.emptyList();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition topicPartition : partitions) {
            commitOffsets(partition);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
        for (TopicPartition topicPartition : 
            	difference(lastAssignment, assignment)) {
            cleanupState(partition);
        }
        for (TopicPartition topicPartition : 
            	difference(assignment, lastAssignment)) {
            initializeState(partition);
        }
        for (TopicPartition topicPartition : assignment) {
            initializeOffset(partition);
        }
        this.lastAssignment = assignment;
    }
}

从结果上看,StickyAssignor 分配策略比另外两者分配策略而言显得更加优异,这个策略的代码实现也异常复杂,如果读者没有接触过这种分配策略,不妨使用一下来尝尝鲜。

自定义分区分配策略

读者不仅可以任意选用 Kafka 提供的3种分配策略,还可以自定义分配策略来实现更多可选的功能。自定义的分配策略必须要实现 org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口。PartitionAssignor 接口的定义如下:

代码语言:javascript
复制
Subscription subscription(Set<String> topics);
String name();
Map<String, Assignment> assign(Cluster metadata, 
                               Map<String, Subscription> subscriptions);
void onAssignment(Assignment assignment);

class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData;
(省略若干方法……)
}

class Assignment {
    private final List<TopicPartition> partitions;
    private final ByteBuffer userData;
(省略若干方法……)
}

PartitionAssignor 接口中定义了两个内部类:Subscription 和 Assignment。

Subscription 类用来表示消费者的订阅信息,类中有两个属性:topics 和 userData,分别表示消费者的订阅主题列表和用户自定义信息。PartitionAssignor 接口通过 subscription() 方法来设置消费者自身相关的 Subscription 信息,注意到此方法中只有一个参数 topics,与 Subscription 类中的 topics 的相呼应,但并没有体现有关 userData 的参数。为了增强用户对分配结果的控制,可以在 subscription() 方法内部添加一些影响分配的用户自定义信息赋予 userData,比如权重、IP 地址、host 或机架(rack)等。

7-1
7-1

举个例子,在 subscription() 方法中提供机架信息,标识此消费者所部署的机架位置,在分区分配时可以根据分区的 leader 副本所在的机架位置来实施具体的分配,这样可以让消费者与所需拉取消息的 broker 节点处于同一机架。

参考上图,消费者 consumer1 和 broker1 都部署在机架 rack1 上,消费者 consumer2 和 broker2 都部署在机架 rack2 上。如果分区的分配不是机架感知的,那么有可能与上图(上半部分)中的分配结果一样,consumer1 消费 broker2 中的分区,而 consumer2 消费 broker1 中的分区;如果分区的分配是机架感知的,那么就会出现上图(下半部分)的分配结果,consumer1 消费 broker1 中的分区,而 consumer2 消费 broker2 中的分区,这样相比前一种情形,既可以减少消费延时,又可以减少跨机架带宽的占用。

再来说一下 Assignment 类,它用来表示分配结果信息,类中也有两个属性:partitions 和 userData,分别表示所分配到的分区集合和用户自定义的数据。PartitionAssignor 接口中的 onAssignment() 方法是在每个消费者收到消费组 leader 分配结果时的回调函数,例如在 StickyAssignor 分配策略中就是通过这个方法保存当前的分配方案,以备在下次消费组再均衡(rebalance)时可以提供分配参考依据。

接口中的 name() 方法用来提供分配策略的名称,对 Kafka 提供的3种分配策略而言,RangeAssignor 对应的 protocol_name 为“range”,RoundRobinAssignor 对应的 protocol_name 为“roundrobin”,StickyAssignor 对应的 protocol_name 为“sticky”,所以自定义的分配策略中要注意命名的时候不要与已存在的分配策略发生冲突。这个命名用来标识分配策略的名称,在后面所描述的加入消费组及选举消费组 leader 的时候会有涉及。

真正的分区分配方案的实现是在 assign() 方法中,方法中的参数 metadata 表示集群的元数据信息,而 subscriptions 表示消费组内各个消费者成员的订阅信息,最终方法返回各个消费者的分配信息。

Kafka 还提供了一个抽象类 org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化实现 PartitionAssignor 接口的工作,并对 assign() 方法进行了详细实现,其中会将 Subscription 中的 userData 信息去掉后再进行分配。Kafka 提供的3种分配策略都继承自这个抽象类。如果开发人员在自定义分区分配策略时需要使用 userData 信息来控制分区分配的结果,那么就不能直接继承 AbstractPartitionAssignor 这个抽象类,而需要直接实现 PartitionAssignor 接口。

下面笔者参考 Kafka 的 RangeAssignor 分配策略来自定义一个随机的分配策略,这里笔者称之为 RandomAssignor,具体代码实现如下:

代码语言:javascript
复制
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class RandomAssignor extends AbstractPartitionAssignor {
    @Override
    public String name() {
        return "random";
    }

    @Override
    public Map<String, List<TopicPartition>> assign(
            Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic = 
                consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId, new ArrayList<>());
        }

        //针对每一个主题进行分区分配
        for (Map.Entry<String, List<String>> topicEntry : 
                consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();
            int consumerSize = consumersForTopic.size();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null) {
                continue;
            }

            //当前主题下的所有分区
            List<TopicPartition> partitions = 
                AbstractPartitionAssignor.partitions(topic, 
                numPartitionsForTopic);
            //将每个分区随机分配给一个消费者
            for (TopicPartition partition : partitions) {
                int rand = new Random().nextInt(consumerSize);
                String randomConsumer = consumersForTopic.get(rand);
                assignment.get(randomConsumer).add(partition);
            }
        }
        return assignment;
    }

    //获取每个主题对应的消费者列表,即[topic, List[consumer]]
    private Map<String, List<String>> consumersPerTopic(
          Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : 
                consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics())
                put(res, topic, consumerId);
        }
        return res;
    }
}

在使用时,消费者客户端需要添加相应的 Properties 参数,示例如下:

代码语言:javascript
复制
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
     RandomAssignor.class.getName());

这里只是演示如何自定义实现一个分区分配策略,RandomAssignor 的实现并不是特别理想,并不见得会比 Kafka 自身提供的 RangeAssignor 之类的策略要好。

按照 Kafka 默认的消费逻辑设定,一个分区只能被同一个消费组(ConsumerGroup)内的一个消费者消费。但这一设定不是绝对的,我们可以通过自定义分区分配策略使一个分区可以分配给多个消费者消费。

7-2
7-2

考虑一种极端情况,同一消费组内的任意消费者都可以消费订阅主题的所有分区,从而实现了一种“组内广播(消费)”的功能。分配效果如上图所示。

下面展示了组内广播分配策略的具体代码实现:

代码语言:javascript
复制
public class BroadcastAssignor extends AbstractPartitionAssignor{
    @Override
    public String name() {
        return "broadcast";
    }

    private Map<String, List<String>> consumersPerTopic(
            Map<String, Subscription> consumerMetadata) {
        (具体实现请参考RandomAssignor中的consumersPerTopic()方法)
    }

    @Override
    public Map<String, List<TopicPartition>> assign(
            Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic =
                consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
		   //Java8
        subscriptions.keySet().forEach(memberId ->
                assignment.put(memberId, new ArrayList<>()));
		   //针对每一个主题,为每一个订阅的消费者分配所有的分区
        consumersPerTopic.entrySet().forEach(topicEntry->{
            String topic = topicEntry.getKey();
            List<String> members = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null || members.isEmpty())
                return;
            List<TopicPartition> partitions = AbstractPartitionAssignor
                    .partitions(topic, numPartitionsForTopic);
            if (!partitions.isEmpty()) {
                members.forEach(memberId ->
                        assignment.get(memberId).addAll(partitions));
            }
        });
        return assignment;
    }
}

注意组内广播的这种实现方式会有一个严重的问题—默认的消费位移的提交会失效。所有的消费者都会提交它自身的消费位移到 __consumer_offsets 中,后提交的消费位移会覆盖前面提交的消费位移。

假设消费者 consumer1 提交了分区 tp0 的消费位移为10,这时消费者 consumer2 紧接着提交了同一分区 tp0 的消费位移为12,如果此时消费者 consumer1 由于某些原因重启了,那么 consumer1 就会从位移12之后重新开始消费,这样 consumer1 就丢失了部分消息。

再考虑另一种情况,同样消费者 consumer1 提交了分区 tp0 的消费位移为10,这时消费者 consumer2 紧接着提交了同一分区的消费位移为8,如果此时消费者 consumer1 由于某些原因重启了,那么 consumer1 就会从位移8之后重新开始消费,这样 consumer1 就重复消费了消息。很多情形下,重复消费少量消息对于上层业务应用来说可以忍受。但是设想这样一种情况,消费组内的消费者对于分区tp0的消费位移都在100000之后了,此时又有一个新的消费者 consumer3 加入进来,消费了部分消息之后提交了 tp0 的消费位移为9,那么此时原消费组内的任何消费者重启都会从这个消费位移9之后再开始重新消费,这样大量的重复消息会让上层业务应用猝不及防,同样会造成计算资源的浪费。

针对上述这种情况,如果要真正实现组内广播,则需要自己保存每个消费者的消费位移。笔者的实践经验是,可以通过将消费位移保存到本地文件或数据库中等方法来实现组内广播的位移提交。

7-3
7-3

虽然说可以通过自定义分区分配策略来打破 Kafka 中“一个分区只能被同一个消费组内的一个消费者消费”的禁忌(参考上图中的消费者 C0 和 C1),但想要通过自定义分区分配策略来实现上图中的消费者 C3 和 C4 共同分享单个分区的消息是不现实的。更加通俗一点来说,上图中的消费者 C3 和 C4 都处于正常稳定的状态,此时它们想要共同分享分区3中的消息,即 C3 消费0、1、2这3条消息,而 C4 消费3、4这2条消息,紧接着 C3 再消费5、6、7这3条消息,这种分配是无法实现的。不过这种诉求可以配合 KafkaConsumer 中的 seek()方法来实现,实际应用价值不大

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RangeAssignor分配策略
  • RoundRobinAssignor分配策略
  • StickyAssignor分配策略
  • 自定义分区分配策略
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档