首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >@KafkaListener在用户记录更改后使用Acknowledgment.nack()跳过消息

@KafkaListener在用户记录更改后使用Acknowledgment.nack()跳过消息
EN

Stack Overflow用户
提问于 2022-04-05 16:39:23
回答 1查看 425关注 0票数 0

让我们假设下面的RecordInterceptor只返回接收到的使用者记录的副本。

代码语言:javascript
复制
class CustomRecordInterceptor : RecordInterceptor<Any, Any> {

  override fun intercept(record: ConsumerRecord<Any, Any>): ConsumerRecord<Any, Any>? {
    return with(record) {
      ConsumerRecord(
          topic(),
          partition(),
          offset(),
          timestamp(),
          timestampType(),
          checksum(),
          serializedKeySize(),
          serializedValueSize(),
          key(),
          value(),
          headers(),
          leaderEpoch())
    }
  }
}

有了这样一个拦截器,我们经历了以下卡夫卡听众丢失的记录。

注意:record是拦截器返回的结果。

代码语言:javascript
复制
@KafkaListener(topics = ["topic"])
fun listenToEvents(
    record: ConsumerRecord<SpecificRecordBase, SpecificRecordBase?>,
    ack: Acknowledgment
) {
  if (shouldNegativelyAcknowledge()) {
    ack.nack(2_000L)
    return
  }
  processRecord(record)
  ack.acknowledge()
}

每当shouldNegativelyAcknowledge()为真时,我们就会期望侦听器在超过2秒后重新处理该记录。我们正在使用ackMode = MANUAL

然而,我们看到的是,经过一段时间后,监听器没有重新处理跳过的记录:从未为该记录调用过processRecord。过了一段时间,消费者群体出现了0的滞后。

在调试过程中,我们在KafkaMessageListenerContainer.ListenerConsumer#handleNack中找到了这个代码块

代码语言:javascript
复制
if (next.equals(record) || list.size() > 0) {
  list.add(next);
}
  • next是拦截器处理后的记录(所以它是原始记录的副本)。
  • record是拦截器处理之前的记录

请注意,nextrecord永远不能相等,因为ConsumerRecord不覆盖equals

这可能是意外漏掉记录的原因,甚至可能是一个错误吗?

或者是错误地使用了记录拦截器来返回不同的ConsumerRecord对象,而不等于原始对象?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-04-05 17:02:45

这是一个错误,它解释了为什么其余的记录没有发送到侦听器--请在GitHub上打开一个问题

https://github.com/spring-projects/spring-kafka/issues

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71755448

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档