我正在研究Spring实现,我的用例是将Kafka主题的消息作为批处理(使用批处理侦听器)使用。当我使用消息列表时,将迭代并调用REST端点以进行消息充实。如果REST对任何运行时异常都失败,我已经使用spring实现了重试逻辑。在重试失败之后,我想停止容器。因此,计划使用KafkaContainerStoppingErrorHandler来实现这一点。KafkaContainerStoppingErrorHandler是否提交了先前的成功消息--例如,如果我们接收到10条消息,而对于消息1、2、3、4,富集调用就是成功,而对于消息5富集API调用失败。因此,当我们重新启动容器时,我会再次得到所有的10条信息还是会接收到5-10条消息?
还是有一种方法可以实现上面的用例?我查看了Spring的所有类型的错误句柄,并需要输入如何实现上述需求。
发布于 2022-04-20 15:08:45
你会再一次得到他们的。
您可以使用DefaultErrorHandler (带有自定义恢复程序)并抛出一个BatchListenerFailedException来指示批处理中哪些记录失败。
错误处理程序将向该记录提交偏移设置,并使用失败的记录调用恢复程序;在您的自定义恢复程序中,您可以停止容器(使用与容器停止错误处理程序相同的逻辑)。
在2.8之前的版本中,RecoveringBatchErrorHandler提供了相同的功能。
https://stackoverflow.com/questions/71941052
复制相似问题