首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >春天-卡夫卡: DefaultErrorHandler与DeadLetterPublishingRecoverer(BiFunction)没有考虑。没有创建DL主题

春天-卡夫卡: DefaultErrorHandler与DeadLetterPublishingRecoverer(BiFunction)没有考虑。没有创建DL主题
EN

Stack Overflow用户
提问于 2022-07-28 00:38:07
回答 1查看 328关注 0票数 0

在使用spring-kafka的Spring应用程序中,我试图配置一个错误处理程序,其中包括两件事:-

  • 重试邮件消费失败一次(FixedBackOff),然后发布到死信主题
  • 创建一个带有“我的选择”

名称的死信主题

使用

代码语言:javascript
运行
复制
// Version highlights
id 'org.springframework.boot' version '2.7.2'
...
implementation 'org.springframework.kafka:spring-kafka' // 2.8.8

下面是基于我在Spring中阅读并在网上几篇文章中重申的内容使用的代码:

代码语言:javascript
运行
复制
@Bean
public DefaultErrorHandler byteArrayDefaultErrorHandler(KafkaTemplate<String, byte[]> template) {
  var recoverer =
    new DeadLetterPublishingRecoverer(
      template,
      (record, e) -> new TopicPartition("%s.deadLetter".formatted(record.topic()), 0);
    );

  return new DefaultErrorHandler(recoverer, new FixedBackOff(3000, 3));
}

但是上面的bean不被考虑/使用。因此,当消费遇到故障时(当前通过抛出异常来模拟失败),

  • FixedBackOff没有被考虑,但是使用了10次背靠背的默认尝试。创建了
  • No DL主题。

目前,使用者配置类的内容最少:

代码语言:javascript
运行
复制
@Bean public ConsumerFactory<String, byte[]> byteArrayConsumerFactory() { ... }

@Bean public ConcurrentKafkaListenerContainerFactory<String, byte[]> byteArrayListenerContainerFactory() { .. }

@Bean public DefaultErrorHandler byteArrayDefaultErrorHandler(KafkaTemplate<String, byte[]> template) { ...code pasted above... }

听者如下:

代码语言:javascript
运行
复制
@KafkaListener(
 topics = "${app.config.kafka.topic}",
 containerFactory = "byteArrayListenerContainerFactory"
)
public void consumeMessage(ConsumerRecord<String, byte[]> record) { ... }

我不知所措,弄不清楚我漏掉了什么,或者添加了什么与线路相冲突的东西。帮助弄清楚是非常感谢的。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-28 13:52:48

只有在使用boot的自动配置的容器工厂时,才会通过引导连接错误处理程序bean。

既然你正在创建自己的容器工厂bean..。

代码语言:javascript
运行
复制
@Bean public ConcurrentKafkaListenerContainerFactory<String, byte[]> byteArrayListenerContainerFactory() { .. }

...you必须自己添加错误处理程序--参见setCommonErrorHandler()

框架不会自动提供死信主题;添加一个@Bean NewTopic dlt() { ... }

https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73145992

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档