我想用ack模式MANUAL_IMMDEDIATE实现手动提交。我使用AcknowledgingMessageListener类作为消费者。它在应用程序运行时很好,但是在用户接收数据之后,应用程序就会出现错误,但是应用程序仍然在运行。只有在收到数据后才会发生错误。任何建议都将不胜感激,谢谢
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Component
public class MyCustomMessageListener extends CustomMessageListener {
@Override
@SneakyThrows
public KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic) {
MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
createDefaultMethodKafkaListenerEndpoint(name, topic);
kafkaListenerEndpoint.setBean(new MyMessageListener());
kafkaListenerEndpoint.setMethod(MyMessageListener.class.getMethod("onMessage", ConsumerRecord.class));
return kafkaListenerEndpoint;
}
@Slf4j
private static class MyMessageListener implements AcknowledgingMessageListener<String, String> {
/**
* Invoked with data from kafka.
* @param acknowledgment ack
* @param data the data to be processed.
*/
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
log.info("My message listener got a new record: " + data);
acknowledgment.acknowledge();
log.info("My message listener done processing record: " + data);
}
}
}
应用日志
Caused by: java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
... 11 common frames omitted
2022-06-07 11:15:28.908 INFO 32664 --- [antopicPE-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-cleantopicPE-1, groupId=cleantopicPE] Seeking to offset 61 for partition consumerpe-0
2022-06-07 11:15:28.909 ERROR 32664 --- [antopicPE-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2682) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2563) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
... 9 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
... 11 common frames omitted
2022-06-07 11:15:29.450 INFO 32664 --- [antopicPE-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-cleantopicPE-1, groupId=cleantopicPE] Seeking to offset 61 for partition consumerpe-0
2022-06-07 11:15:29.452 ERROR 32664 --- [antopicPE-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2682) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2563) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
... 9 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
... 11 common frames omitted
2022-06-07 11:15:30.039 INFO 32664 --- [antopicPE-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-cleantopicPE-1, groupId=cleantopicPE] Seeking to offset 61 for partition consumerpe-0
2022-06-07 11:15:30.041 ERROR 32664 --- [antopicPE-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:133) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2682) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2563) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
... 9 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
... 11 common frames omitted
2022-06-07 11:15:30.710 ERROR 32664 --- [antopicPE-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for consumerpe-0@61
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public default void org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V>)' threw exception; nested exception is java.lang.UnsupportedOperationException: Container should never call this; nested exception is java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.UnsupportedOperationException: Container should never call this
at org.springframework.kafka.listener.AcknowledgingMessageListener.onMessage(AcknowledgingMessageListener.java:44) ~[spring-kafka-2.8.6.jar:2.8.6]
at jdk.internal.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.13.jar:5.3.13]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar:2.8.6]
... 11 common frames omitted
发布于 2022-06-07 13:11:10
MethodKafkaListenerEndpoint
不打算以这种方式使用;它适用于@KafkaListener
方法。
根本不要使用端点,只需创建侦听器容器并将侦听器添加到其容器属性。
发布于 2022-06-07 05:20:22
如文档所述,不要将onMessage与ConsumerRecord一起使用,因为此消息侦听器正在确认,因此需要确认
请重写这一行
kafkaListenerEndpoint.setMethod(MyMessageListener.class.getMethod("onMessage", ConsumerRecord.class))
论方法
public KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic)
https://stackoverflow.com/questions/72526075
复制相似问题