浅析Kafka的消费者和消费进度的案例研究

本文主要讨论Kafka组件中的消费者和其消费进度。我们将通过一个使用Scala语言实现的原型系统来学习。本文假设你知道Kafka的基本术语。

在这个原型系统中,生产者持续不断地生成指定topic的消息记录,而消费者因为订阅了这个topic的消息记录持续地获取它们。在现实世界中,通常消费者和生产者的速度是不匹配的。因为消费者需要对消息记录进行处理,所以消费速度大多很慢。而本文的目标就是要找到消费者获取消息记录的速度到底落后了生产者多少。

可以通过计算消费者最后获取的和生产者最新生成的消息记录的进度的差值来找到消费者具体落后了多少。

首先,让我们创建一个Kafka消费者并设置其部分属性。

val properties = new Properties()
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleNewConsumer")
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.put("auto.offset.reset", "latest")
val consumer = new KafkaConsumer[String, String](properties)

下面是一些必须设置的消费者的属性。

  • 根据Kafka文档中的规定,Bootstrap_Servers是“用于建立到Kafka集群的初始连接的主机/端口对列表”。Kafka服务器的端口缺省从9092开始。
  • Group_Id是消费者所属的组的ID。
  • Key.deserializerValue.deserializer指定如何反序列化记录的键(key)和值(value)。比如当生产者使用字符串序列化器编码记录时,消费者必须使用字符串反序列化器解码记录。注意:您可以从我的GitHub库中查看我的Kafka 生产者的代码。因为本文主要讨论消费者,所以没有展示任何生产者的代码。
  • Auto.offset.reset用于指定消费者获取消费记录的起点是从最开始(最早)还是最近的提交开始。

我的原型系统刚刚使用上面提到的属性创建了消费者。

现在让我们为消费者订阅某个topic的消息。要订阅指定的topic的消息,您可以使用:

consumer.subscribe(util.Arrays.asList("topic-1"))

"topic-1"是需要订阅的topic的名称。

消费者可以通过设置一个topic列表来订阅多个topic。为了简单起见,本文只订阅了一个topic。

现在消费者已经订阅了该topic,从而可以处理该topic中的消息记录。消费者通过维护一个消费进度的变量来记录下一个需要访问的消息记录。

现在,让我们看看如何找到消费者的消费进度。

通过使用类ConsumerRecord的offset方法可以找到消费者的消费进度,该进度值指向Kafka分区中的特定的消息记录。与此同时,类ConsumerRecord的对象实例还是消费者处理消息记录的载体,并且该类还包含topic的名字、分区的编号以及生产者标记的生成时间戳(消息记录来源于生产者)。

同时,消费者可以使用consumer.poll(long类型)处理订阅的topic中的消息数据。

poll方法使用一个long类型的参数来指定超时时间 - 如果需要的消息数据不在缓冲区中,则等待指定的超时时间(以毫秒为单位)。

注意:如果没有订阅任何topic或者分区,则查询消息记录会返回错误。消费者在查询消息记录之前需要先订阅某个topic或者分区。

在每次查询中,消费者会尝试使用最近完成处理的消费进度作为初始值进行顺序查找。

当消费者从某个topic获取消息记录时,所有该topic的消息记录均以类ConsumerRecords的对象形式被访问...

val recordsFromConsumer = consumer.poll(10000)

....它是用来容纳特定topic的一个分区的ConsumerRecords列表的容器。我们可以使用类ConsumerRecords的records方法来获取特定topic的供消费者读取的ConsumerRecords列表。

val recordsFromConsumerList = recordsFromConsumer.records("topic-1").toList

或者你可以这样做:

val recordsFromConsumerList = recordsFromConsumer.asScala.toList

为此,您需要导入:

import scala.collection.JavaConverters._

为了获取消费者可以读取的最近的消费进度,我们可以使用ConsumerRecord类的offset方法从整个ConsumerRecords列表的最后一个ConsumerRecord来获取。

val lastOffset = recordsFromConsumerList.last.offset()

现在,该消费进度已经是此topic中最近的需要被访问的消息记录的位置了。

现在,我们可以使用KafkaConsumer对象中的endOffsets方法来定位该topic的最新消费进度,即该topic的最后一条消息记录的位置。因为endOffsets方法可以返回特定的分区的最后的消息记录,返回值类型是一个Map<TopicPartition, Long>。

分区的最新的消费进度同时也是即将生成的最新一条消息记录的位置,即最后一条已生成消息记录+1。

val partitionsAssigned = consumer.assignment()
val endOffsetsPartitionMap = consumer.endOffsets(partitionsAssigned)

endOffsets方法的参数是一个给定的需要被找出最新消费进度的分区的集合。

因为我想获取分区的最新消费进度,所以将消费者处理的分区的集合(consumer.assignment)作为参数传递给了endOffsets方法。

注意:只有消费者调用了poll方法之后才能调用assignment方法,否则assignment方法返回的结果将为空。endOffset方法不像seek方法,并不会改变消费者正在处理的消息的位置信息。

您可以使用下面的方法获取消费者当前的正在处理的位置信息:

val currentPosition = consumer.position(consumer.assignment().toList.head)

position方法的参数是一个特定的需要获取当前处理位置的分区。

既然我们已经获取了消费者正在处理的最新消息的位置和topic的特定分区的最新消息记录的位置,就很容易地能计算出消费者的落后进度。

val consumerLag = endOffsets.get(topicPartition.head) - lastReadOffset

最后,在我们此次的案例研究中,通过类ConsumerRecords,我们可以获取消费者的落后进度并且能让我们知道消费进度和其他有用的信息。

以上就是本文的所有内容,希望读者能获取有用的信息。你可以从我的GitHub仓库下载完整的代码

如需了解关于Kafka及其API的更多信息,您可以访问官方网站,它可以非常清楚地解释所有疑问。

另外,如果您有任何问题,可以在下面发表评论。我会很乐意帮助你。

编码快乐!

本文的版权归 you 所有,如需转载请联系作者。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏玄魂工作室

Hacker基础之Linux篇:基础Linux命令一

这一部分主要是讲解Linux常用命令工具,比如文件管理,文本处理等等,主要是为了让读者用最少的实践掌握和熟练应用基本的Linux操作,对于每个命令的举例,尽量做...

2977
来自专栏惨绿少年

setup 命令中防火墙配置选项无法打开

故障提示 ? 1.1 如何让setup 命令中防火墙配置生效 1.1.1 CentOS系统找不到setup命令工具的解决方法 yum -y install se...

3450
来自专栏Python攻城狮

Linux命令-文件管理 1.目录方面命令:ls

Linux文件或者目录名称最长可以有265个字符,“.”代表当前目录,“..”代表上一级目录,以“.”开头的文件为隐藏文件,需要用-a参数才能显示。

1052
来自专栏玄魂工作室

Hacker基础之Linux篇:基础Linux命令四

我们继续学习Linux 1. egrep egrep命令用于在文件内查找指定的字符串。 egrep执行效果与grep-E相似,使用的语法及参数可参照grep指...

30810
来自专栏技术墨客

ESC服务搭建CheckList 原

注意:ESC有安全策略组,修改端口时需要在ESC的管理页面上检查相关的端口是否开放。

974
来自专栏一“技”之长

Git命令集十三——快照操作 原

    Git工具中提供了一个stash命令,这个命令的作用是创建快照。快照主要的用途是将当前未更新到缓存区的修改进行入栈保存,创建快照后,Git的状态会变回上...

942
来自专栏Java面试笔试题

举例说明同步和异步

如果系统中存在临界资源(资源数量少于竞争资源的线程数量的资源),例如正在写的数据以后可能被另一个线程读到,或者正在读的数据可能已经被另一个线程写过了,那么这些数...

1234
来自专栏架构师之旅

Dubbo(Dubbo与Zookeeper、SpringMVC整合)

Zookeeper作为Dubbo服务的注册中心,Dubbo原先基于数据库的注册中心,没采用Zookeeper,Zookeeper一个分布式的服务框架,是树型的...

2K3
来自专栏坚毅的PHP

使用 Java Service Wrapper 启动java后台进程服务

 Java Service Wrapper (http://wrapper.tanukisoftware.com/doc/english/product-ove...

4915
来自专栏开源优测

AutoLine源码分析之Flask初始化模块

下面我们看下在manage.py中如何调用create_app来初始化全局flask运行环境的。

1553

扫码关注云+社区

领取腾讯云代金券