
Kafka consumer rebalance takes too long

Stack Overflow user
Asked on 2017-09-09 13:11:52
2 answers · 12.9K views · 0 following · 6 votes

I have a Kafka Streams application which consumes data from a few topics and joins the data to another topic.

Kafka configuration:

5 Kafka brokers
Kafka topics - 15 partitions, replication factor 3

Note: I am running the Kafka Streams application on the same machines as the Kafka brokers.

Millions of records are consumed/produced every hour. Whenever I take any Kafka broker down, the group goes into rebalancing, and it takes approx. 30 minutes or sometimes even more to rebalance.

Does anyone have any idea how to solve this rebalancing issue for Kafka consumers? Also, it throws exceptions many times while rebalancing.

This is stopping us from going live in production with this setup. Any help would be appreciated.

Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)

Kafka Streams Config:

bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
max.poll.records = 100
request.timeout.ms=40000

The ConsumerConfig it creates internally is:

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
    check.crcs = true
    client.id = conversion-live-StreamThread-1-restore-consumer
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 2147483647
    max.poll.records = 100
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2 Answers

Stack Overflow user
Accepted answer
Posted on 2017-09-11 16:04:13

I would recommend configuring StandbyTasks via the parameter num.standby.replicas=1 (the default is 0). This should help to reduce the rebalance time significantly.
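For Kafka Streams this is a plain property set at startup, alongside the properties already shown in the question. A minimal sketch, assuming the bootstrap servers from the question's config dump and a hypothetical application id:

```java
import java.util.Properties;

public class StreamsRebalanceConfig {
    public static Properties buildConfig() {
        Properties props = new Properties();
        // Bootstrap servers as listed in the question's config.
        props.put("bootstrap.servers",
                "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
        // Hypothetical application id for illustration.
        props.put("application.id", "conversion-live");
        // One standby replica per stateful task: a warm copy of the state
        // store is maintained on another instance, so a task that moves
        // during a rebalance can resume from near-current local state
        // instead of replaying its whole changelog topic.
        props.put("num.standby.replicas", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        System.out.println("num.standby.replicas="
                + props.getProperty("num.standby.replicas"));
    }
}
```

The trade-off is extra disk and network usage on the standby instances, which is usually acceptable in exchange for faster failover.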

Furthermore, I would recommend upgrading your application to Kafka 0.11. Note that Streams API 0.11 is backward compatible with 0.10.1 and 0.10.2 brokers, so you do not need to upgrade the brokers for this. Rebalance behavior was heavily improved in 0.11 and will be further improved in the upcoming 1.0 release (cf. ). Thus, upgrading your application to the latest version always helps with regard to rebalancing.

Votes: 4

Stack Overflow user
Posted on 2017-11-06 23:49:46

In my experience: first, max.poll.records is too small given your workload of millions of records consumed/produced per hour.

So if max.poll.records is too small, say 1, the rebalance takes a long time. I don't know the reason.

Second, make sure the partition counts of your streams application's input topics are consistent. For example, if APP-1 has two input topics A and B, and A has 4 partitions while B has 2, the rebalance takes a long time. But if A and B both have 4 partitions, even with some partitions idle, the rebalance time is fine. Hope it helps.
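The consistency rule above is just "all joined input topics have the same partition count" (Kafka Streams calls this co-partitioning and requires it for joins). A small sketch of the check, using the hypothetical topic names and partition counts from this answer:

```java
import java.util.Map;

public class CopartitionCheck {
    // Topics are co-partitioned when they all have the same partition count.
    static boolean copartitioned(Map<String, Integer> topicPartitions) {
        return topicPartitions.values().stream().distinct().count() <= 1;
    }

    public static void main(String[] args) {
        // A has 4 partitions, B has 2: mismatched, expect false.
        System.out.println(copartitioned(Map.of("A", 4, "B", 2)));
        // Both topics have 4 partitions: aligned, expect true.
        System.out.println(copartitioned(Map.of("A", 4, "C", 4)));
    }
}
```

In practice you would read the real partition counts with the AdminClient or `kafka-topics.sh --describe` rather than hard-coding them as here.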

Votes: 1
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/46131065