前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【kafka原理】消费者提交已消费的偏移量

【kafka原理】消费者提交已消费的偏移量

作者头像
石臻臻的杂货铺[同名公众号]
发布2021-07-14 10:27:46
1.5K2
发布2021-07-14 10:27:46
举报
文章被收录于专栏:kafka专栏

那在上一篇文章中我们了解了 消费者偏移量__consumer_offsets_,知道了 消费者在消费了消息之后会把消费的offset 更新到以 名称为__consumer_offsets_的内置Topic中; 每个消费组都有维护一个当前消费组的offset; 那么就会有以下疑问

到底消费组什么时候把offset更新到broker中的分区中呢? 每次消费一条消息就提交到 broker中去更新?那这样是不是会有一些效率的一些问题?

既然有了疑问 ,那么我们本篇文章就来好好分析一下这个问题!

通过查询 kafka消费者配置中找到有以下几个配置

Name

描述

default

enable.auto.commit

如果为true,消费者的offset将在后台周期性的提交

true

auto.commit.interval.ms

如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)

5000

自动提交

消费者端开启了自动提交之后,每隔auto.commit.interval.ms自动提交一次;

代码语言:javascript
复制
    public static void consumer(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx1:9092,xxx2:9092,xxx3:9092");
        props.put("group.id", "szz-local-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("szz1-test-topic"));
        while (true) {
            Duration duration = Duration.ofSeconds(5);
            ConsumerRecords<String, String> records = consumer.poll(duration);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

假如Consumer在获取了消息消费成功但是在提交之前服务挂掉了

如果发生这种情况会有什么影响? 答: 重复消费

消费者消费之后 offset并没有及时更新过去,那么在下次启动或者同组内其他消费者去消费的时候 取到的数据就是之前的数据;

那么就会出现 重复消费的情况;

所以auto.commit.interval.ms到底设置成多少就很有考究了

手动提交

虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。

手动提交 offset 的方法有两种:分别是 commitSync(同步提交)commitAsync(异步 提交)。两者的相同点是,都会将本次poll 的一批数据最高的偏移量提交;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

同步提交 offset

代码语言:javascript
复制
    public static void consumerCommitSync(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");
        props.put("group.id", "szz-local-consumer");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("szz1-test-topic"));
        while (true) {
            Duration duration = Duration.ofSeconds(2);
            ConsumerRecords<String, String> records = consumer.poll(duration);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            //同步提交,当前线程会阻塞直到 offset 提交成功
            consumer.commitSync();
        }
    }

异步提交

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞

吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

代码语言:javascript
复制
    public static void consumerCommitAsync(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");
        props.put("group.id", "szz-local-consumer");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("szz1-test-topic"));
        while (true) {
            Duration duration = Duration.ofSeconds(2);
            ConsumerRecords<String, String> records = consumer.poll(duration);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }

            //异步提交
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {

                    if (exception != null) {
                        System.err.println("异常.....");
                    } }
            });


        }
    }

数据漏消费和重复消费分析

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先

提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据 的重复消费

参考资料

kafka文档: 密码:hiry

kafka消费者配置

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/10/27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 自动提交
    • 假如Consumer在获取了消息消费成功但是在提交之前服务挂掉了
    • 手动提交
      • 同步提交 offset
        • 异步提交
          • 数据漏消费和重复消费分析
          • 参考资料
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档