我有一个Spring消费者的下面的配置。我在尝试手动确认而不是自动提交。在手动确认后,我开始出错。Spring版本为2.7.2。
kafka.consumer.groupId=mcs-ccp-event
message.topic.name=mcs_ccp_test
kafka.bootstrapAddress=kafka-dev-app-a1.com:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
和这种使用者配置
@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value(value = "${kafka.consumer.groupId}")
private String groupId;
public ConsumerFactory<String, Event> eventConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.xxx.mcsccpkafkaconsumer.vo.Event");
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS,false);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Event.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> eventKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(eventConsumerFactory());
return factory;
}
这是我的听众
@KafkaListener(topics = "${message.topic.name}", containerFactory = "eventKafkaListenerContainerFactory", groupId = "${kafka.consumer.groupId}")
public void eventListener(@Payload Event event, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
Acknowledgment acknowledgment) {
log.info("Received event message: {} from partition : {}", event, partition);
persistEventToDB(event);
acknowledgment.acknowledge();
this.eventLatch.countDown();
}
每当使用者收到生产者的消息时,它总是抛出错误:
2022-08-06 11:16:11.749 ERROR 37700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff none exhausted for mcs__ccp-1@122
org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed; nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.; nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2713) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2683) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2643) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2570) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2451) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2000) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) ~[spring-kafka-2.8.8.jar:2.8.8]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:369) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2663) ~[spring-kafka-2.8.8.jar:2.8.8]
Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:369) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2663) ~[spring-kafka-2.8.8.jar:2.8.8]
... 11 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.xxx.mcsccpkafkaconsumer.vo.Event] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=Event(eventType=Download, timestamp=2022-08-05 19:11:12, username=xxxxx, browser=Chrome, eventDetails=EventDetails(objectName=VW_Attachment, recordType=ELA,EULA, agreementStatus=null, searchCategory=null, searchKeyword=null, downloadType=PDF, templateId=null, fileName=null, agreementNumber=null)), headers={kafka_offset=122, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c5b32a5, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=mcs_ccp_test, kafka_receivedTimestamp=1659764771420, kafka_groupId=mcs-ccp-event}], failedMessage=GenericMessage [payload=Event(eventType=Download, timestamp=2022-08-05 19:11:12, username=xxxx, browser=Chrome, eventDetails=EventDetails(objectName=VW_Attachment, recordType=ELA,EULA, agreementStatus=null, searchCategory=null, searchKeyword=null, downloadType=PDF, templateId=null, fileName=null, agreementNumber=null)), headers={kafka_offset=122, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c5b32a5, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=mcs_ccp_test, kafka_receivedTimestamp=1659764771420, kafka_groupId=mcs-ccp-event}]
... 15 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.xxx.mcsccpkafkaconsumer.vo.Event] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=Event(eventType=Download, timestamp=2022-08-05 19:11:12, username=xxxxxx, browser=Chrome, eventDetails=EventDetails(objectName=VW_Attachment, recordType=ELA,EULA, agreementStatus=null, searchCategory=null, searchKeyword=null, downloadType=PDF, templateId=null, fileName=null, agreementNumber=null)), headers={kafka_offset=122, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c5b32a5, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=mcs_ccp_test, kafka_receivedTimestamp=1659764771420, kafka_groupId=mcs-ccp-event}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.3.22.jar:5.3.22]
at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-5.3.22.jar:5.3.22]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.3.22.jar:5.3.22]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-5.3.22.jar:5.3.22]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.8.jar:2.8.8]
... 14 common frames omitted
发布于 2022-08-06 17:11:42
您正在设置属性文件中的ack模式和自动偏移复位,Spring Boot
的自动配置使用它来设置自己的KafkaListenerContainerFactory
。
但是从那时起,您就声明了自己的KafkaListenerContainerFactory
bean,自动配置就退出了,取而代之的是您的编程配置。
您可以直接在属性文件中为您的消费者工厂设置属性,并让Spring创建beans -这样就不需要这个KafkaConsumerConfig
类了。
或者,您可以在声明的工厂bean中直接设置ack模式和自动偏移复位,而不是属性文件。
https://stackoverflow.com/questions/73257562
复制相似问题