我在Kafka中设置了一个主题的时间间隔为7天,我正在从Kafka获取数据并将其存储在数据库中,但从过去5天我的数据库服务器宕机了,现在我必须从Kafka获取最近5天的消息并将其存储在数据库中。注:从过去5天开始,Kafka没有问题。
发布于 2017-09-05 14:01:53
首先调用consumer.partitionsFor()方法来获取主题的分区
然后调用consumer.offsetsForTimes(),获取5天前最后一条消息成功处理时每个分区的时间戳的偏移量。
然后调用consumer.seek()将当前的消费者偏移量定位在该时间点,并继续调用poll()并照常处理消息。
发布于 2017-09-05 14:05:05
在前面的很好的回答中,我将添加call partitionsFor方法来获取您的主题的分区,然后按照@Hans所说的做。
https://stackoverflow.com/questions/46047649
复制相似问题