让我们假设下面的RecordInterceptor只返回接收到的使用者记录的副本。
class CustomRecordInterceptor : RecordInterceptor<Any, Any> {
override fun intercept(record: ConsumerRecord<Any, Any>): ConsumerRecord<Any, Any>? {
return with(record) {
ConsumerRecord(
topic(),
partition(),
offset(),
timestamp(),
timestampType(),
checksum(),
serializedKeySize(),
serializedValueSize(),
key(),
value(),
headers(),
leaderEpoch())
}
}
}有了这样一个拦截器,我们经历了以下卡夫卡听众丢失的记录。
注意:record是拦截器返回的结果。
@KafkaListener(topics = ["topic"])
fun listenToEvents(
record: ConsumerRecord<SpecificRecordBase, SpecificRecordBase?>,
ack: Acknowledgment
) {
if (shouldNegativelyAcknowledge()) {
ack.nack(2_000L)
return
}
processRecord(record)
ack.acknowledge()
}每当shouldNegativelyAcknowledge()为真时,我们就会期望侦听器在超过2秒后重新处理该记录。我们正在使用ackMode = MANUAL。
然而,我们看到的是,经过一段时间后,监听器没有重新处理跳过的记录:从未为该记录调用过processRecord。过了一段时间,消费者群体出现了0的滞后。
在调试过程中,我们在KafkaMessageListenerContainer.ListenerConsumer#handleNack中找到了这个代码块
if (next.equals(record) || list.size() > 0) {
list.add(next);
}next是拦截器处理后的记录(所以它是原始记录的副本)。record是拦截器处理之前的记录请注意,next和record永远不能相等,因为ConsumerRecord不覆盖equals。
这可能是意外漏掉记录的原因,甚至可能是一个错误吗?
或者是错误地使用了记录拦截器来返回不同的ConsumerRecord对象,而不等于原始对象?
发布于 2022-04-05 17:02:45
这是一个错误,它解释了为什么其余的记录没有发送到侦听器--请在GitHub上打开一个问题
https://stackoverflow.com/questions/71755448
复制相似问题