我有一个用java spring引导(螺旋卡夫卡)写的kafka消费者。我的消费者如下所示。
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000, multiplier = 2.0),
autoCreateTopics = "false",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
include = {ResourceAccessException.class, MyCustomRetryableException.class})
@KafkaListener(topics = "myTopic", groupId = "myGroup", autoStartup = "true", concurrency = "3")
public void consume(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header("custom_header_1") String customHeader1,
@Header("custom_header_2") String customHeader2,
@Header("custom_header_3") String customHeader3,
@Header(required = false, name = KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Payload(required = false) String message) {
log.info("-------------------------");
log.info(key);
log.info(message);
log.info("-------------------------");
}
我使用了@RetryableTopic
注释来处理错误。我已经编写了一个自定义异常类,以及任何抛出我的自定义异常类(MyCustomRetryableException.class
)的方法,它将根据退避在可还原注释中定义的尝试次数重试。所以在这里我不需要做任何事情。Kafka将简单地将失败消息发布到正确的dlt主题。我所要做的就是创建与dlt相关的主题,因为我使用了autoCreateTopics = "false"
。
现在,我正尝试按批处理方式使用消息。我改变了我的kafka配置,如下所示,以便批量消费。
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
// default configs like bootstrap servers, key and value deserializers are here
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.DEBUG);
factory.setBatchListener(true);
return factory;
}
现在我已经添加了批处理侦听器,它不支持@RetryableTopic
。那么,如何将失败的消息发布到以前由@RetryableTopic
处理的DLT任务呢?
如果有人能用一个例子来回答,那就太好了。提前谢谢你。
发布于 2022-10-05 13:30:00
见文献资料。
使用DefaultErrorHandler
和DeadLetterPublishingRecoverer
。
不支持非阻塞重试;重试将使用配置的BackOff
。
抛出一个BatchListenerFailedException
,以指示批处理中的哪条记录失败,只有该记录将被发送到DLT。
除任何其他例外情况外,整个批处理将被重试(如果重试用尽,则发送到DLT )。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh
https://stackoverflow.com/questions/73959217
复制相似问题