刚开始学习卡夫卡。我试图建立一个小卡夫卡集群,包括2个经纪人。当两个经纪人都起床时,我成功地向我的主题发送了消息。当两个代理中的一个完成时,我想测试集群的行为。我停止了我的主代理( kafka1 )使用停靠停止kafka1,然后我尝试发送一条消息到我的集群,看看我的生产者是否能够理解,他需要发送到kafka2,因为kafka1是坏的。
然而,我经常收到以下错误:
{“级别”:“错误”,“时间戳”:“2022-07-19T18:59:46.891Z”、“记录器”:“kafkajs”、“message”:“连接响应元数据(键: 3,版本: 6)”、“代理”:“localhost:39092”、“clientId”:“my”、“错误”:“由于我们处于领导人选举的中间”、"correlationId":1、“大小”:
以下是我的制片代码:
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092', 'localhost:39092'],
})
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })
await producer.connect()
await producer.send({
topic: 'coverageEvolved',
messages: [
{ value: JSON.stringify(bodyActiveMq), key: bodyActiveMq[0].roamPartner},
],
})
await producer.disconnect()
下面是我的码头-撰写文件:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
restart: unless-stopped
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
volumes:
- ./zookeeper/data:/var/lib/zookeeper/data
kafka-1:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
restart: unless-stopped
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-
1:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
volumes:
- ./kafka1/data:/var/lib/kafka/data
kafka-2:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 39092:39092
restart: unless-stopped
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
volumes:
- ./kafka2/data:/var/lib/kafka/data
发布于 2022-07-20 13:18:25
如果您没有以其他方式创建您的主题,Kafka将默认只使用一个副本和一个分区来创建代码中使用的coverageEvolved
主题。
如果您杀死承载该副本的代理,将不会产生同步副本领导。
你可以使用Kafkajs创建主题。
另外值得一提的是,有一个事务主题只有一个副本(您缺少了一个环境变量)。这主要是与Java客户机相关的,因为在Kafka 3.0中,事务性生产者默认是启用的
https://stackoverflow.com/questions/73042460
复制相似问题