KafkaConsumer Analysis (Repost)

stys35
Published 2019-03-15 10:28:20
Included in the column: 工作笔记精华
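1 Important fields

String clientId: unique identifier of the consumer

ConsumerCoordinator coordinator: handles the communication between the consumer and the server-side GroupCoordinator

Fetcher<K, V> fetcher: fetches messages from the brokers and updates the offset of each partition

ConsumerNetworkClient client: handles network communication with the brokers

SubscriptionState subscriptions: tracks the consumer's consumption state and gives quick access to the state of topics and partitions

Metadata metadata: cluster metadata

AtomicLong currentThread: id of the thread currently using the KafkaConsumer

AtomicInteger refcount: reentrancy count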
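
The currentThread and refcount fields back the acquire() and release() calls that wrap every method in the next section. The following is a minimal stand-alone sketch of how such a pair can act as a re-entrant single-thread guard. The class name SingleThreadGuard and the helper methods depth() and isFree() are hypothetical, and the code is a simplified model rather than Kafka's own implementation.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the currentThread/refcount pair; not Kafka's actual class.
final class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;

    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refcount = new AtomicInteger(0);

    // Claim the guard for the calling thread, or fail fast if another thread holds it.
    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("guard is held by another thread");
        }
        // Re-entrant calls from the owning thread only bump the counter.
        refcount.incrementAndGet();
    }

    // Give the guard back once the outermost acquire() has been matched.
    void release() {
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    int depth() {
        return refcount.get();
    }

    boolean isFree() {
        return currentThread.get() == NO_CURRENT_THREAD;
    }
}
```

Unlike a blocking lock, a guard of this kind never makes a second thread wait: it rejects the call immediately, which turns accidental sharing of one consumer across threads into a visible error.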



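2 Core methods

2.1 subscribe: subscribing to topics

Subscribes to the given list of topics to get dynamically assigned partitions.

Topic subscriptions are not incremental: the given list replaces the current assignment. Note that topic subscription with group management cannot be combined with manual partition assignment.

As part of group management, the consumer keeps track of the list of consumers that belong to a particular group. A rebalance is triggered when any of the following happens:

1 The number of partitions changes for any of the subscribed topics

2 A topic is created or deleted

3 A member of the consumer group dies

4 A new consumer is added to an existing consumer group through the join API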

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    // Acquire the lock
    acquire();
    try {
        if (topics == null) { // A null topic list is rejected with an exception
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        } else if (topics.isEmpty()) { // An empty topic list means unsubscribe
            this.unsubscribe();
        } else {
            for (String topic : topics) {
                if (topic == null || topic.trim().isEmpty())
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }
            log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
            this.subscriptions.subscribe(new HashSet<>(topics), listener);
            // Replace the current topic set with the newly provided one. If topic expiry is enabled,
            // the expiry time of each topic is reset on the next metadata update.
            metadata.setTopics(subscriptions.groupSubscription());
        }
    } finally {
        // Release the lock
        release();
    }
}
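
Two rules stated above are easy to miss: a new subscription replaces the previous one instead of extending it, and topic subscription cannot be mixed with manual assignment. The sketch below models both rules with plain strings so that it runs without the Kafka client library. SubscriptionModel, its Mode enum and the exception message are hypothetical names chosen for illustration; this is not the real SubscriptionState class.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the subscription rules; not Kafka's SubscriptionState.
final class SubscriptionModel {
    enum Mode { NONE, AUTO_TOPICS, USER_ASSIGNED }

    private Mode mode = Mode.NONE;
    private Set<String> topics = new HashSet<>();
    private Set<String> partitions = new HashSet<>();

    // Subscribe by topic: the new collection replaces the old one, it is not merged.
    void subscribe(Collection<String> newTopics) {
        requireMode(Mode.AUTO_TOPICS);
        topics = new HashSet<>(newTopics);
    }

    // Assign partitions by hand: only allowed when no topic subscription is active.
    void assign(Collection<String> newPartitions) {
        requireMode(Mode.USER_ASSIGNED);
        partitions = new HashSet<>(newPartitions);
    }

    // Drop everything and return to the neutral state.
    void unsubscribe() {
        mode = Mode.NONE;
        topics = new HashSet<>();
        partitions = new HashSet<>();
    }

    Set<String> topics() {
        return topics;
    }

    Set<String> partitions() {
        return partitions;
    }

    private void requireMode(Mode wanted) {
        if (mode != Mode.NONE && mode != wanted) {
            throw new IllegalStateException(
                "topic subscription and manual assignment are mutually exclusive");
        }
        mode = wanted;
    }
}
```

In this model, calling subscribe with ["a", "b"] and then with ["c"] leaves only "c" subscribed, and a following assign call fails until unsubscribe has been called.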



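2.2 assign: manually assigning partitions

For the mode in which the user specifies what to consume by hand, this method assigns a list of partitions to the consumer: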

public void assign(Collection<TopicPartition> partitions) {
    acquire();
    try {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
        } else if (partitions.isEmpty()) { // An empty partition list means unsubscribe
            this.unsubscribe();
        } else {
            Set<String> topics = new HashSet<>();
            // Iterate over the TopicPartitions and collect their topics into a set
            for (TopicPartition tp : partitions) {
                String topic = (tp != null) ? tp.topic() : null;
                if (topic == null || topic.trim().isEmpty())
                    throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                topics.add(topic);
            }

            // Trigger an auto-commit now, if auto-commit is enabled
            this.coordinator.maybeAutoCommitOffsetsNow();

            log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
            // Change the assignment to the partitions provided by the user
            this.subscriptions.assignFromUser(new HashSet<>(partitions));
            metadata.setTopics(topics); // Update the topics tracked by the metadata
        }
    } finally {
        release();
    }
}


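2.3 commitSync & commitAsync: committing offsets

Commit the offsets of the messages the consumer has finished consuming. The no-argument overloads commit the offsets returned by the last poll for all subscribed topics and partitions; the overloads shown here commit the offsets passed in explicitly.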

public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    acquire();
    try {
        coordinator.commitOffsetsSync(offsets);
    } finally {
        release();
    }
}
 
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
    acquire();
    try {
        log.debug("Committing offsets: {} ", offsets);
        coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
    } finally {
        release();
    }
}
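
When offsets are passed in explicitly, the value committed for a partition should be the offset of the next message to read, that is, the last processed offset plus one. The sketch below computes such a map from a list of processed records. It uses plain strings as partition keys instead of TopicPartition and OffsetAndMetadata so that it runs without the Kafka client library; CommitOffsets, Processed and offsetsToCommit are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; partition names stand in for TopicPartition objects.
final class CommitOffsets {
    // One processed record: which partition it came from and its offset.
    static final class Processed {
        final String partition;
        final long offset;

        Processed(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each partition, keep the highest processed offset plus one,
    // which is the position the consumer should resume from.
    static Map<String, Long> offsetsToCommit(List<Processed> processed) {
        Map<String, Long> result = new HashMap<>();
        for (Processed p : processed) {
            result.merge(p.partition, p.offset + 1, Long::max);
        }
        return result;
    }
}
```

With the real client, a map built this way would be wrapped into OffsetAndMetadata values and handed to commitSync or commitAsync.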


2.4 seek: setting the position from which the consumer starts consuming

public void seek(TopicPartition partition, long offset) {
    if (offset < 0) {
        throw new IllegalArgumentException("seek offset must not be a negative number");
    }
    acquire();
    try {
        log.debug("Seeking to offset {} for partition {}", offset, partition);
        this.subscriptions.seek(partition, offset);
    } finally {
        release();
    }
}
// Seek to the first offset of each of the given partitions
public void seekToBeginning(Collection<TopicPartition> partitions) {
    acquire();
    try {
        Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
        for (TopicPartition tp : parts) {
            log.debug("Seeking to beginning of partition {}", tp);
            subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
        }
    } finally {
        release();
    }
}
// Seek to the last offset of each of the given partitions
public void seekToEnd(Collection<TopicPartition> partitions) {
    acquire();
    try {
        Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
        for (TopicPartition tp : parts) {
            log.debug("Seeking to end of partition {}", tp);
            subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
        }
    } finally {
        release();
    }
}
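
Note the difference between the three methods: seek stores a concrete offset right away, while seekToBeginning and seekToEnd only mark the partition as needing a reset, and the actual offset is looked up later. The sketch below models that lazy behaviour with plain strings and caller-supplied log boundaries. PositionModel, resolve and the logStart/logEnd parameters are hypothetical; in the real client the boundary offsets come from the broker.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of eager seek versus lazy reset; not Kafka's SubscriptionState.
final class PositionModel {
    enum Reset { EARLIEST, LATEST }

    private final Map<String, Long> position = new HashMap<>();
    private final Map<String, Reset> pendingReset = new HashMap<>();

    // Eager: the position is known immediately.
    void seek(String partition, long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        pendingReset.remove(partition);
        position.put(partition, offset);
    }

    // Lazy: forget the position and remember only which end of the log is wanted.
    void needOffsetReset(String partition, Reset strategy) {
        position.remove(partition);
        pendingReset.put(partition, strategy);
    }

    boolean hasPosition(String partition) {
        return position.containsKey(partition);
    }

    // Resolve a pending reset using the given log boundaries, then return the position.
    long resolve(String partition, long logStart, long logEnd) {
        Reset reset = pendingReset.remove(partition);
        if (reset != null) {
            position.put(partition, reset == Reset.EARLIEST ? logStart : logEnd);
        }
        Long current = position.get(partition);
        if (current == null) {
            throw new IllegalStateException("no position for " + partition);
        }
        return current;
    }
}
```

The same pattern explains why a position lookup may need a round trip: when no position is stored, it has to be resolved before it can be returned.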


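2.5 poll: fetching messages

Fetches data for the subscribed topics or assigned partitions. Calling poll without having subscribed to any topic or partition is an error. On each poll, the consumer tries to use the last consumed offset as the starting offset for the next fetch. The last consumed offset can also be set manually through seek(TopicPartition, long), or it is set automatically.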

public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
        // Throw if there is neither a subscription nor a manual assignment
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

        // Keep polling for new data until the timeout expires
        long start = time.milliseconds();
        // Time left before the timeout expires
        long remaining = timeout;
        long remaining = timeout;
        do {
            // Fetch data; this also performs the offset auto-commit if enabled and any pending offset reset
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // Before returning the records, send the next round of fetch requests to avoid blocking on them later
                fetcher.sendFetches();
                client.pollNoWakeup();
                // Pass the records through the interceptors if any are configured; otherwise return them directly
                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
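
The timeout handling in poll is a general pattern: call a single-attempt function repeatedly, shrink the remaining budget after each empty attempt, and return as soon as anything arrives. The sketch below isolates that loop so it can be run with a fake clock. PollLoop and its parameters are hypothetical names, and the single-attempt function stands in for pollOnce.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

// Hypothetical extraction of the timeout loop; pollOnce is supplied by the caller.
final class PollLoop {
    static <T> List<T> poll(long timeoutMs, LongSupplier clock, LongFunction<List<T>> pollOnce) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        long start = clock.getAsLong();
        long remaining = timeoutMs;
        do {
            // Each attempt may wait at most for the time that is still left.
            List<T> records = pollOnce.apply(remaining);
            if (!records.isEmpty()) {
                return records;
            }
            long elapsed = clock.getAsLong() - start;
            remaining = timeoutMs - elapsed;
        } while (remaining > 0);
        return Collections.emptyList();
    }
}
```

Because this is a do-while loop, a timeout of 0 still performs exactly one attempt, which matches the loop in the method above.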


private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    // Poll the coordinator for events and handle periodic offset commits
    coordinator.poll(time.milliseconds());

    // Fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        // Update the fetch positions of the partitions that are missing one
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    // If data is already available, return it immediately; otherwise continue below
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;

    // No data is ready yet, so send new fetch requests
    fetcher.sendFetches();

    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });
    // After a long poll, check whether the group needs to rebalance before returning data, so that the group can stabilize quickly
    if (coordinator.needRejoin())
        return Collections.emptyMap();
    // Return the fetched records
    return fetcher.fetchedRecords();
}
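
The control flow of pollOnce can be summarized in four steps: return buffered records without waiting, otherwise send fetches, wait for a bounded time, and return nothing if the group has to rejoin. The sketch below reproduces only that flow against a hypothetical Backend interface, which stands in for the fetcher, the network client and the coordinator. None of these names are part of the Kafka API.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical control-flow model of a single poll attempt.
final class PollOnceSketch {
    // Stand-in for the fetcher, network client and coordinator.
    interface Backend {
        List<String> fetchedRecords();      // drain records that have already arrived
        void sendFetches();                 // issue new fetch requests
        void await(long maxWaitMs);         // block for at most maxWaitMs
        long timeToNextCoordinatorTask();   // time until the coordinator needs attention
        boolean needRejoin();               // true if a rebalance is pending
    }

    static List<String> pollOnce(Backend backend, long timeoutMs) {
        // Fast path: data is already buffered, so no network wait is needed.
        List<String> ready = backend.fetchedRecords();
        if (!ready.isEmpty()) {
            return ready;
        }
        backend.sendFetches();
        // Never wait longer than the caller allows or than the coordinator can tolerate.
        backend.await(Math.min(backend.timeToNextCoordinatorTask(), timeoutMs));
        // A pending rebalance takes priority over handing out records.
        if (backend.needRejoin()) {
            return Collections.emptyList();
        }
        return backend.fetchedRecords();
    }
}
```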


2.6 pause: suspending fetching from the given partitions; after pausing, poll returns no records from them

public void pause(Collection<TopicPartition> partitions) {
    acquire();
    try {
        for (TopicPartition partition: partitions) {
            log.debug("Pausing partition {}", partition);
            subscriptions.pause(partition);
        }
    } finally {
        release();
    }
}
// Returns the set of paused partitions

public Set<TopicPartition> paused() {
    acquire();
    try {
        return Collections.unmodifiableSet(subscriptions.pausedPartitions());
    } finally {
        release();
    }
}


2.7 resume: resuming consumption of paused partitions

public void resume(Collection<TopicPartition> partitions) {
    acquire();
    try {
        for (TopicPartition partition: partitions) {
            log.debug("Resuming partition {}", partition);
            subscriptions.resume(partition);
        }
    } finally {
        release();
    }
}
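
Pausing does not change which partitions are assigned to the consumer; it only removes them from the set that is fetched from until resume is called. The sketch below models that relationship with plain strings. PauseModel and fetchable are hypothetical names, and rejecting unassigned partitions is an assumption of this model.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model: paused partitions stay assigned but are skipped when fetching.
final class PauseModel {
    private final Set<String> assigned;
    private final Set<String> paused = new HashSet<>();

    PauseModel(Collection<String> assignedPartitions) {
        this.assigned = new TreeSet<>(assignedPartitions);
    }

    void pause(String partition) {
        requireAssigned(partition);
        paused.add(partition);
    }

    void resume(String partition) {
        requireAssigned(partition);
        paused.remove(partition);
    }

    // The assignment itself is never modified by pause or resume.
    Set<String> assigned() {
        return new TreeSet<>(assigned);
    }

    // Partitions that a poll would actually fetch from.
    Set<String> fetchable() {
        Set<String> result = new TreeSet<>(assigned);
        result.removeAll(paused);
        return result;
    }

    // Assumption of this model: only assigned partitions can be paused or resumed.
    private void requireAssigned(String partition) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("partition is not assigned: " + partition);
        }
    }
}
```

A typical use of this mechanism is backpressure: a consumer pauses a partition while a slow downstream step catches up and resumes it afterwards, without leaving the group.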


2.8 position: getting the offset of the next message

// Get the offset of the next record to be fetched
public long position(TopicPartition partition) {
    acquire();
    try {
        if (!this.subscriptions.isAssigned(partition))
            throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
        Long offset = this.subscriptions.position(partition);
        if (offset == null) {
            updateFetchPositions(Collections.singleton(partition));
            offset = this.subscriptions.position(partition);
        }
        return offset;
    } finally {
        release();
    }
}

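This article is part of the 腾讯云自媒体分享计划 (Tencent Cloud self-media sharing program) and is shared from the author's personal site/blog.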