kafka主题offset各种需求修改方法

  简要:开发中,常常因为需要我们要认为修改消费者实例对kafka某个主题消费的偏移量。具体如何修改?为什么可行?其实很容易,有时候只要我们换一种方式思考,如果我自己实现kafka消费者,我该如何让我们的消费者代码如何控制对某一个主题消费,以及我们该如何实现不同消费者组可以消费同一个主题的同一条消息,一个消费组下不同消费者消费同一个主题的不同消息。如果让你实现该框架该如何实现?

  这里我演示实验storm的kafkaspout来进行消费,kafkaspout里面使用的低级api,所以他在zookeeper中存储数据的结构和我们使用kafka的java客户端的高级api在zookeeper中的存储结构是有所不同的。关于kafka的java客户端的高级api在zookeeper中的存储结构的构造可以看这篇文章:apache kafka系列之在zookeeper中存储结构

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6212913.html

可接网站开发,java开发。

新浪微博:intsmaze刘洋洋哥

  创建一个kafka主题名为intsmazX,指定分区数为3.

  使用kafkaspout创建该主题的消费者实例(指定元数据存放zookeeper中的路径为/kafka-offset,指定实例id为onetest),启动storm可以观察到如下信息:

INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
INFO storm.kafka.PartitionManager - Read partition information from: /kafka-offset/onetest/partition_0  --> null //这个地方会到zookeeper中该目录下读取,看是否存储有对该分区的消费信息
INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset//没有分区信息,这个时候就会直接到kafka的broker中得到该分区的最大偏移量
INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
INFO storm.kafka.PartitionManager - Starting Kafka hadoop002.icccuat.com:0 from offset 0
INFO storm.kafka.PartitionManager - Read partition information from: /kafka-offset/onetest/partition_1  --> null
INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
INFO storm.kafka.PartitionManager - Starting Kafka hadoop003.icccuat.com:1 from offset 0
INFO storm.kafka.PartitionManager - Read partition information from: /kafka-offset/onetest/partition_2  --> null
INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
INFO storm.kafka.PartitionManager - Starting Kafka hadoop001.icccuat.com:2 from offset 0
这个时候在zookeeper的/kafka-offset下没有生成名为onetest的目录,这是因为对应的intsmazeX还没有数据产生。
我们使用kafka消费者生产3条数据,然后去查看zookeeper中对应目录下的信息:
{"topology":{"id":"34e94ae4-a0a0-41e9-a360-d0ab648fe196","name":"intsmaze-20161222-143121"},"offset":1,"partition":1,"broker":{"host":"hadoop003.icccuat.com","port":6667},"topic":"intsmazeX"}
{"topology":{"id":"34e94ae4-a0a0-41e9-a360-d0ab648fe196","name":"intsmaze-20161222-143121"},"offset":1,"partition":2,"broker":{"host":"hadoop001.icccuat.com","port":6667},"topic":"intsmazeX"}
{"topology":{"id":"34e94ae4-a0a0-41e9-a360-d0ab648fe196","name":"intsmaze-20161222-143121"},"offset":1,"partition":0,"broker":{"host":"hadoop002.icccuat.com","port":6667},"topic":"intsmazeX"}

30秒(kafkaspout中设置提交zookeeper消费偏移量时间为30秒)之后,可以看到,会记录该实例对每一个分区消费的偏移量为1.

杀掉该拓扑,这个时候我们再向intsmazeX主题生产6条数据,这个时候,broker中该主题每个分区的最大偏移量为3了。

然后我们修改/kafka-offset/onttest/下每一个分区的offset为3.

这个时候,我们再次部署该拓扑,可以发现拓扑没有消费刚刚产生的6条消息。再发送3条消息,拓扑就会立马消费这三条消息。

杀掉该拓扑,这个时候该拓扑消费者实例对每个分区的消费偏移量就是4了,然后我们把offset修改为6,然后启动拓扑,这个时候broker中该主题每个分区的最大偏移量为4并不是6,让我们看看,消费分区的偏移量大于主题分区当前偏移量会有什么样的情况出现。

WARN storm.kafka.KafkaUtils - Got fetch request with offset out of range: [6]; retrying with default start offset time from configuration. configured start offset time: [-2]
WARN storm.kafka.PartitionManager - Using new offset: 4
WARN storm.kafka.KafkaUtils - Got fetch request with offset out of range: [6]; retrying with default start offset time from configuration. configured start offset time: [-2]
WARN storm.kafka.PartitionManager - Using new offset: 4
WARN storm.kafka.KafkaUtils - Got fetch request with offset out of range: [6]; retrying with default start offset time from configuration. configured start offset time: [-2]
WARN storm.kafka.PartitionManager - Using new offset: 4
这个时候我们看到,消费者的分区偏移量的记录将会自动同步为每一个分区当前最大的偏移量了,kafkaspout会先用偏移量6去拉去,发现拉去不到,就到broker中获取该主题对应分区的最大偏移量。。
{"topology":{"id":"818ab9cc-d56f-454f-88b2-06dd830d54c1","name":"intsmaze-20161222-150006"},"offset":4,"partition":0,"broker":{"host":"hadoop002.icccuat.com","port":6667},"topic":"intsmazeX"}
....
把offset的偏移量设置为7000,一样在拓扑启动后,会更新为每个分区的最大偏移量。
    重新部署一个拓扑消费该主题,设置该拓扑的id为twotest,这个时候启动拓扑,我们发现,并没有启动拓扑前的消息数据,这是因为,拓扑启动后,要获得偏移量,而这个偏移量只能是当前主题每个分区的最大偏移量(因为分区的偏移量是递增,且
分区的数据会定时删除的,所以无法知道当前分区当前最开始的偏移量。)
Refreshing partition manager connections
 Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
 assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
 Deleted partition managers: []
 New partition managers: [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
 Read partition information from: /kafka-offset/twotest/partition_0  --> null
 No partition information found, using configuration to determine offset
 Starting Kafka hadoop002.icccuat.com:0 from offset 7
 Read partition information from: /kafka-offset/twotest/partition_1  --> null
 No partition information found, using configuration to determine offset
 Starting Kafka hadoop003.icccuat.com:1 from offset 7
 Read partition information from: /kafka-offset/twotest/partition_2  --> null
 No partition information found, using configuration to determine offset
 Starting Kafka hadoop001.icccuat.com:2 from offset 7
 Finished refreshing
 Refreshing partition manager connections
 Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
 assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
 Deleted partition managers: []
 New partition managers: []
 Finished refreshing
 Refreshing partition manager connections
 Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
 assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
 Deleted partition managers: []
 New partition managers: []
 Finished refreshing
 Refreshing partition manager connections
 Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
 assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
 Deleted partition managers: []
 New partition managers: []
 Finished refreshing
 Refreshing partition manager connections
 Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
 assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
 Deleted partition managers: []
 New partition managers: []
发送三条信息,查看该实例目录如下。
{"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"offset":8,"partition":0,"broker":{"host":"hadoop002.icccuat.com","port":6667},"topic":"intsmazeX"}
再启动一个拓扑,实例为twotest不变:
[INFO] Task [1/2] Refreshing partition manager connections
[INFO] Task [2/2] Refreshing partition manager connections
[INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
[INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
[INFO] Task [1/2] assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
[INFO] Task [2/2] assigned [Partition{host=hadoop003.icccuat.com:6667, partition=1}]
[INFO] Task [1/2] Deleted partition managers: []
[INFO] Task [2/2] Deleted partition managers: []
[INFO] Task [1/2] New partition managers: [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
[INFO] Task [2/2] New partition managers: [Partition{host=hadoop003.icccuat.com:6667, partition=1}]
[INFO] Read partition information from: /kafka-offset/twotest/partition_0  --> {"topic":"intsmazeX","partition":0,"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"broker":{"port":6667,"host":"hadoop002.icccuat.com"},"offset":8}
[INFO] Read partition information from: /kafka-offset/twotest/partition_1  --> {"topic":"intsmazeX","partition":1,"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"broker":{"port":6667,"host":"hadoop003.icccuat.com"},"offset":8}
[INFO] Read last commit offset from zookeeper: 8; old topology_id: 3d6a5f80-357f-4591-8e5c-b3d4d2403dfe - new topology_id: 348af8da-994a-4cdb-a629-e4bf107348af
[INFO] Read last commit offset from zookeeper: 8; old topology_id: 3d6a5f80-357f-4591-8e5c-b3d4d2403dfe - new topology_id: 348af8da-994a-4cdb-a629-e4bf107348af
[INFO] Starting Kafka hadoop002.icccuat.com:0 from offset 8
[INFO] Starting Kafka hadoop003.icccuat.com:1 from offset 8
[INFO] Task [2/2] Finished refreshing
[INFO] Read partition information from: /kafka-offset/twotest/partition_2  --> {"topic":"intsmazeX","partition":2,"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"broker":{"port":6667,"host":"hadoop001.icccuat.com"},"offset":8}
[INFO] Read last commit offset from zookeeper: 8; old topology_id: 3d6a5f80-357f-4591-8e5c-b3d4d2403dfe - new topology_id: 348af8da-994a-4cdb-a629-e4bf107348af
[INFO] Starting Kafka hadoop001.icccuat.com:2 from offset 8
[INFO] Task [1/2] Finished refreshing
[INFO] Task [2/2] Refreshing partition manager connections
[INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
[INFO] Task [2/2] assigned [Partition{host=hadoop003.icccuat.com:6667, partition=1}]
[INFO] Task [1/2] Refreshing partition manager connections
[INFO] Task [2/2] Deleted partition managers: []
[INFO] Task [2/2] New partition managers: []
{"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"offset":8,"partition":1,"broker":{"host":"hadoop003.icccuat.com","port":6667},"topic":"intsmazeX"}
然后发送消息,我们可以看到两个拓扑都会运行的,因为两个拓扑共用一个元数据信息。
这个过程有些坑要注意:
1:在使用kafka-spout的时候,我们要指定该kafka消费者在zookeeper中存储偏移量的地址,这里是/kafka-offset。同时指定该kafka对应的实例id这里是onetest.kafkapout和kafka客户端代码不一样,它没有消费组的概念,也不能这样说吧,只能说数据的存放不一样,不同的实例代表
不同的消费组。
2:修改某一个kafkaspout实例的时候,我们一定要把该id的拓扑关闭掉,我们在项目中遇到一个大坑,就是不熟一样的kafkaspout它的id是相同的,也就是共用同一个目录,那么如果我们没有下线这些拓扑任务,而只是把
这些拓扑任务设置为不活跃状态,那么我们修改zookeeper中偏移量后,再把拓扑设置为活跃状态后,会发现修改无效,offset还是变为以前的offset了,这是因为拓扑没有杀掉,它的运行程序中也会保存当前消费的偏移量,会定时更新的。
3:我们在杀拓扑时,要设置时间,因为拓扑默认30秒向zookeeper提交一下偏移量信息。
修改偏移量有两种,一种就是在部署拓扑前,先修改zookeeper中的偏移量,或者直接删除zookeeper中的对应实例的目录。这样从新部署都会从最新的偏移量开始运行。
  下面的是我当初自己学习kafka时,思考自己写kafka时,该如何解决kafka的消费者和消费组之间对数据消费时的判断。虽然框架极大简化了我们的生产力,但是作为一个有
思想的程序员,我们应该换一个角度去思考一个框架,而不应该再是这个框架有什么功能,我们用这个框架的这个功能,这样下去,我们就会一直认为这个框架好厉害,却不明白其内部实现方式。

如果自己要实现kafka功能:

第一,一个消费组创建后,这个消费组的创建是客户端完成的,它把消费组名会存到zookeeper中。

第二,消费者被创建以后,会把自己的名字存到zookeeper中所属消费组名的文件夹下面。

第三,消费者被创建了,我们当然要指定他可以消费主体的那一条消息,这个时候应该是kafka的broker进行控制了,它应该会不断监听zookeeper中所有消费组下的消费者的变得,当发现有消费者增加或删除就知道要进行重新分配,这个时候,它应该计算分配好一会在每一个消费者文件中写上他可以消费的分区号和该分区的偏移量。

第四,broker怎么知道每一个主题的分区情况,其实broker创建一条主题的时候指定了分区和副本数量,这个时候会在zookeeper中生成一个主题文件夹,文件夹下每一个文件代表一个分区,且每一个文件的内容就是这个分区的位置和副本位置等信息,关于该分区消费偏移量应该不会记录在里面,因为每一个消费组中消费者消费该分区偏移量是不同的。

第五,这个时候我可以猜想到,应该是消费者文件中记录着它已经消费的偏移量,当消费者对消费分区进行重新分配时,偏移量也要进行转移,不然重新分配后,又要消费之前已经消费过的数据。但是这也有问题:因为消费者被删除它消费的偏移量就删掉了,它之前消费的分区分给其他人,其他人也不知道从哪里开始消费。

看kafkazookeeper存储结构我们可以发现:

消费者(群)文件夹,这个文件夹夹下面是各个消费组文件夹,每一个文件夹代表一个消费组信息。

消费组文件夹下面有三个文件夹,一个是存储该消费组中的每一个消费者,每一消费者就是一个文件,另一个文件夹存储的这个消费组可以消费的主题的文件夹,每一个文件夹代表他可以消费哪些主题。每一个主题文件夹下面就是该主题的分区,每一个分区文件就记录被该消费组消费的偏移量。

这样就可以保证,当消费者增加或删除后,它所消费分区的偏移量还在,我们进行重新分配时,可以保证分配好的分区,消费者不会重新消费,而直到该分区被消费的位置。

但是我们怎么知道哪一个消费者消费哪一个分区,把分区好存储到消费者文件中,这样貌似也可以,因为消费者删除后,它消费的分区会丢失也没有关系,broker监听消费者数量变化,一变化就对他们进行重新分配。(我现在能想到的好处就是,如果现有系统中存在消费者没有消费数据,那么我们删掉该消费者,但是我们只是监听到了消费者变化,并不知道是否有分区随着消费者的删掉而被停止消费,仍然会进行重新消费,其实这种情况是没有必要的),那么我们换一个方法吧。上面的猜想错了,一个消费组中的消费者只能消费一个主题的一条消息,其实就是一个主题的分区只能对应一个消费组中的一个消费者,换过来想,一个消费组可以消费多条主题,应该是可以的,那么一个消费组中的消费者就可以消费多条主题的中的一个分区。

或者是一个消费组可以消费多个主题,还是是一个消费者只能消费一个主题的一个分区。

经过我测试发现,一个消费者消费多个主题是可以实现的。

一个消费者消费多条主题的一个分区如何实现?

还有最后一个文件,该文件下面也是多个主题的文件夹,每个文件夹下面就是该文件的一个一个分区,分区我应该让他记录消费它的消费者的名称。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏李家的小酒馆

Spring面试题

什么是Spring Spring是一个轻量级的容器,他实现了IOC和非侵入的框架,并提供了AOP的实现方式,提供了持久层事务的支持,其让java开发模块化,并且...

4700
来自专栏web编程技术分享

【Java框架型项目从入门到装逼】第七节 - 学生管理系统项目搭建

2937
来自专栏王清培的专栏

spring rest 容易被忽视的后端服务 chunked 性能问题

spring boot 容易被忽视的后端服务 chunked 性能问题 标签(空格分隔): springboot springmvc chunked 背景 sp...

5068
来自专栏运维

Redis3.0.7集群部署完整版

Redis集群没有出来前,一直使用Codis集群,现在部署Redis集群看看效果如何。

2492
来自专栏web编程技术分享

三分钟学会用SpringMVC搭建最小系统(超详细)

4708
来自专栏nummy

flume使用kafka作为sink

1071
来自专栏LanceToBigData

细说log4j

可能做过java项目的基本上都是用过log4j,它是用来做java日志的。比如我们做一个项目分为很多的模块,那我们怎么想要知道它什么时候启动了,这时候我们可以使...

2845
来自专栏架构师之旅

Spring框架知识总结-注入Bean的各类异常

近日整合sping和hibernate框架时遇到了一系列的异常,本次主要说明一下spring框架可能出现的异常及解决方案。 我们借助sping强...

2088
来自专栏猿天地

Spring Boot 使用WebAsyncTask异步返回结果

在Spring Boot中(Spring MVC)下请求默认都是同步的,一个请求过去到结束都是由一个线程负责的,很多时候为了能够提高吞吐量,需要将一些操作异步化...

4472
来自专栏大魏分享(微信公众号:david-share)

重点来了:事务一致性的深入研究&EJB的全生命周期 | 从开发角度看应用架构5

1354

扫码关注云+社区

领取腾讯云代金券