前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试系列-kafka偏移量提交

面试系列-kafka偏移量提交

作者头像
用户4283147
发布2022-12-29 20:03:26
8280
发布2022-12-29 20:03:26
举报
文章被收录于专栏:对线JAVA面试对线JAVA面试

提交相关概念

  • 提交:消费者消费完消息之后,更新自己消费那个消息的操作;
  • _consumer_offset:消费者消费完消息之后,会往_consumer_offset主题发送消息,_consumer_offset保存每个分区的偏移量;
  • 分区再均衡:消费者的数量发生变化,或者主题分区数量发生变化,会修改消费者对应的分区关系,叫做分区再均衡:保证kafka高可用和伸缩性;缺点:在均衡期间,消费者无法读取消息,群组短时间不可用;

重复消费/丢失消费

重复消费
丢失消费

自动提交

Kafka 中默认的消费位移的提交方式为自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true 。这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable.auto.commit 参数为 true;

在默认的配置下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移;每过5秒就会提交偏移量,但是在4秒发生了分区在均衡,偏移量还没来得及提交,他们这四秒的消息就会被重复消费;

当设置 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况;

手动提交

自动提交消费位移的方式并没有为开发者留有余地来处理重复消费和消息丢失的问题,无法做到精确的位移管理;kafka提供了手动位移提交的方式,这样就可以使得开发人员对消费位移的管理控制更加灵活,开启手动提交功能的前提是消费者客户端参数enable.auto.commit配置为false;

手动提交又分为同步提交和异步提交,对应于KafkaConsumer中的commitSync()和commitAsync()两种类型的方法;

手动同步提交

auto.commit. offset = false:使用commitsync()提交poll()返回最新偏移量;

注意:

  • 处理完业务之后,一定要手动调用commitsync();
  • 如果发生了在均衡,由于当前commitsync偏移量还未提交,所以消息会被重复消费;
  • commitsync会阻塞直到提交成功;
代码语言:javascript
复制
public class CommitSync {
    public static void main(String[] args) {
        /**消息消费者*/
        Properties properties = new Properties();
        properties.put("bootstrap.servers","127.0.0.1:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        /*取消自动提交*/
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
 
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>
            (properties);
        try {
            consumer.subscribe(Collections.singletonList("simple"));
            while(true){
                ConsumerRecords<String, String> records = consumer.poll(500);
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format(
                            "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                            record.topic(),record.partition(),record.offset(), 
                        record.key(),record.value()));
                    //自定义业务逻辑
                }
                //开始事务
                //读业务写数据库-
                //偏移量写入数据库
                //提交
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}
手动异步提交

注意:

  1. commitAsync()不会重试提交偏移量,重试提交可能会导致重复消费;
  2. commitAsync()也支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标;
代码语言:javascript
复制
public class CommitAsync {
    public static void main(String[] args) {
        /**消息消费者*/
        Properties properties = new Properties();
        properties.put("bootstrap.servers","127.0.0.1:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        /**取消自动提交*/
        properties.put("enable.auto.commit",false);
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(
            properties);
        try {
            consumer.subscribe(Collections.singletonList("simple"));
            while(true){
                ConsumerRecords<String, String> records = consumer.poll(500);
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format("主题:%s,分区:%d,偏移量:%d,key:
                                                     %s,value:%s",
                            record.topic(),record.partition(),record.offset(), 
                                                     record.key(),record.value()));
                    //自定义业务逻辑开发
                }
                //消费完毕,同步提交
                consumer.commitAsync();
            }
        } finally {
            consumer.close();
        }
    }
}
同步和异步组合提交

中间处理消息的时候,即使偶尔出现一次偏移量提交失败,后面消费的时候,偏移量也能够提交成功,所以不会有大影响;但是到了最后消费者要关闭了的时候,偏移量一定要提交成功;因此在消费者关闭前一般会组合使用 commitAsync()和commitsync() ,同步一定会提交成功,异步可能会失败;

代码语言:javascript
复制
public class SyncAndAsync {
    public static void main(String[] args) {
        /**消息消费者*/
        Properties properties = new Properties();
        properties.put("bootstrap.servers","127.0.0.1:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        /*取消自动提交*/
        properties.put("enable.auto.commit",false);
 
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            consumer.subscribe(Collections.singletonList("simple"));
            while(true){
                ConsumerRecords<String, String> records = consumer.poll(500);
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format(
                            "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                            record.topic(),record.partition(),record.offset(),
                            record.key(),record.value()));
                    //自定义工作
                }
                //避免等待,异步提交
                consumer.commitAsync();
            }
        } catch (CommitFailedException e) {
            System.out.println("提交失败");
            e.printStackTrace();
        } finally {
            try {
                //最后一次提交,确保成功,同步提交
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-11-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 对线JAVA面试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 提交相关概念
  • 重复消费/丢失消费
    • 重复消费
      • 丢失消费
      • 自动提交
      • 手动提交
        • 手动同步提交
          • 手动异步提交
            • 同步和异步组合提交
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档