本文主要讨论Kafka组件中的消费者和其消费进度。我们将通过一个使用Scala语言实现的原型系统来学习。本文假设你知道Kafka的基本术语。
在这个原型系统中,生产者持续不断地生成指定topic的消息记录,而消费者因为订阅了这个topic的消息记录持续地获取它们。在现实世界中,通常消费者和生产者的速度是不匹配的。因为消费者需要对消息记录进行处理,所以消费速度大多很慢。而本文的目标就是要找到消费者获取消息记录的速度到底落后了生产者多少。
可以通过计算消费者最后获取的和生产者最新生成的消息记录的进度的差值来找到消费者具体落后了多少。
首先,让我们创建一个Kafka消费者并设置其部分属性。
val properties = new Properties()
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleNewConsumer")
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.put("auto.offset.reset", "latest")
val consumer = new KafkaConsumer[String, String](properties)
下面是一些必须设置的消费者的属性。
我的原型系统刚刚使用上面提到的属性创建了消费者。
现在让我们为消费者订阅某个topic的消息。要订阅指定的topic的消息,您可以使用:
consumer.subscribe(util.Arrays.asList("topic-1"))
"topic-1"是需要订阅的topic的名称。
消费者可以通过设置一个topic列表来订阅多个topic。为了简单起见,本文只订阅了一个topic。
现在消费者已经订阅了该topic,从而可以处理该topic中的消息记录。消费者通过维护一个消费进度的变量来记录下一个需要访问的消息记录。
现在,让我们看看如何找到消费者的消费进度。
通过使用类ConsumerRecord的offset方法可以找到消费者的消费进度,该进度值指向Kafka分区中的特定的消息记录。与此同时,类ConsumerRecord的对象实例还是消费者处理消息记录的载体,并且该类还包含topic的名字、分区的编号以及生产者标记的生成时间戳(消息记录来源于生产者)。
同时,消费者可以使用consumer.poll(long类型)处理订阅的topic中的消息数据。
poll方法使用一个long类型的参数来指定超时时间 - 如果需要的消息数据不在缓冲区中,则等待指定的超时时间(以毫秒为单位)。
注意:如果没有订阅任何topic或者分区,则查询消息记录会返回错误。消费者在查询消息记录之前需要先订阅某个topic或者分区。
在每次查询中,消费者会尝试使用最近完成处理的消费进度作为初始值进行顺序查找。
当消费者从某个topic获取消息记录时,所有该topic的消息记录均以类ConsumerRecords的对象形式被访问...
val recordsFromConsumer = consumer.poll(10000)
....它是用来容纳特定topic的一个分区的ConsumerRecords列表的容器。我们可以使用类ConsumerRecords的records方法来获取特定topic的供消费者读取的ConsumerRecords列表。
val recordsFromConsumerList = recordsFromConsumer.records("topic-1").toList
或者你可以这样做:
val recordsFromConsumerList = recordsFromConsumer.asScala.toList
为此,您需要导入:
import scala.collection.JavaConverters._
为了获取消费者可以读取的最近的消费进度,我们可以使用ConsumerRecord类的offset方法从整个ConsumerRecords列表的最后一个ConsumerRecord来获取。
val lastOffset = recordsFromConsumerList.last.offset()
现在,该消费进度已经是此topic中最近的需要被访问的消息记录的位置了。
现在,我们可以使用KafkaConsumer对象中的endOffsets方法来定位该topic的最新消费进度,即该topic的最后一条消息记录的位置。因为endOffsets方法可以返回特定的分区的最后的消息记录,返回值类型是一个Map<TopicPartition, Long>。
分区的最新的消费进度同时也是即将生成的最新一条消息记录的位置,即最后一条已生成消息记录+1。
val partitionsAssigned = consumer.assignment()
val endOffsetsPartitionMap = consumer.endOffsets(partitionsAssigned)
endOffsets方法的参数是一个给定的需要被找出最新消费进度的分区的集合。
因为我想获取分区的最新消费进度,所以将消费者处理的分区的集合(consumer.assignment)作为参数传递给了endOffsets方法。
注意:只有消费者调用了poll方法之后才能调用assignment方法,否则assignment方法返回的结果将为空。endOffset方法不像seek方法,并不会改变消费者正在处理的消息的位置信息。
您可以使用下面的方法获取消费者当前的正在处理的位置信息:
val currentPosition = consumer.position(consumer.assignment().toList.head)
position方法的参数是一个特定的需要获取当前处理位置的分区。
既然我们已经获取了消费者正在处理的最新消息的位置和topic的特定分区的最新消息记录的位置,就很容易地能计算出消费者的落后进度。
val consumerLag = endOffsets.get(topicPartition.head) - lastReadOffset
最后,在我们此次的案例研究中,通过类ConsumerRecords,我们可以获取消费者的落后进度并且能让我们知道消费进度和其他有用的信息。
以上就是本文的所有内容,希望读者能获取有用的信息。你可以从我的GitHub仓库下载完整的代码。
如需了解关于Kafka及其API的更多信息,您可以访问官方网站,它可以非常清楚地解释所有疑问。
另外,如果您有任何问题,可以在下面发表评论。我会很乐意帮助你。
编码快乐!