首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Kafka集群在动物园管理员重新启动后丢失消息。

Kafka集群在动物园管理员重新启动后丢失消息。
EN

Stack Overflow用户
提问于 2018-03-06 08:00:14
回答 2查看 1.5K关注 0票数 1

我正在使用Docker启动一个kafka代理集群(例如,每个容器有一个代理)。卡夫卡版本2.12-0.11.0.0,动物园管理员3.4.10。

场景:

  • 使用下面的配置启动第一个代理

zoo.cfg

代码语言:javascript
运行
复制
tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888

server.properties

代码语言:javascript
运行
复制
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker1_IP:broker1_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0

producer.properties

代码语言:javascript
运行
复制
bootstrap.servers=localhost:9092
compression.type=none

consumer.properties

代码语言:javascript
运行
复制
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group
  • 动物园管理员以独立模式启动,然后启动卡夫卡。
  • 创作主题

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-test-topic1

  • 发送讯息

echo "test_kafka1" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-test-topic1

  • 检查讯息

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-test-topic1 --max-messages 1

消息接收到

  • 描述主题

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test-topic1 Topic:my-test-topic1 PartitionCount:1 ReplicationFactor:1 Configs: Topic: my-test-topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1

  • 开始休息4名经纪人

每个经纪人的zoo.cfg从1到5(只有0.0.0.0:2888:3888的位置不同)

代码语言:javascript
运行
复制
tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888
server.2=broker2_IP:broker2_2888:broker2_3888
server.3=broker3_IP:broker3_2888:broker3_3888
server.4=broker4_IP:broker4_2888:broker4_3888
server.5=broker5_IP:broker5_2888:broker5_3888

每个经纪人的server.properties从第一到第五(broker.id是唯一的,broker_IP:broker_PORT对技术经纪人不同)

代码语言:javascript
运行
复制
broker.id=N
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker_IP:broker_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0

从第一到第五期间每个经纪人的producer.properties

代码语言:javascript
运行
复制
bootstrap.servers=localhost:9092
compression.type=none

从第一到第五期间每个经纪人的consumer.properties

代码语言:javascript
运行
复制
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group
  • 重新启动每个代理的动物园管理员以使zoo.cfg生效
  • 动物园饲养员聚成一群
  • 主题移到broker 5

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test-topic1 Topic:my-test-topic1 PartitionCount:1 ReplicationFactor:1 Configs: Topic: my-test-topic1 Partition: 0 Leader: 5 Replicas: 5 Isr: 5

这是正常行为吗?还是应该待在1号经纪人身上?

  • 检查每个代理的消息

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-test-topic1 --max-messages 1

消息丢失了(当主题停留在代理1上时,消息不会丢失,因此它是浮动的)

EN

回答 2

Stack Overflow用户

发布于 2018-05-01 16:24:17

你试过把发痒时间提高到6000次吗?根据Hadoop的设置,他们默认使用这个设置,说明2000毫秒设置太低。我认为这同样适用于这里。我现在正在处理一个非常类似的卡夫卡问题。

票数 0
EN

Stack Overflow用户

发布于 2019-03-22 11:09:37

在Kafka文档中,配置描述配置示例都建议在broker zookeeper.connect中指定所有的动物园管理员服务器。在生产过程中,预计您将运行一个单独的Zookeeper集群和一个单独的Kafka集群,而不是在一个码头容器中共同运行Kafka和ZK。

我想可能会发生这样的事情:

  • 由于您如何重新启动码头容器的一些细节,ZKs 2-5不知道Kafka 1已经在ZK 1中创建了一个znode,描述您的测试主题具有“副本: 1,ISR: 1",或者不同意使用ZK 1版本,因为没有仲裁。
  • 容器的某些子集2-5开始,其中3/5 ZKs在不等待ZK 1的情况下形成法定人数。
  • 某物(使用者或命令行工具或代理自动创建)尝试使用该主题,由于ZK仲裁同意它还不存在,所以创建它并将副本分配给当前可用的代理之一(在本例中为5)。
  • 容器1启动时,ZK 1必须放弃其主题znode的版本以支持仲裁,Kafka不得不放弃其副本以支持当前的描述。

我不知道什么是正确的方法,从一个单一节点的动物园管理员到一个复制的设置,并没有找到它在文档中。也许您必须首先为您的ZK分配更多的weight,这样您就可以保证它成为领导者,并在其他ZK节点上强制它的主题配置。

你制造了JIRA的问题吗?开发商有什么反应吗?

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49125957

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档