Spring Boot Kafka manual acknowledgment fails

Stack Overflow user
Asked 2022-08-06 06:11:00
Answers: 1, Views: 538, Followers: 0, Votes: 0

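I have the configuration below for a Spring Kafka consumer. I am trying to acknowledge messages manually instead of relying on auto-commit. Since switching to manual acknowledgment, I get errors. The Spring Boot version is 2.7.2.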

kafka.consumer.groupId=mcs-ccp-event
message.topic.name=mcs_ccp_test
kafka.bootstrapAddress=kafka-dev-app-a1.com:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

and this consumer configuration:

@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
    
    @Value(value = "${kafka.consumer.groupId}")
    private String groupId;


    public ConsumerFactory<String, Event> eventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.xxx.mcsccpkafkaconsumer.vo.Event");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS,false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Event.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event> eventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(eventConsumerFactory());
        return factory;
    }
}

This is my listener:

@KafkaListener(topics = "${message.topic.name}", containerFactory = "eventKafkaListenerContainerFactory", groupId = "${kafka.consumer.groupId}")
    public void eventListener(@Payload Event event, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            Acknowledgment acknowledgment) {
        log.info("Received event message: {} from partition : {}", event, partition);
        persistEventToDB(event);
        acknowledgment.acknowledge();
        this.eventLatch.countDown();
    }

Whenever the consumer receives a message from the producer, it always throws this error:

2022-08-06 11:16:11.749 ERROR 37700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff none exhausted for mcs__ccp-1@122

org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed; nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.; nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2713) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2683) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2643) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2570) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2451) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2329) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2000) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1373) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1364) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) ~[spring-kafka-2.8.8.jar:2.8.8]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:369) ~[spring-kafka-2.8.8.jar:2.8.8]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.8.jar:2.8.8]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.8.jar:2.8.8]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.8.jar:2.8.8]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2663) ~[spring-kafka-2.8.8.jar:2.8.8]
Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:369) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2663) ~[spring-kafka-2.8.8.jar:2.8.8]
    ... 11 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.xxx.mcsccpkafkaconsumer.vo.Event] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=Event(eventType=Download, timestamp=2022-08-05 19:11:12, username=xxxxx, browser=Chrome, eventDetails=EventDetails(objectName=VW_Attachment, recordType=ELA,EULA, agreementStatus=null, searchCategory=null, searchKeyword=null, downloadType=PDF, templateId=null, fileName=null, agreementNumber=null)), headers={kafka_offset=122, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c5b32a5, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=mcs_ccp_test, kafka_receivedTimestamp=1659764771420, kafka_groupId=mcs-ccp-event}], failedMessage=GenericMessage [payload=Event(eventType=Download, timestamp=2022-08-05 19:11:12, username=xxxx, browser=Chrome, eventDetails=EventDetails(objectName=VW_Attachment, recordType=ELA,EULA, agreementStatus=null, searchCategory=null, searchKeyword=null, downloadType=PDF, templateId=null, fileName=null, agreementNumber=null)), headers={kafka_offset=122, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c5b32a5, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=mcs_ccp_test, kafka_receivedTimestamp=1659764771420, kafka_groupId=mcs-ccp-event}]
    ... 15 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.xxx.mcsccpkafkaconsumer.vo.Event] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=Event(eventType=Download, timestamp=2022-08-05 19:11:12, username=xxxxxx, browser=Chrome, eventDetails=EventDetails(objectName=VW_Attachment, recordType=ELA,EULA, agreementStatus=null, searchCategory=null, searchKeyword=null, downloadType=PDF, templateId=null, fileName=null, agreementNumber=null)), headers={kafka_offset=122, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c5b32a5, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=mcs_ccp_test, kafka_receivedTimestamp=1659764771420, kafka_groupId=mcs-ccp-event}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.3.22.jar:5.3.22]
    at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-5.3.22.jar:5.3.22]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.3.22.jar:5.3.22]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-5.3.22.jar:5.3.22]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.8.jar:2.8.8]
    ... 14 common frames omitted

1 Answer

Stack Overflow user

Accepted answer

Posted 2022-08-06 17:11:42

You are setting the ack mode and auto-offset-reset in the properties file, which Spring Boot's auto-configuration uses to set up its own KafkaListenerContainerFactory.

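But because you declare your own KafkaListenerContainerFactory bean, the auto-configuration backs off and your programmatic configuration is used instead. The spring.kafka.listener.ack-mode property is therefore never applied to the factory your listener references.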

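You can set the properties for your consumer factory directly in the properties file and let Spring create the beans; that way you don't need this KafkaConsumerConfig class at all.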
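A minimal sketch of the properties-only approach, assuming the standard Spring Boot `spring.kafka.*` keys replace the custom `kafka.bootstrapAddress` and `kafka.consumer.groupId` entries from the question. The broker address, group id, and `Event` class name are copied from the question; the JSON deserializer settings mirror what the question's `eventConsumerFactory()` sets in code.

```properties
# Connection and consumer settings picked up by Spring Boot's auto-configuration
spring.kafka.bootstrap-servers=kafka-dev-app-a1.com:9092
spring.kafka.consumer.group-id=mcs-ccp-event
spring.kafka.consumer.auto-offset-reset=earliest

# Deserialize values as JSON into the Event class, ignoring type headers
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.xxx.mcsccpkafkaconsumer.vo.Event
spring.kafka.consumer.properties.spring.json.use.type.headers=false

# Applied to the auto-configured listener container factory
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
```

With this setup the `@KafkaListener` would likely need to drop `containerFactory = "eventKafkaListenerContainerFactory"`, since that bean no longer exists and the auto-configured factory is used by default.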

Alternatively, you can set the ack mode and auto-offset-reset directly on the factory bean you declare, instead of in the properties file.
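A sketch of the programmatic approach, assuming the question's `KafkaConsumerConfig` class is kept as-is apart from two additions: the `auto.offset.reset` consumer property and the ack mode on the container properties. It needs spring-kafka and a Spring context to run; the `Event` class and property placeholders come from the question.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.xxx.mcsccpkafkaconsumer.vo.Event;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value("${kafka.consumer.groupId}")
    private String groupId;

    public ConsumerFactory<String, Event> eventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Added: previously set only in the properties file, where this factory never saw it
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.xxx.mcsccpkafkaconsumer.vo.Event");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Event.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event> eventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(eventConsumerFactory());
        // Added: a MANUAL ack mode is what lets the container pass an Acknowledgment to the listener
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```

The listener from the question can then stay unchanged, because it already references `eventKafkaListenerContainerFactory` by name.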

Votes: 1
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/73257562
