专栏首页大数据技术栈kafka Consumer — offset的控制

kafka Consumer — offset的控制

前言

在N久之前,曾写过kafka 生产者使用详解, 今天补上关于 offset 相关的内容。 那么本文主要涉及:

  1. Kafka 消费者的两个大版本
  2. 消费者的基本使用流程
  3. 重点:offset 的控制

消費者版本

  1. 开源之初使用Scala 语言编写的客户端, 我们可以称之为旧消费者客户端(Old Consumer) 或 Scala 消费者客户端;
  2. 第二个是从Kafka 0.9. x 版本开始推出的使用Java 编写的客户端, 我们可以称之为新消费者客户端( New Consumer ) 或Java 消费者客户端, 它弥补了旧客户端中存在的诸多设计缺陷, 不过我不建议你在0.9.x 使用该客户端, 该新客户端再 0.10.0 才算比较稳定了

这里额外提一句就是,客户端从scala 语言转向 java, 并不是 java 比 scala 要怎么怎么样, 仅仅只是因为社区的开发者换人了~~~~

开发一个消费者的正常流程

一个正常的消费逻辑需要具备以下几个步骤:

  1. 配置消费者客户端参数及创建相应的消费者实例。
  2. 订阅主题。
  3. 拉取消息并消费。
  4. 提交消费位移。
  5. 关闭消费者实例。

消费者可以订阅多个Topic, consumer.subscribe(Arrays.asList("t1","t2"))), 如果订阅多次,后面的会覆盖前面的, 所以取消订阅其实也可以去订阅一个空集合。

订阅支持正则表达式: consumer.subscribe(Pattern.compile("topic .*")); 这样订阅后,如果kafka后面新增了满足该正则的 Topic也会被该消费者消费

消费者也可以直接订阅某个分区的数据, 这里我们贴下代码,如下:

List<TopicPartition> partitions = new ArrayList<>();
// 查询kafka分区信息
List<Partitioninfo> partitioninfos = consumer.partitionsFor( topic );
if (partitioninfos != null) {
for (Partitioninfo tpinfo : partitioninfos) {
partitions.add(new TopicPartition( tpinfo.topic(), tpinfo.partition() )) ;
consumer.assign( partitions ) ;

值得注意的是: subscribe订阅是具有分区在均衡能力的, 而 assign 是没有的

这里我们只是简单的过了一下 消费者, 因为不是本文的重点, 如果要详细了解的话, 还是去看看这篇 kafka 生产者使用详解

Offset 提交

这里指的是消费者消费的位移, 而不是Kafka端储存的消息的 offset, 这其中的区别希望读者清楚,不要混淆了。 对于offset 的提交, 我们要清楚一点 如果我们消费到了 offset=x 的消息 那么提交的应该是 offset=x+1, 而不是 offset=x

kafka的提交方式分为两种:

自动提交

在Kafka 中默认的消费位移的提交方式是自动提交, 这个由消费者客户端参数enable.auto.commit 配置, 默认值为true。 当然这个默认的自动提交不是每消费一条消息就提交一次, 而是定期提交, 这个定期的周期时间由客户端参数auto.commit.interval.ms配置, 默认值为5 秒, 此参数生效的前提是enable.auto.commit 参数为true。 自动位移提交的动作是在poll()方法的逻辑里完成的, 在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交, 如果可以,那么就会提交上一轮消费的位移。

手动提交

  • commitSync() 同步提交
    • 批量提交 该方式的最大问题在于数据是批量处理, 当部分数据完成消费, 还没来得及提交offset就被中断, 则会使得下次消费会重复消费那部分已经消费过的数据。 consumer.commitSync()会在消费完数据后, 将消费完消费的 offset+1 提交. 直接使用如下:
final int minBatchSize = 200;
 List<ConsumerRecord> buffer= new ArrayList<>() ;
 while ( isRunning.get() ) {
     ConsumerRecords<String , String> records = consumer . poll(1000) ;
     for (ConsumerRecord<String , String> record : records) {
     buffer.add(record);
     if (buffer.size() >= minBatchSize) {
         //do some logical processing with buffer .
         consumer.commitSync() ;
         buffer.clear();
    }
 }
  • 单条消息提交一次 该方式每消费一次,就保存一次。 虽然在很大程度上避免了重复消费, 但是其性能是极其低下的, 基本不在企业级考虑的范围, 并且也不是完全的能做到精准一次消费
while ( isRunning. get () ) {
    ConsumerRecords<String , String> records= consumer.poll(1000) ;
    for (ConsumerRecord<String , String> record : records) {
        //do some logical processing.
        //读取消费的消息的 offset
        long offset= record.offset() ;
        TopicPartition partition =new TopicPartition(record.topic() , record.partition()) ;
        // 提交位移
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1))) ;
    }
}
 
  • 按分区提交 该方式其实是综合了 批量提交 和 单条消息提交一次, 按分区的小批次提交, 如果你要使用同步提交的方式, 那么建议你使用该方式
try {
  while (isRunning.get() ) {
      ConsumerRecords<String , String> records= consumer .poll(1000);
      for (TopicPartition partition : records.partitions( )) {
          //取出每个分区的消息
          List<ConsumerRecord<String, String> partitionRecords = records . records(partition)
          for (ConsumerRecord<String , String> record : partitionRecords) {
                  //消费该分区的消息*****
                  //*********
                 
                  //将该分区的 offset 提交
                  long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1) .offset() ;
                  consumer.commitSync(Collections.singletonMap ( partition , 
                  new OffsetAndMetadata(lastConsumedOffset + 1))
                  );
            }
        }
    }
}finally {
    consumer.close();
}
 

  • commitAsync() 异步提交

//三个重载方法
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition , OffsetAndMetadata> offsets ,OffsetCommitCallback callback)
 

commitAsync属于异步提交, 也就是不会阻塞线程, 比起同步提交commitSync具有更好的性能。 这里我们主要来讨论下OffsetCommitCallback callback回调的使用, 理解起来很简单,我们每提交一次 Offset, callback 都会告诉我们是否提交成功。 那么如果我们提交失败了怎么办呢??

  • 一般的想法就是:失败了?那重新提交呗。 这种方式是否可行?我们看下面这个列子。 如果一个消费者消费到了 offset=10, 我们就异步提交了 offset=11, 继续拉取消息 offset=11-20, 这个时候 提交的 offset=11 还没有返回成功, 我们提交 offset=21, 返回 offset=21 提交成功。 OK,现在提交 offset=1的那条消息返回了, 并且是失败的, 那么如果你去重试, 提交 offset=11 就会覆盖掉 已经提交的 offset=21 很明显这不是我们想要的。
  • 正确的做法: 这个时候需要客户端维护一个序列号, 每次提交成功都 +1, 重试的时候进行对比, 不合法就不需要重试了。 当然实际情况, 一般提交offset不会失败, 并且就算失败一次也不会有问题, 因为后面每次消费一样会进行offset提交, 而对于消费者正常退出, 我们可以使用,commitSync同步提交, 保证offset的正确。

try {
    while(isRunning.get()) {
        //poll records and do some log 工cal processing .
        consumer . commitAsync() ;
    }
) finally {
    try {
        consumer.commitSync() ;
    ) finally {
        consumer.close() ;
}}
 
  • 再均衡导致的重复消费: 再均衡发生的时候也可能会导致消费者的offset来不及提交, 这时候我们需要在监听到再均衡发生的时候进行一次offset提交:
//该对象需要保存该消费者消费的分区的最新的 offset
//本段代码中没有体现,可以在消费数据之后 进行更新该对象
Map<TopicPartition , OffsetAndMetadata> currentOffsets =new HashMap<>() ;

consumer.subscribe(Arrays .asList( topic) , new ConsumerRebalanceListener () {
    //发生在 再均衡 之前,并且消费者停止读取消息的时候
    @Override
    public void onPartitionsRevoked(Collection<TopicPart ition> partitions) {
        consume.commitSync(currentOffsets) ;
        currentOffsets.clear();
    }

    @Override
     public void onPartitions Assigned(Collection<TopicPartition > partitions) {
        //do nothing .
    }
} );
 


最后,我们来总结下:

  1. 一般来说,我们不会使用自动提交的方式管理 offset, 虽然简单,但是缺乏很好的控制, 不过如果能满足业务要求, 那么还是果断的使用起来吧
  2. 对于手动提交, 一般我们都是使用异步提交的方式, 在考虑准确的消费的情况下,兼顾的效率。
  3. 同步提交一般用来辅助异步提交, 对于一些特殊情况,保证offset的正确提交。
  4. 我们考虑到了再均衡的影响,并做了相关的处理
  5. 对于消费者异常退出 和 崩溃: 很遗憾的是如果出现异常和崩溃, 我们的消费还是很难做到精准的一次消费, 不过一般来说, 以上这些方法是绝对满足大部分企业大部分的业务的需求。 如果你实在要保证精准的一次消费, 你可能还需要一些其他的辅助, 比如:消费和提交 当做一次事务, 或者 重复消费是幂等 等等方式。 要精准一次消费, 还得依靠开发人员来自己保证, 当然,如果你使用 Kafka 的stream 方式消费, 是可以做到精准一次消费的, 不过这不在本文的讨论范围了...

最后,感谢你的阅读,如果可以,留个赞支持下作者!!!嘿嘿嘿~~~

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • SparkStreaming On Kafka —— Offset 管理

    我之前有写一篇kafka Consumer — offset的控制 如果你对于这方面的知识还不太清楚, 建议你去看一下, 毕竟理解了Kafka的消费者, ...

    solve
  • Kafka —— 如何保证消息不会丢失

    当我们通过 send(msg, callback) 是不是就意味着消息一定不丢失了呢?

    solve
  • Spark系列——从 cartesian 带你看点不一样的 Shuffle

    这只是一个人随意的一些分享, 你大概可以放宽心的当休闲的东西来看, 看完你大概也许会对Spark会有一些不一样的想法。

    solve
  • sqlserver 连接远程sqlserver数据库

    @useself = 'false', /*指定 rmtuser 和 rmtpassword 参数用来连接到特定 locallogin 的 rmtsrvname...

    静谧的小码农
  • 25个必须记住的SSH命令

    OpenSSH是SSH连接工具的免费版本。telnet,rlogin和ftp用户可能还没意识到他们在互联网上传输的密码是未加密的,但SSH是加密的,OpenSS...

    小小科
  • SSH 登录流程分析

    本文首发于 https://jaychen.cc 作者:jaychen(https://segmentfault.com/u/chenjiayao) 写一篇短文...

    程序员宝库
  • Spring cloud zuul的SendResponseFilter做了什么

    Spring cloud zull 的SendResponseFilter主要工作是将代理请求获取的reponse写入当前response,发送回客户端。以下是...

    java达人
  • Python学习笔记整理(十)Pytho

    if语句是选取要执行的操作. 一、if语句 1、通用格式 形式是if测试,后面跟着一个或多个可选的elif(else if)测试,以及一个最终选用的els...

    py3study
  • 5分钟彻底搞懂Flutter中PlatFormView与Texture

    想要在flutter想显示原生的东东,大家知道,一般有两种方式,一种是PlatformView,另外一种是Texture(俗称外接纹理)。其中PlatformV...

    brzhang
  • SSH免密码登录配置

    博主在搭建hdfs集群时,需要进行免密码登录,查找了很多方法,最后发现还是这种最直接和方便。代码下附。

    十里桃花舞丶

扫码关注云+社区

领取腾讯云代金券