我有一个java Kafka消费者,在其中我正在批量获取ConsumerRecords进行处理。system which can take random time for different requests or timeout in 5 seconds.我遇到的问题是,如果产生了较晚的记录,但前一条记录仍未超时,则如何提交或提交哪个偏移量。假设我在一批中获得了2条记录,第一条消息的外部调用仍在等待,而第二次调用已完成。如果我等待5秒等待外部响
我有一个正在运行的Kafka集群,当重新启动一个应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。当应用程序启动时,我可以看到它读取带有偏移量100的消息,然后将偏移量101推送到__consumer_offsets。然后,当应用程序关闭时,带有偏移量101, 102 and 103的消息将被推送到主题。在应用程序重新启动后,它读取101并将其偏移量设置为104,从而跳过102 and 103。
我有一个主题,我必须从kafka服务器读取,所以我只需要创建消费者,可以从kafka主题读取数据,我总是得到错误主题不存在。main.jsvar config = require('.The topic(s) [object Object] do not exist
at new TopicsNotExistError (C:\uilo