前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka消费者 组件源码 Fetcher

Kafka消费者 组件源码 Fetcher

作者头像
平凡的学生族
发布2020-06-30 16:36:09
9300
发布2020-06-30 16:36:09
举报
文章被收录于专栏:后端技术后端技术

序言

Fetcher是与KafkaConsumer交互的各大组件之一。在各大博客上,比如某csdn博客中提到,Fetcher的作用是:

Fetcher负责组织拉取消息的请求,以及处理返回。不过需要注意它并不做网络IO,网络IO还是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。

Fetcher负责拉取什么消息?如何处理消息?它到底有什么功能,我们需要查阅源码。然而在类声明上的注释只有可怜兮兮的一句话:

所以我们要转换角度去观察。 首先,Fetcher没有继承Runnable或Thread,那么它只是一个API组件,而不是单独运行的线程

然后要观察一个类的作用,可以从两个角度入手:

  1. 与上游组件的交互。也就是它暴露的public方法。
  2. 与下游组件的交互。也就是它是如何调用下游组件的接口的。

与上游组件的交互

与上游组件的交互,就是指它所暴露出的public方法,因为只有public方法能被其它组件调用,这就是它提供的功能。所以我们要研究下这些public方法。 从Idea左侧栏->Structure,点击"Show non-public"按钮,隐藏非公有方法

从方法栏可以看到,Fetcher主要提供了四块功能:

  1. 拉取消息,如红框所示。从fetchedRecords可知,这些方法作用都与从服务器拉取消息有关,能够向服务器发送消息。
  2. 获取topic元数据,如黄框所示。
    • getTopicMetadata用于获取某topic的元数据。以PartitionInfo形式总结。
    • getAllTopicMetadata用于获取集群上所有topic的元数据。以PartitionInfo形式总结。
  3. 获取、刷新offset,如蓝框所示。
    • resetOffsetsIfNeeded会获取offset并刷新。
    • beginningOffsets和endOffsets分别会获取起始/终止的offset。
  4. 监控测量指标相关,如白框所示。并不是主要功能,暂不分析,

与下游组件的交互

查看Fetcher的成员变量可知,Fetcher主要与ConsumerNetworkClient组件交互。后者负责请求、响应的IO[1],那么前者就负责构造请求、处理响应。

搜索client.查看与ConsumerNetworkClient发生交互的地方,总共有8处。

其中client.sendclient.poll(代表发送请求、等待响应的调用。

代码语言:javascript
复制
// ConsumerNetworkClient.java
public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder)

public boolean poll(RequestFuture<?> future, long timeout)

通过对这两种方法的使用,可以向ConsumerNetworkClient发送请求,并添加处理响应的逻辑。有两种

异步响应逻辑

Fetcher利用监听器的机制,添加异步响应的逻辑。 比如sendFetches中,先调用client.send发出请求,再调用addListener添加请求完成后的回调逻辑。

同步响应逻辑

Fetcher调用client.send发出请求,调用client.poll等待请求完成,添加同步响应的逻辑。以getTopicMetadata为例

在sendMetadataRequest内部调用了client.send发送请求

查看poll可知,内部会循环等待,直到请求完成。

拉取消息

sendFetches调用client.send发送请求,通过addListener设置请求完成后的逻辑。在onSuccess中将拉取的数据,按照TopicPartition分别添加到completedFetches

代码语言:javascript
复制
public int sendFetches() {
    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
    for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
        ...
        final FetchRequest.Builder request = FetchRequest.Builder
                .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
                ...
        
        ...

        // 发送请求、设置回调逻辑
        client.send(fetchTarget, request)
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        FetchResponse response = (FetchResponse) resp.responseBody();
                        
                        ...
                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                            ...
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                    resp.requestHeader().apiVersion()));  // 添加到completedFetches
                        }

                        ...
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        ...
                    }
                });
    }
    return fetchRequestMap.size();
}

sendFetches在请求完成后,通过OnSuccess执行成功逻辑

外界调用fetchedRecords来收获已经收到的消息。fetchedRecords从completedFetches取出拉取的消息,通过while循环,将消息从CompletedFetch类型转为PartitionRecords,再转为List<ConsumerRecord<K, V>>,添加到fetched中。

代码语言:javascript
复制
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
    int recordsRemaining = maxPollRecords;

    try {
            // 通过循环完成拉取到的消息的加工,最多拉取maxPollRecords条消息
        while (recordsRemaining > 0) {
            if (nextInLineRecords == null || nextInLineRecords.isFetched) {
                CompletedFetch completedFetch = completedFetches.peek();  // 从completedFetches查看拉取的消息
                if (completedFetch == null) break;  // 没有消息了,退出循环

                nextInLineRecords = parseCompletedFetch(completedFetch);  // 处理成PartitionRecords类型,也就是一个分区上拉到的数据
                completedFetches.poll();  // 去除队头
            } else {
                List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);  // 处理成List<ConsumerRecord<K, V>>类型
                TopicPartition partition = nextInLineRecords.partition;
                // 将拉取到的消息放入fetched
                if (!records.isEmpty()) {
                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                    if (currentRecords == null) {
                        fetched.put(partition, records);
                    } else {
                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
                        // but it might conceivably happen in some rare cases (such as partition leader changes).
                        // we have to copy to a new list because the old one may be immutable
                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                        newRecords.addAll(currentRecords);
                        newRecords.addAll(records);
                        fetched.put(partition, newRecords);
                    }
                    recordsRemaining -= records.size();
                }
            }
        }
    } catch (KafkaException e) {
        if (fetched.isEmpty())
            throw e;
    }
    return fetched;
}

如图示:

在fetchedRecords的循环中,一条CompletedFetch的变化轨迹

总结

Fetcher向上游提供了拉取消息、获取topic元数据、获取/刷新offset的功能,并由ConsumerNetworkClient完成请求/响应的IO操作。


  1. 可以暂时这么认为,如果读者不放心可查阅资料
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 序言
  • 与上游组件的交互
  • 与下游组件的交互
    • 异步响应逻辑
      • 同步响应逻辑
      • 拉取消息
      • 总结
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档