如果Kafka和Zookeeper没有运行,为什么Maven Clean Install不会完整?

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (10)

为了学习Apache Kafka,我开发了一个Spring Boot应用程序,Kafka如果我向调用KafkaTemplatesend方法的控制器发送POST请求,则会向主题发送消息。我正在运行Ubuntu 19.04并成功安装KafkaZookeeper在本地安装。一切正常。

问题发生在我关机要么ZookeeperKafka。如果我这样做,那么在启动Kafka AdminClient时我的应用程序会定期尝试查找一个broer,但会将此消息发送到控制台

Connection to node -1 could not be established. Broker may not be available.

我实现了Kafka + Zookeeper中建议的修复:无法建立到节点-1的连接。经纪人可能无法在这里使用Spring-Boot和Kafka:如何处理经纪人不可用?。但是,如果我运行一个,maven clean install那么构建永远不会完成,如果Zookeeper并且Kafka没有运行。为什么这样,有没有办法配置应用程序,以便它Kafka在启动时检查可用性,并在服务不可用时正常处理?

这是我的服务类调用 KafkaTemplate

@Autowired
public PingMessageServiceImpl(KafkaTemplate kafkaTemplate, KafkaTopicConfiguration kafkaTopicConfiguration) {
    this.kafkaTemplate = kafkaTemplate;
    this.kafkaTopicConfiguration = kafkaTopicConfiguration;
}

@Override
public void sendMessage(String message) {
    log.info(String.format("Received following ping message %s", message));

    if (!isValidPingRequest(message)) {
        log.warn("Received invalid ping request");
        throw new InvalidPingRequestException();
    }
    log.info(String.format("Sending message=[%s]", message));
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(kafkaTopicConfiguration.getPingTopic(), message);
    future.addCallback(buildListenableFutureCallback(message));
}

private boolean isValidPingRequest(String message) {
    return "ping".equalsIgnoreCase(message);
}

private ListenableFutureCallback<SendResult<String, String>> buildListenableFutureCallback(String message) {
    return new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info(String.format("Sent message=[%s] with offset=[%d]", message, result.getRecordMetadata().offset()));
        }
        @Override
        public void onFailure(Throwable ex) {
            log.info(String.format("Unable to send message=[%s] due to %s", message, ex.getMessage()));
        }
    };
}

以下是我用于从属性文件中提取Kafka的配置属性的配置类

@NotNull(message = "bootstrapAddress cannot be null")
@NotBlank(message = "bootstrapAddress cannot be blank")
private String bootstrapAddress;

@NotNull(message = "pingTopic cannot be null")
@NotBlank(message = "pingTopic cannot be blank")
private String pingTopic;

@NotNull(message = "reconnectBackoffMs cannot be null")
@NotBlank(message = "reconnectBackoffMs cannot be blank")
@Value("${kafka.reconnect.backoff.ms}")
private String reconnectBackoffMs;

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configurations = new HashMap<>();
    configurations.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configurations.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
    return new KafkaAdmin(configurations);
}

@Bean
public NewTopic pingTopic() {
    return new NewTopic(pingTopic, 1, (short) 1);
}

@PostConstruct
private void displayOnStartup() {
    log.info(String.format("bootstrapAddress is %s", bootstrapAddress));
    log.info(String.format("reconnectBackoffMs is %s", reconnectBackoffMs));
}
提问于
用户回答回答于

如果您Spring-boot在加载ApplicationContextspring kafka bean 时有任何集成测试KafakTemplateKafkaAdmin则会尝试使用ymlproperties文件中指定的属性连接kafka服务器

因此,为了避免这种情况,您可以使用spring-embedded-kafka-server,以便kafka bean在测试执行期间连接到嵌入式服务器。

或者简单一点,你可以在集成测试用例中使用注释来模拟KafakTemplateKafkaAdminbean@MockBean

扫码关注云+社区

领取腾讯云代金券