在这里,在消费和处理kafka消息时,我们正在做一些redis operations.Whenever,redis关闭或抛出一个错误/异常,我们需要持久化偏移,当redis出现时,我们需要消费持久化偏移量中的消息。
发布于 2021-09-15 15:01:10
添加一个KafkaBindingRebalanceListener bean。
应用程序可能希望在最初分配分区时寻找任意偏移量的主题/分区,或者在使用者上执行其他操作。从2.1版开始,如果您在应用程序上下文中提供单个
KafkaBindingRebalanceListenerbean,它将连接到所有Kafka使用者绑定中。
public interface KafkaBindingRebalanceListener {
...
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}在redis关闭时停止绑定,并在它恢复时重新启动它,并在Consumer上执行查找。
有关停止/启动绑定的信息,请参阅https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#binding_visualization_control。
https://stackoverflow.com/questions/69194244
复制相似问题