首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >卡夫卡在这个话题上没有领袖,因为我们正在进行领导人选举。

卡夫卡在这个话题上没有领袖,因为我们正在进行领导人选举。
EN

Stack Overflow用户
提问于 2022-07-19 19:23:03
回答 1查看 1.9K关注 0票数 2

刚开始学习卡夫卡。我试图建立一个小卡夫卡集群,包括2个经纪人。当两个经纪人都起床时,我成功地向我的主题发送了消息。当两个代理中的一个完成时,我想测试集群的行为。我停止了我的主代理( kafka1 )使用停靠停止kafka1,然后我尝试发送一条消息到我的集群,看看我的生产者是否能够理解,他需要发送到kafka2,因为kafka1是坏的。

然而,我经常收到以下错误:

{“级别”:“错误”,“时间戳”:“2022-07-19T18:59:46.891Z”、“记录器”:“kafkajs”、“message”:“连接响应元数据(键: 3,版本: 6)”、“代理”:“localhost:39092”、“clientId”:“my”、“错误”:“由于我们处于领导人选举的中间”、"correlationId":1、“大小”:

以下是我的制片代码:

代码语言:javascript
运行
复制
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092', 'localhost:39092'],
})
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })

await producer.connect()

await producer.send({
  topic: 'coverageEvolved',
  messages: [
    { value: JSON.stringify(bodyActiveMq), key: bodyActiveMq[0].roamPartner},
  ],
})

await producer.disconnect()

下面是我的码头-撰写文件:

代码语言:javascript
运行
复制
version: '2'
services:
    zookeeper:
    image: confluentinc/cp-zookeeper:latest
    restart: unless-stopped
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    volumes:
      - ./zookeeper/data:/var/lib/zookeeper/data
    kafka-1:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
            - 29092:29092
        restart: unless-stopped
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka- 
    1:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
          - ./kafka1/data:/var/lib/kafka/data
      kafka-2:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - 39092:39092
        restart: unless-stopped
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
          - ./kafka2/data:/var/lib/kafka/data
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-20 13:18:25

如果您没有以其他方式创建您的主题,Kafka将默认只使用一个副本和一个分区来创建代码中使用的coverageEvolved主题。

如果您杀死承载该副本的代理,将不会产生同步副本领导。

你可以使用Kafkajs创建主题

另外值得一提的是,有一个事务主题只有一个副本(您缺少了一个环境变量)。这主要是与Java客户机相关的,因为在Kafka 3.0中,事务性生产者默认是启用的

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73042460

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档