首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spring Boot Kafka Consumer未消费,Kafka Listener未触发

Spring Boot Kafka Consumer未消费,Kafka Listener未触发
EN

Stack Overflow用户
提问于 2020-01-07 02:14:24
回答 2查看 5K关注 0票数 1

我正在尝试构建一个简单的spring boot Kafka Consumer来消费来自kafka主题的消息,但是没有消息被消费,因为KafkaListener方法没有被触发。

我在其他答案中看到,以确保AUTO_OFFSET_RESET_CONFIG设置为“最早”,并且GROUP_ID_CONFIG是唯一的,这是我所做的,但是KafkaListenerMethod仍然没有触发。应用程序只是启动,不做任何事情:

代码语言:javascript
运行
复制
Application Started

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.4.RELEASE)

这是另一个gradle项目的子项目,该子项目的build.gradle如下所示(代码中正确地提供了MAIN_CLASS_PATH):

代码语言:javascript
运行
复制
apply plugin: 'application'

mainClassName = <MAIN_CLASS_PATH>

dependencies {
    compile "org.springframework.kafka:spring-kafka:${SpringKafkaVersion}"
    compile "org.springframework.boot:spring-boot-starter:${SpringBootVersion}"
    compile group: 'org.springframework', name: 'spring-tx', version: '5.2.2.RELEASE'
}

Java类:

Start.java:

代码语言:javascript
运行
复制
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Start {
    public static void main(String[] args) {
        try {
            System.out.println("Application Started");
            SpringApplication.run(Start.class, args);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

KafkaConsumerConfig.java:

代码语言:javascript
运行
复制
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.UUID;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                <KAFKA_SERVER_ADDRESS>);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                UUID.randomUUID().toString());
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

KafkaConsumer.java

代码语言:javascript
运行
复制
@Service
public class KafkaConsumer {

    @KafkaListener(topics = <TOPIC_NAME>)
    public void consume(@Payload ConsumerRecord<String, String> message) {
        System.out.println("Consumed message: " + message);
    }
}

在我的代码中已经正确地提供了KAFKA_SERVER_ADDRESS和TOPIC_NAME。此外,我还检查了该主题是否已包含消息。

有什么想法可以解释为什么这不会消耗任何来自kafka主题的消息?

EN

回答 2

Stack Overflow用户

发布于 2020-01-07 03:51:30

Spring boot和spring kafka具有自动配置功能。对于简单的使用者,删除您的配置类,并将此属性添加到您的application.yml (或.properties):

代码语言:javascript
运行
复制
 spring.application.name: your-app-name
 spring.kafka.consumer.group-id: your-group-id
 spring.kafka.bootstrap-servers: server1:port1,server2:port2,server3:port3
票数 1
EN

Stack Overflow用户

发布于 2020-01-07 20:04:13

这个问题是kafka的连接问题。从kafka服务器消费的机器不允许从kafka服务器消费。kafka服务器节点中的消费机WhiteListing解决了这个问题。

如果kafka服务器url可以解析,但由于权限不足而无法连接,则kafka消费者不会记录该url。这并不是spring kafka所特有的,但即使是普通的kafka客户端用户也不会记录这一点。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59616956

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档