首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >Spring Reactive kafka接收器总是覆盖引导服务器到本地主机

Spring Reactive kafka接收器总是覆盖引导服务器到本地主机
EN

Stack Overflow用户
提问于 2018-06-08 03:21:24
回答 1查看 1.6K关注 0票数 3

我正在尝试使用@EnableKafka和@KafkaListener注释为反应式Kafka消费者编写Spring引导应用程序。我已经配置了我的kafka代理在不同的机器上。当我将bootstrap-server提供给播发的kafka代理主机时,它总是覆盖播发给localhost的主机ip地址。下面是我的代码。

pom.xml文件:-

代码语言:javascript
复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.0.0.RELEASE</version>
</dependency>

配置:-

代码语言:javascript
复制
@Configuration
@EnableKafka
public class AppConfig {

    @Bean
    Map consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.12.12.24:9092,192.14.14.28:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    ReceiverOptions receiverOptions() {

        ReceiverOptions receiverOptions = ReceiverOptions.create(consumerProps()).subscription(Arrays.asList("hellochange"));

        return receiverOptions;
    }

    @Bean
    public KafkaReceiver kafkaReceiver() {
        return KafkaReceiver.create(receiverOptions());
    }

}

消费者:-

代码语言:javascript
复制
@Service
public class ChangeListener {

    @Autowired
    KafkaReceiver kafkaReceiver;

    @KafkaListener(topics="hellochange",groupId="example-group")
    public void receiver() {
        kafkaReceiver.receive().subscribe(System.out::println);
    }

}

控制台:-

代码语言:javascript
复制
 auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
2018-06-07 19:59:17.640  WARN 23536 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=example-group] Connection to node -1 could not be established. Broker may not be available.

我已经在simple consumer和non-reactive spring kafka中验证了没有弹簧配置,对于这两种情况,它都工作得很好。只有带有EnableKafka和KafkaListener注释的Reactor kafka我才会遇到这个问题。

我是不是漏掉了什么/做错了什么?我们可以在Spring boot中将EnableKafka和KafkaListener注解与Reactor Kafka一起使用吗?

附言:我理解,@EnableKafka@KafkaListener不是反应性的,如果我从pom.xml中删除spring-kafka,两个注解都不可用。

像用于非反应式kafka的@EnableKafka@KafkaListener一样,是否有任何注释可用于使用Spring boot应用程序配置反应式kafka消费者?

EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50748770

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档