
Kafka consumer: differences between the assign and subscribe modes

sanmutongzi
Published 2020-03-04 15:36:29
Included in the column: stream process

When reposting, please credit the original address: http://www.cnblogs.com/dongxiao-yang/p/7200971.html

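I recently needed to study how flink-connector-kafka consumes data and found that Flink uses the Kafka consumer's lower-level assign interface rather than the more commonly used subscribe, so I looked into the differences between the two.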

First, the API documentation: http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

public void assign(Collection<TopicPartition> partitions)

Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment and will replace the previous assignment (if there is one). If the given list of topic partitions is empty, it is treated the same as unsubscribe().

Manual topic assignment through this method does not use the consumer's group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change. Note that it is not possible to use both manual partition assignment with assign(Collection) and group assignment with subscribe(Collection, ConsumerRebalanceListener).

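Unlike subscribe, assign lets the user manually specify exactly which partitions the consumer instance consumes. According to the API description above, a consumer that uses assign does not use Kafka's group management, so no rebalance occurs when the number of consumers in the group changes. assign cannot be used together with subscribe.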
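The semantics quoted from the Javadoc above can be modeled in a few lines. This is a minimal, self-contained sketch and not Kafka's code: `AssignModel` and its plain `String` partition names are hypothetical stand-ins for `KafkaConsumer` and `TopicPartition`. It only illustrates that each assign call replaces the previous assignment and that an empty collection clears it.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical toy model of the documented assign() semantics; not Kafka code.
class AssignModel {
    // Partitions are plain strings such as "orders-0" (topic "orders", partition 0).
    private Set<String> assignment = new LinkedHashSet<>();

    // Each call replaces the previous assignment; nothing is merged.
    void assign(Collection<String> partitions) {
        if (partitions.isEmpty()) {
            unsubscribe(); // documented: an empty list is treated like unsubscribe()
            return;
        }
        assignment = new LinkedHashSet<>(partitions);
    }

    void unsubscribe() {
        assignment = new LinkedHashSet<>();
    }

    Set<String> assignment() {
        return Collections.unmodifiableSet(assignment);
    }

    public static void main(String[] args) {
        AssignModel consumer = new AssignModel();
        consumer.assign(Arrays.asList("orders-0", "orders-1"));
        consumer.assign(Arrays.asList("orders-2")); // replaces, does not add
        System.out.println(consumer.assignment());  // prints [orders-2]
        consumer.assign(Collections.<String>emptyList());
        System.out.println(consumer.assignment());  // prints []
    }
}
```

Because the assignment is replaced wholesale, a caller that wants to add one partition has to pass the full new set, including the partitions it already held.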

Next, the implementation source code:

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    // ...
    private final SubscriptionState subscriptions;

    // Topic-list overload: delegates to SubscriptionState.subscribe
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        // ...
        this.subscriptions.subscribe(new HashSet<>(topics), listener);
        // ...
    }

    // ...
    // Manual assignment: delegates to SubscriptionState.assignFromUser
    public void assign(Collection<TopicPartition> partitions) {
        // ...
        this.subscriptions.assignFromUser(new HashSet<>(partitions));
        // ...
    }
    // ...
}

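Every KafkaConsumer instance holds a SubscriptionState object. KafkaConsumer.subscribe calls SubscriptionState.subscribe, and KafkaConsumer.assign calls SubscriptionState.assignFromUser. Here are the implementations of those two methods: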

    private enum SubscriptionType {
        NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
    }

    private void setSubscriptionType(SubscriptionType type) {
        if (this.subscriptionType == SubscriptionType.NONE)
            this.subscriptionType = type;
        else if (this.subscriptionType != type)
            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
    }

    public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
        if (listener == null)
            throw new IllegalArgumentException("RebalanceListener cannot be null");

        setSubscriptionType(SubscriptionType.AUTO_TOPICS);

        this.listener = listener;

        changeSubscription(topics);
    }

    public void assignFromUser(Set<TopicPartition> partitions) {
        setSubscriptionType(SubscriptionType.USER_ASSIGNED);

        if (!this.assignment.partitionSet().equals(partitions)) {
            fireOnAssignment(partitions);

            Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
            for (TopicPartition partition : partitions) {
                TopicPartitionState state = assignment.stateValue(partition);
                if (state == null)
                    state = new TopicPartitionState();
                partitionToState.put(partition, state);
            }
            this.assignment.set(partitionToState);
            this.needsFetchCommittedOffsets = true;
        }
    }

As the code above shows, SubscriptionState holds a field subscriptionType of the enum type SubscriptionType, which has four states: NONE, AUTO_TOPICS, AUTO_PATTERN, and USER_ASSIGNED. subscribe sets subscriptionType to AUTO_TOPICS, and assignFromUser sets it to USER_ASSIGNED. Note in particular the branch inside setSubscriptionType: else if (this.subscriptionType != type) throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); It guarantees that once a consumer has used one subscription mode, any attempt to switch to a different mode throws an exception.
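To see the guard in isolation, here is a minimal sketch that copies only the setSubscriptionType rule from the excerpt above into a standalone class. `SubscriptionModeGuard`, its no-argument `subscribe()` and `assignFromUser()` methods, and the exception text are hypothetical simplifications; the real SubscriptionState also tracks topics, partitions, and listeners.

```java
// Simplified, hypothetical stand-in for SubscriptionState; only the mode guard is modeled.
class SubscriptionModeGuard {
    enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED }

    private SubscriptionType subscriptionType = SubscriptionType.NONE;

    // Same rule as the excerpt: the first call fixes the mode,
    // and a later call with a different mode is rejected.
    private void setSubscriptionType(SubscriptionType type) {
        if (subscriptionType == SubscriptionType.NONE)
            subscriptionType = type;
        else if (subscriptionType != type)
            throw new IllegalStateException("modes are mutually exclusive"); // placeholder message
    }

    void subscribe() { setSubscriptionType(SubscriptionType.AUTO_TOPICS); }

    void assignFromUser() { setSubscriptionType(SubscriptionType.USER_ASSIGNED); }

    SubscriptionType type() { return subscriptionType; }

    // Returns true if calling assignFromUser after subscribe is rejected.
    static boolean mixingModesFails() {
        SubscriptionModeGuard state = new SubscriptionModeGuard();
        state.subscribe();
        state.subscribe(); // repeating the same mode is allowed
        try {
            state.assignFromUser();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(mixingModesFails()); // prints true
    }
}
```

Repeating the same mode is a no-op for the guard, which is why calling subscribe twice succeeds while the later assignFromUser call fails.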

How the two modes differ when poll is called

Consumers initialized in either mode fetch data through the same poll method. Each poll makes the following call inside pollOnce:


coordinator.poll(time.milliseconds());

The source of that method is as follows:

    public void poll(long now) {
        invokeCompletedOffsetCommitCallbacks();

        if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
            ensureCoordinatorReady();
            now = time.milliseconds();
        }

        if (needRejoin()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription())
                client.ensureFreshMetadata();

            ensureActiveGroup();
            now = time.milliseconds();
        }

        pollHeartbeat(now);
        maybeAutoCommitOffsetsAsync(now);
    }


    public boolean needRejoin() {
        if (!subscriptions.partitionsAutoAssigned())
            return false;

        // we need to rejoin if we performed the assignment and metadata has changed
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
            return true;

        // we need to join if our subscription has changed since the last join
        if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
            return true;

        return super.needRejoin();
    }


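ConsumerCoordinator.poll checks the consumer's SubscriptionType described earlier. Only when the type is AUTO_TOPICS or AUTO_PATTERN (the mode set by the pattern overload of subscribe) does it go through the group-membership steps: making sure the coordinator is known and checking whether the consumer needs to rejoin the group, that is, whether a rebalance is required.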

So, as the API documentation says, a consumer that uses assign does not take part in Kafka's group management, and no rebalance happens when the number of consumers in the group changes.
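The gate at the top of needRejoin can be reduced to a small standalone sketch. `RejoinGate` is hypothetical and not Kafka code; the single `groupChanged` flag is an assumption that stands in for all the "something changed" checks in the real method, and `partitionsAutoAssigned` is modeled as true only for the two subscribe modes, as described above.

```java
// Hypothetical model of the first check in ConsumerCoordinator.needRejoin(); not Kafka code.
class RejoinGate {
    enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED }

    // Plays the role of subscriptions.partitionsAutoAssigned() in the excerpt:
    // true only for the two modes that subscribe() can set.
    static boolean partitionsAutoAssigned(SubscriptionType type) {
        return type == SubscriptionType.AUTO_TOPICS || type == SubscriptionType.AUTO_PATTERN;
    }

    // groupChanged stands in for the metadata, subscription, and membership checks.
    static boolean needRejoin(SubscriptionType type, boolean groupChanged) {
        if (!partitionsAutoAssigned(type))
            return false; // manually assigned consumers never rejoin
        return groupChanged;
    }

    public static void main(String[] args) {
        // Print the decision for every mode when the group has changed.
        for (SubscriptionType type : SubscriptionType.values())
            System.out.println(type + " -> " + needRejoin(type, true));
    }
}
```

With `groupChanged` set to true, only AUTO_TOPICS and AUTO_PATTERN yield true; USER_ASSIGNED returns false before any change is even considered, which is the behavior the article traces through the source.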


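This article is part of the Tencent Cloud self-media sharing program and is shared from the author's personal site/blog.
Originally published 2017-07-18. In case of infringement, contact cloudcommunity@tencent.com for removal.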
