
A look at the kafka client's auto commit

This post walks through how the kafka client implements auto commit.

maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
</dependency>
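
Before diving into the source, here is a minimal usage sketch of a consumer with auto commit turned on (enable.auto.commit and auto.commit.interval.ms are the standard kafka-clients settings; the bootstrap address, group id and topic name are placeholders made up for illustration):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker address, group id and topic are placeholders for illustration
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // auto commit is on by default; made explicit here
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // commit the consumed offsets roughly every 5 seconds
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // no explicit commitSync/commitAsync anywhere: the offsets of
                // previously returned records are committed from inside poll()
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}

With this configuration there is no explicit commit call anywhere in user code; the client commits the consumed offsets in the background, and the rest of this post looks at where that happens inside poll.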

poll

kafka-clients-0.10.2.1-sources.jar!/org/apache/kafka/clients/consumer/KafkaConsumer.java

public ConsumerRecords<K, V> poll(long timeout) {
        acquire();
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

            // poll for new data until the timeout expires
            long start = time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0)
                        client.pollNoWakeup();

                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

poll simply loops on pollOnce until it either gets records back or the timeout expires, so the interesting work, including driving the auto commit, happens inside pollOnce.
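
The auto commit itself is not visible in poll above. Roughly speaking, in 0.10.2.1 pollOnce calls into the ConsumerCoordinator on every pass, and at the end of the coordinator's poll it invokes maybeAutoCommitOffsetsAsync, which only fires an asynchronous offset commit once the configured interval has elapsed. The following is a simplified, self-contained sketch of that scheduling logic, paraphrased rather than copied from ConsumerCoordinator; the class is made up for illustration, though the field and method names mirror the real ones:

// Paraphrased sketch of the auto-commit scheduling inside the consumer
// coordinator; not the actual ConsumerCoordinator source.
public class AutoCommitSketch {
    private final boolean autoCommitEnabled;   // enable.auto.commit
    private final long autoCommitIntervalMs;   // auto.commit.interval.ms
    private long nextAutoCommitDeadline;

    public AutoCommitSketch(boolean autoCommitEnabled, long autoCommitIntervalMs, long now) {
        this.autoCommitEnabled = autoCommitEnabled;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
    }

    // called on every coordinator poll, i.e. on every consumer.poll() pass
    public void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
            // reschedule first, then fire an async offset commit
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }

    private void doAutoCommitOffsetsAsync() {
        // the real client sends an asynchronous OffsetCommitRequest for the
        // current consumed positions; here it is just a placeholder
        System.out.println("auto committing consumed offsets");
    }
}

So with enable.auto.commit=true every call to poll is a potential commit point: offsets of records returned by earlier polls are committed asynchronously once auto.commit.interval.ms has passed, which also means that slow processing between polls delays the commit.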

Originally published 2017-10-10 on the WeChat public account 码匠的流水账 (geek_luandun), by patterncat.
