专栏首页后端技术Kafka消费者 组件源码 Fetcher

Kafka消费者 组件源码 Fetcher

序言

Fetcher是与KafkaConsumer交互的各大组件之一。在各大博客上,比如某csdn博客中提到,Fetcher的作用是:

Fetcher负责组织拉取消息的请求,以及处理返回。不过需要注意它并不做网络IO,网络IO还是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。

Fetcher负责拉取什么消息?如何处理消息?它到底有什么功能,我们需要查阅源码。然而在类声明上的注释只有可怜兮兮的一句话:

所以我们要转换角度去观察。 首先,Fetcher没有继承Runnable或Thread,那么它只是一个API组件,而不是单独运行的线程

然后要观察一个类的作用,可以从两个角度入手:

  1. 与上游组件的交互。也就是它暴露的public方法。
  2. 与下游组件的交互。也就是它是如何调用下游组件的接口的。

与上游组件的交互

与上游组件的交互,就是指它所暴露出的public方法,因为只有public方法能被其它组件调用,这就是它提供的功能。所以我们要研究下这些public方法。 从Idea左侧栏->Structure,点击"Show non-public"按钮,隐藏非公有方法

从方法栏可以看到,Fetcher主要提供了四块功能:

  1. 拉取消息,如红框所示。从fetchedRecords可知,这些方法作用都与从服务器拉取消息有关,能够向服务器发送消息。
  2. 获取topic元数据,如黄框所示。
    • getTopicMetadata用于获取某topic的元数据。以PartitionInfo形式总结。
    • getAllTopicMetadata用于获取集群上所有topic的元数据。以PartitionInfo形式总结。
  3. 获取、刷新offset,如蓝框所示。
    • resetOffsetsIfNeeded会获取offset并刷新。
    • beginningOffsets和endOffsets分别会获取起始/终止的offset。
  4. 监控测量指标相关,如白框所示。并不是主要功能,暂不分析,

与下游组件的交互

查看Fetcher的成员变量可知,Fetcher主要与ConsumerNetworkClient组件交互。后者负责请求、响应的IO[1],那么前者就负责构造请求、处理响应。

搜索client.查看与ConsumerNetworkClient发生交互的地方,总共有8处。

其中client.sendclient.poll(代表发送请求、等待响应的调用。

// ConsumerNetworkClient.java
public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder)

public boolean poll(RequestFuture<?> future, long timeout)

通过对这两种方法的使用,可以向ConsumerNetworkClient发送请求,并添加处理响应的逻辑。有两种

异步响应逻辑

Fetcher利用监听器的机制,添加异步响应的逻辑。 比如sendFetches中,先调用client.send发出请求,再调用addListener添加请求完成后的回调逻辑。

同步响应逻辑

Fetcher调用client.send发出请求,调用client.poll等待请求完成,添加同步响应的逻辑。以getTopicMetadata为例

在sendMetadataRequest内部调用了client.send发送请求

查看poll可知,内部会循环等待,直到请求完成。

拉取消息

sendFetches调用client.send发送请求,通过addListener设置请求完成后的逻辑。在onSuccess中将拉取的数据,按照TopicPartition分别添加到completedFetches

public int sendFetches() {
    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
    for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
        ...
        final FetchRequest.Builder request = FetchRequest.Builder
                .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
                ...
        
        ...

        // 发送请求、设置回调逻辑
        client.send(fetchTarget, request)
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        FetchResponse response = (FetchResponse) resp.responseBody();
                        
                        ...
                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                            ...
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                    resp.requestHeader().apiVersion()));  // 添加到completedFetches
                        }

                        ...
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        ...
                    }
                });
    }
    return fetchRequestMap.size();
}

sendFetches在请求完成后,通过OnSuccess执行成功逻辑

外界调用fetchedRecords来收获已经收到的消息。fetchedRecords从completedFetches取出拉取的消息,通过while循环,将消息从CompletedFetch类型转为PartitionRecords,再转为List<ConsumerRecord<K, V>>,添加到fetched中。

public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
    int recordsRemaining = maxPollRecords;

    try {
            // 通过循环完成拉取到的消息的加工,最多拉取maxPollRecords条消息
        while (recordsRemaining > 0) {
            if (nextInLineRecords == null || nextInLineRecords.isFetched) {
                CompletedFetch completedFetch = completedFetches.peek();  // 从completedFetches查看拉取的消息
                if (completedFetch == null) break;  // 没有消息了,退出循环

                nextInLineRecords = parseCompletedFetch(completedFetch);  // 处理成PartitionRecords类型,也就是一个分区上拉到的数据
                completedFetches.poll();  // 去除队头
            } else {
                List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);  // 处理成List<ConsumerRecord<K, V>>类型
                TopicPartition partition = nextInLineRecords.partition;
                // 将拉取到的消息放入fetched
                if (!records.isEmpty()) {
                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                    if (currentRecords == null) {
                        fetched.put(partition, records);
                    } else {
                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
                        // but it might conceivably happen in some rare cases (such as partition leader changes).
                        // we have to copy to a new list because the old one may be immutable
                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                        newRecords.addAll(currentRecords);
                        newRecords.addAll(records);
                        fetched.put(partition, newRecords);
                    }
                    recordsRemaining -= records.size();
                }
            }
        }
    } catch (KafkaException e) {
        if (fetched.isEmpty())
            throw e;
    }
    return fetched;
}

如图示:

在fetchedRecords的循环中,一条CompletedFetch的变化轨迹

总结

Fetcher向上游提供了拉取消息、获取topic元数据、获取/刷新offset的功能,并由ConsumerNetworkClient完成请求/响应的IO操作。


  1. 可以暂时这么认为,如果读者不放心可查阅资料

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 张龙netty学习笔记 P10

    因为其请求-响应机制,聊天室是无法实现的。为了曲线救国,就出现了轮训技术,但不断的请求会造成带宽的浪费,而且不及时,而且header-body结构也会使请求报文...

    平凡的学生族
  • spring 自动配置(下) 自动配置总结

    图太大,放不下,请点开大图(不点开大图看的是缩略图),再右键"新标签页打开图片"查看。也可以点开大图保存到本地查看:

    平凡的学生族
  • spring 自动加载配置

    所以,如果一个包希望能适配springboot的自动配置功能,就要填写上面的文件(视需要而定,填写其中的几个)。

    平凡的学生族
  • Vue 项目实战上传文件与接口OPTIONS

    在项目的开发过程中难免会遇到许多的坑,寻找答案成为了至关重要的一步,职场中解决问题的能力是必要切重要的,有些问题网上给的答案大多都是千篇一律重复性东西太多,而且...

    六小登登
  • 不为人知的python request小技巧

    ? 作者:Op小剑 来源: http://blog.csdn.net/xie_0723/article/details/52790786 关于 Python ...

    小小科
  • Provisional headers are shown in Chrome network tab

    细心的同学应该留意到,新版开发者工具的 Network 面板中,某些请求头后面会跟着下面这行文字:

    Jerry Wang
  • 使用Fiddler进行抓包

    jmeter技术研究
  • COS 调试工具,助你快速定位请求错误

    通过开发者配置 Hosts 抓包 COS API 请求,智能分析请求中出现的错误,给出错误提示和修改建议。

    Jinqn
  • 每天一道面试题 | day07

    GET请求获取Request-URI所标识的资源,例如:在浏览器的地址栏中输入网址的方式访问网页时,浏览器采用GET方法向服务器获取资源。

    剑走天涯
  • HTTP错误代码

    200:正确的请求返回正确的结果,如果不想细分正确的请求结果都可以直接返回200。 201:表示资源被正确的创建。比如说,我们 POST 用户名、密码正确创建...

    居士

扫码关注云+社区

领取腾讯云代金券