首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >春季卡夫卡与卡夫卡集群

春季卡夫卡与卡夫卡集群
EN

Stack Overflow用户
提问于 2018-06-11 19:20:53
回答 1查看 5.2K关注 0票数 6

我已经在集群中配置了3个kafka,我正在尝试使用spring。

但在我杀了卡夫卡领导人之后,我无法发送其他信息来排队。

我正在将Spring.kafka.boot-server属性设置为:"kafka-1:9092;kafka-2:9093,kafka-3:9094“以及我的主机文件中的所有名称。

Kafka版0.10

有些人知道如何正确配置?

编辑

我已经测试了一件事,并发生了一种奇怪的行为。启动服务时,我会向主题发送一条消息(以强制创建)

代码:

代码语言:javascript
运行
复制
@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
    sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
    return new KafkaSyncListener();
}

因此,在这段时间里,我没有启动kafka-1服务器(只是其他服务器),它发生了一个例外:

org.springframework.kafka.core.KafkaProducerException:未能发送;嵌套的异常是org.apache.kafka.common.errors.TimeoutException:未能在60000 ms后更新元数据。

看起来,spring只是尝试在第一个引导服务器上连接。我用的是spring 1.3.5 0.10.1.1和kafka 0.10.1.1

编辑2

我和你一样做了测试。当我移除第一个码头集装箱(卡夫卡-1)时,情况也是一样--领头人变了。因此,我的使用者(spring服务)无法使用这些消息。但是,当我再次启动kafka-1时,该服务将获取我的消费者ConcurrentKafkaListenerContainerFactory的所有消息:

代码语言:javascript
运行
复制
{
  key.deserializer=class
  org.apache.kafka.common.serialization.IntegerDeserializer,
  value.deserializer=class
  org.apache.kafka.common.serialization.StringDeserializer,
  max.poll.records=500,
  group.id=mongo-adapter-service,
  ssl.keystore.location=/certs/kafka.keystore.jks,
  bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
auto.commit.interval.ms=100,
security.protocol=SSL,
max.request.size=5242880,
ssl.truststore.location=/certs/kafka.keystore.jks,
auto.offset.reset=earliest
}
EN

回答 1

Stack Overflow用户

发布于 2018-06-11 20:51:36

您需要服务器地址之间的逗号,而不是分号。

编辑

我刚做了个没有问题的测试:

代码语言:javascript
运行
复制
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

代码语言:javascript
运行
复制
@SpringBootApplication
public class So50804678Application {

    public static void main(String[] args) {
        SpringApplication.run(So50804678Application.class, args);
    }

    @KafkaListener(id = "foo", topics = "so50804678")
    public void in(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50804678", 1, (short) 3);
    }

}

代码语言:javascript
运行
复制
$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: so50804678   Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

杀了头目,

代码语言:javascript
运行
复制
$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: so50804678   Partition: 0    Leader: 1   Replicas: 0,1,2 Isr: 1,2

代码语言:javascript
运行
复制
$ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9093 --topic so50804678

发送了一条消息,应用程序收到了消息;除了警告之外,日志中没有任何错误:

未能建立到节点0的groupId=foo连接。可能找不到经纪人。

然后我重新启动死服务器;停止我的应用程序;然后添加以下代码.

代码语言:javascript
运行
复制
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> {
        while(true) {
            System.out.println(template.send("so50804678", "foo").get().getRecordMetadata());
            Thread.sleep(3_000);
        }
    };
}

再一次,杀死现任领导人没有任何影响,一切都恢复正常。

您可能需要调整服务器道具中的监听器/adsed.listers属性。因为我的经纪人都在本地主机上,所以我让他们默认。

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50804678

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档