首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >kafka在线添加分区或broker安全吗?

kafka在线添加分区或broker安全吗?
EN

Stack Overflow用户
提问于 2016-03-20 21:20:26
回答 2查看 2.2K关注 0票数 4

谢谢你的耐心。

  1. 在线为主题添加分区后,kafka消费者停止读取消息,不会抛出异常。消费者只是阻塞。每次我们都必须重启消费者。我认为这是不合理的,我找不到任何关于它的文档。

此外,当处理消息时出现错误时,消费者线程将不会恢复。我们的消费者读取消息并将其插入到MySql中。一旦网络出现故障,消费者就无法连接到MySql,然后它就会阻塞并停止读取消息,直到我们重新启动它。

  1. 添加分区时,旧数据和新数据会发生什么情况?文档(https://kafka.apache.org/documentation.html#basic_ops_modify_topic)上写着:

“请注意,分区的一个用例是对数据进行语义分区,添加分区不会改变现有数据的分区,因此如果消费者依赖于该分区,这可能会干扰他们。也就是说,如果数据是按散列(键)% number_of_partitions分区的,则此分区可能会通过添加分区来混洗,但Kafka不会尝试以任何方式自动重新分配数据。”

“不尝试自动重新分发数据”是什么意思?旧数据不变,新数据不会发送到添加的分区吗?

  1. 当经纪人关闭时,kafka生产者无法发送消息。

我们有一个包含3个分区和2个副本的主题。kafka集群有3个代理。但当代理关闭时,就会出现异常:

代码语言:javascript
复制
kafka.producer.async.ProducerSendThread.error():103: - Error in handling batch of 65 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
  at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) ~[kafka_2.9.2-0.8.2.0.jar:na]
 at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) [kafka_2.9.2-0.8.2.0.jar:na]
     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) [kafka_2.9.2-0.8.2.0.jar:na]
       at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) [kafka_2.9.2-0.8.2.0.jar:na]
 at scala.collection.immutable.Stream.foreach(Stream.scala:526) [scala-library-2.9.2.jar:na]
   at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) [kafka_2.9.2-0.8.2.0.jar:na]
      at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [kafka_2.9.2-0.8.2.0.jar:na]

kafka.producer.async.DefaultEventHandler.error():97: - Failed to send requests for topics risk_acts with correlation ids in [433266,433395]

当添加新的代理时,同样的问题也会发生。我们必须在producer的"metadata.broker.list“配置中添加新的代理主机名和端口,然后重新启动它。

我们使用的是高级api,kafka的版本是:

代码语言:javascript
复制
<dependency>
      <groupId> org.apache.kafka</groupId >
      <artifactId> kafka_2.9.2</artifactId >
      <version> 0.8.2.0</version >
</dependency>

生产者配置:

代码语言:javascript
复制
<entry key="metadata.broker.list" value="${metadata.broker.list}" />
<entry key="serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="key.serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="request.required.acks" value="-1" />
<entry key="producer.type" value="async" />
<entry key="queue.enqueue.timeout.ms" value="-1" />
<entry key="compression.codec" value="1" />

消费者配置:

代码语言:javascript
复制
<entry key="zookeeper.connect" value="${zookeeper.connect}" />
<entry key="group.id" value="${kafka.consumer.group.id}" />
<entry key="zookeeper.session.timeout.ms" value="40000" />
<entry key="rebalance.backoff.ms" value="10000" />
<entry key="zookeeper.sync.time.ms" value="2000" />
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="auto.offset.reset" value="smallest" />

生产者代码和消费者代码类似于:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

EN

回答 2

Stack Overflow用户

发布于 2016-03-21 11:19:45

至于第二点,假设你的key是一个Long。假设你有10个分区。在分区之间分配Long的一种方法是简单地进行模运算key % num_partitions。但是现在考虑一下当您添加分区时会发生什么。根据num_partitions的当前值,已经写入的消息将位于错误的分区中。这就是说Kafka不会自动为你重新划分任何东西。

票数 1
EN

Stack Overflow用户

发布于 2019-03-29 07:22:53

首先,我需要您了解添加分区repartitioning.和的b/w 的区别

对于repartitioning:现有数据将从一个分区移动到另一个分区

当添加额外的分区时,:旧数据将保持不变,而新数据将分布在所有分区中。

在这两种情况下,组协调人将使用新的分区列表将信号发送给所有使用者,然后使用者将重新平衡,最后连接到所有请求。

在您的情况下,您可能会面临一些与增加分区无关的其他问题。

可能在服务器上启用了调试日志您将看到更多详细信息

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36114221

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档