首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >作为批处理在kafka中使用时出错处理

作为批处理在kafka中使用时出错处理
EN

Stack Overflow用户
提问于 2022-10-05 10:40:51
回答 1查看 133关注 0票数 0

我有一个用java spring引导(螺旋卡夫卡)写的kafka消费者。我的消费者如下所示。

代码语言:javascript
运行
复制
@RetryableTopic(
          attempts = "4",
          backoff = @Backoff(delay = 1000, multiplier = 2.0),
          autoCreateTopics = "false",
          topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
          include = {ResourceAccessException.class, MyCustomRetryableException.class})
@KafkaListener(topics = "myTopic", groupId = "myGroup", autoStartup = "true", concurrency = "3")
  public void consume(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                      @Header("custom_header_1") String customHeader1,
                      @Header("custom_header_2") String customHeader2,
                      @Header("custom_header_3") String customHeader3,
                      @Header(required = false, name = KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                      @Payload(required = false) String message) {

    log.info("-------------------------");
    log.info(key);
    log.info(message);
    log.info("-------------------------");

  }

我使用了@RetryableTopic注释来处理错误。我已经编写了一个自定义异常类,以及任何抛出我的自定义异常类(MyCustomRetryableException.class)的方法,它将根据退避在可还原注释中定义的尝试次数重试。所以在这里我不需要做任何事情。Kafka将简单地将失败消息发布到正确的dlt主题。我所要做的就是创建与dlt相关的主题,因为我使用了autoCreateTopics = "false"

现在,我正尝试按批处理方式使用消息。我改变了我的kafka配置,如下所示,以便批量消费。

代码语言:javascript
运行
复制
@Bean
  public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
 
    // default configs like bootstrap servers, key and value deserializers are here

    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
    return new DefaultKafkaConsumerFactory<>(config);
  }


  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.DEBUG);
    factory.setBatchListener(true);
    return factory;
  }

现在我已经添加了批处理侦听器,它不支持@RetryableTopic。那么,如何将失败的消息发布到以前由@RetryableTopic处理的DLT任务呢?

如果有人能用一个例子来回答,那就太好了。提前谢谢你。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-05 13:30:00

文献资料

使用DefaultErrorHandlerDeadLetterPublishingRecoverer

不支持非阻塞重试;重试将使用配置的BackOff

抛出一个BatchListenerFailedException,以指示批处理中的哪条记录失败,只有该记录将被发送到DLT。

除任何其他例外情况外,整个批处理将被重试(如果重试用尽,则发送到DLT )。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73959217

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档