在使用spring-kafka
的Spring应用程序中,我试图配置一个错误处理程序,其中包括两件事:-
FixedBackOff
),然后发布到死信主题名称的死信主题
使用
// Version highlights
id 'org.springframework.boot' version '2.7.2'
...
implementation 'org.springframework.kafka:spring-kafka' // 2.8.8
下面是基于我在Spring中阅读并在网上几篇文章中重申的内容使用的代码:
@Bean
public DefaultErrorHandler byteArrayDefaultErrorHandler(KafkaTemplate<String, byte[]> template) {
var recoverer =
new DeadLetterPublishingRecoverer(
template,
(record, e) -> new TopicPartition("%s.deadLetter".formatted(record.topic()), 0);
);
return new DefaultErrorHandler(recoverer, new FixedBackOff(3000, 3));
}
但是上面的bean不被考虑/使用。因此,当消费遇到故障时(当前通过抛出异常来模拟失败),
FixedBackOff
没有被考虑,但是使用了10次背靠背的默认尝试。创建了目前,使用者配置类的内容最少:
@Bean public ConsumerFactory<String, byte[]> byteArrayConsumerFactory() { ... }
@Bean public ConcurrentKafkaListenerContainerFactory<String, byte[]> byteArrayListenerContainerFactory() { .. }
@Bean public DefaultErrorHandler byteArrayDefaultErrorHandler(KafkaTemplate<String, byte[]> template) { ...code pasted above... }
听者如下:
@KafkaListener(
topics = "${app.config.kafka.topic}",
containerFactory = "byteArrayListenerContainerFactory"
)
public void consumeMessage(ConsumerRecord<String, byte[]> record) { ... }
我不知所措,弄不清楚我漏掉了什么,或者添加了什么与线路相冲突的东西。帮助弄清楚是非常感谢的。
发布于 2022-07-28 13:52:48
只有在使用boot的自动配置的容器工厂时,才会通过引导连接错误处理程序bean。
既然你正在创建自己的容器工厂bean..。
@Bean public ConcurrentKafkaListenerContainerFactory<String, byte[]> byteArrayListenerContainerFactory() { .. }
...you必须自己添加错误处理程序--参见setCommonErrorHandler()
。
框架不会自动提供死信主题;添加一个@Bean NewTopic dlt() { ... }
。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics
https://stackoverflow.com/questions/73145992
复制相似问题