使用Spring batch实现Kafka。开发了Spring boot应用程序后,我的Kafka生成器不断地产生消息。我想分批处理这些消息。但当我触发作业时,作业仍在持续运行。所以我决定在KafkaItemReader中添加pollTimeout。这样我就可以停止我的工作。但是在触发Job时,Kafka中会有多少条消息。我无法在谷歌中找到,如果我将pollTimeout设置为1000ms,那么KafkaItemReader中会有多少条消息。
提示会很有帮助
@ String>().partitions(0).consumerproperties(prop).name(“reader”).savedata(true).topic(name).pollTimeout(Duration.ofMillis(1000).build()} KafkaItemReader item() { return new kafkaItemBuilder
发布于 2021-03-17 21:00:11
批处理是关于固定数据集的。如果你的主题是一个连续的事件流,那么Spring批处理作业对你来说不是一个好的选择,流解决方案更合适。Spring Batch期望您的ItemReader在数据源耗尽时返回null,但在您的示例中,数据源永远不会耗尽,这就是为什么您的作业永远不会结束。
如果在这段时间内没有收到任何消息,timeout属性实际上会让读取器返回null。
发布于 2021-03-16 21:31:20
该属性是超时,而不是记录限制。
您可以对max.poll.records和启动和停止消费者之间的时间段进行一些计算,但这只是一个估计值,而不是一个确切的数字,因为轮询超时只是等待最大轮询记录计数的上限
如果您希望以编程方式计算已处理消息的数量,我建议获取偏移量差值或对消耗的记录计数求和。
https://stackoverflow.com/questions/66655515
复制相似问题