几乎所有 Kafka Consumer 教程都是下面的代码:
KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)
// Subscribe to Kafka topics
consumer.subscribe(topics);
while (true) {
// Poll Kafka for new messages
ConsumerRecords<String, String> records = consumer.poll(100);
// Processing logic
for (ConsumerRecord<String, String> record : records) {
doSomething(record);
}
}
}
基本上就是创建一个Kafka consumer,然后订阅对应的topics,然后就可以无限消费数据了,消费到数据后对每一条消息进行处理,这个过程我们叫做‘拉取然后循环处理’(poll-then-process loop)。
这相当简单,易于实施,人们可能一直在生产中使用它而没有任何问题。但是,此模型存在各种问题,我们将在下一节中详细介绍。
看上面的代码,我们开发者可能会认为 poll 是一种向 Kafka 发出需求信号的方式。我们的消费者仅在完成对先前消息的处理后才进行轮询以获取更多消息。如果它的处理速度很慢,Kafka 将充当‘减震器’,确保即使在生产速度高得多的情况下我们也不会丢失任何消息。另一方面,当处理速度较慢时,连续获取数据之间的间隔也会增加,这是有问题的,因为 max.poll.interval.ms 配置有一个默认的(5 分钟)上限:
max.poll.interval.ms 使用消费者组管理时调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将进行rebalance,以便将分区重新分配给另一个成员。
换句话说,如果我们的消费者没有在每个 max.poll.interval.ms 中至少调用一次 poll ,那它就像死了一样。发生这种情况时,Kafka 会执行一个rebalance过程,将已死消费者的当前工作分配给其消费者组的其他成员。这在已经很慢的处理速率中引入了更多的开销和延迟。
更糟糕的是,如果处理导致一个消费者的速度变慢,很可能会导致其他消费者接管其工作时出现同样的问题。此外,假定的死亡消费者在下一次轮询时尝试重新加入组时也可能导致重新平衡(请记住,这是一个无限循环!)。这两者都使得rebalance一次又一次地发生,进一步减缓了消费。
现在,还有另一种配置可以帮助解决这种情况:
max.poll.records 单次调用 poll() 返回的最大记录数。请注意, max.poll.records 不会影响底层的获取行为。消费者将缓存来自每个获取请求的记录,并从每次轮询中返回它们。
将此设置为较低的值,我们的消费者将在每次轮询时处理更少的消息。因此轮询间隔将减少。或者,我们也可以将 max.poll.interval.ms 增加到更大的值。如果我们不能摆脱 poll-then-process 循环,这应该可以暂时解决问题。然而,它并不理想。
首先,这些配置是在我们启动消费者时设置的,但它们是否工作取决于消息或应用程序。我们可能会为每个应用程序专门设置它们,但最终,我们正在玩猜谜游戏并祈祷我们很幸运。
其次,在最坏的情况下,rebalance过程开始可能需要两倍于 max.poll.interval.ms 的持续时间:
我们从不希望rebalance花费更多时间,因此设置更高的 max.poll.interval.ms 并不是很好。
最后,这些配置意味着我们的消费者被“期望”频繁地轮询,至少每 max.poll.interval.ms 一次,无论它在做什么类型的处理。如果不包含这种期望,poll-then-process 循环不仅会误导开发人员,而且注定会失败。
Kafka 只保证一个分区内消息的顺序。来自不同分区的消息是不相关的,可以并行处理。这就是为什么在 Kafka 中,一个主题中的分区数是并行度的单位。
理论上,我们可以通过运行与主题上的分区数量一样多的消费者来轻松实现最大并行度。然而,实际上,这开销太大,更不用说它对增加rebalance机会的影响,因为有很多的消费者可能是来来去去(come and go)。
如果我们再次查看我们的消费者代码,它可以订阅多个主题并可能接收来自多个分区的消息。然而,在处理这些消息时,它会一一处理。这不是最优的。
现在,假设我们的处理逻辑非常简单,我们可以只使用线程池来并行化它吗?例如,通过向线程池提交一个处理任务,对于每条消息?
嗯,它仅在我们不关心处理排序和保证(例如最多一次、至少一次等)时才有效。因此在实践中它不是很有用。
poll-then-process 循环的许多挫折来自不同的关注点——轮询、处理、偏移提交——混合在一起的情况。结果,当我们将它们分成独立的组件时,我们最终得到了一个改进的模型,它可以适当地支持并行处理和背压。下面更详细地描述了每个组件。
Work Queues 是 Poller 和 Executor 之间的通信通道:
简而言之,Poller 封装了 Kafka 中与 poll 相关的一切:
pause(Collection<TopicPartition> partitions) 暂停从请求的分区中提取。未来对 poll(Duration) 的调用将不会从这些分区返回任何记录,直到使用 resume(Collection) 恢复它们。
Executor 就像一个线程池,它在其中维护多个worker来处理消息:
通过这种设置,一个分区内的消息按顺序处理,而来自不同分区的消息并行处理。
Kafka 中的每条消息都与一个偏移量(offset)相关联——一个整数,表示它在当前分区中的位置。通过存储这个数字,我们实质上为我们的消费者提供了一个检查点。如果它失败并返回,它知道从哪里继续。因此,在 Kafka 中实现各种处理保证至关重要:
Kafka 的自动提交呢?Confluent声称:
使用自动提交可以让您“至少一次”(at least once)交付:Kafka 保证不会丢失任何消息,但重复消息是可能的。
这适用于交付,但是,它不为处理提供任何保证:
因此,我们总是将 enable.auto.commit 设置为 false 并让 Offset Manager 手动管理偏移量。
让我们通过几个示例用例来了解组件如何协同工作以满足不同的处理保证。
对于最多一次,我们只需要在处理消息之前提交偏移量。我们可以在处理每条消息之前立即执行此操作。但是,在引入更多成本的同时,并没有给我们更强的保证。因此,Poller 对此负责。每次轮询后,它将告诉偏移管理器保存这些偏移量并等待来自 Kafka 的成功确认,然后再将消息排队以进行处理。
在rebalance事件之前,它只需要向 Executor 发送一个即发即弃的信号以停止处理。然后它取消工作队列并返回等待rebalance。丢失的消息是那些仍在队列中或正在处理中的消息。如果我们想在不影响rebalance持续时间的情况下优化更少的丢失,我们可以使用更小的队列大小。
对于至少一次,我们只需要确保仅在成功处理消息后才保存偏移量。因此,如果我们要处理 10 条消息,我们不需要为所有消息保存偏移量,而只需要保存最后一条消息。
在此设置中,Executor 将在每次完成对消息的处理时向 Offset Manager 发出信号。偏移量管理器跟踪每个分区的最新偏移量 - 并决定何时将它们提交给 Kafka。例如,我们可以将 Offset Manager 设置为每 5 秒提交一次。无论新消息是否出现,都会发生这种情况。
在rebalance事件之前,Poller 设置了一个硬性截止日期,并通知 Executor 结束其正在进行的处理,并通知 Offset Manager 以跟进最后一次提交。如果截止日期已经过去,或者 Poller 收到了其他人的响应,它会取消工作队列并返回等待rebalance。
为了优化减少重复处理,我们可以:
在这种情况下,需要在一个事务中进行偏移保存和消息处理。这意味着 Executor 和 Offset Manager 使用同步调用紧密合作以实现它。
在rebalance事件之后,轮询器向偏移管理器询问当前分配的已保存偏移量。然后它会在恢复轮询之前尝试恢复保存的位置。
public void seek(TopicPartition partition, long offset) 覆盖消费者将在下一次轮询(超时)时使用的获取偏移量。
在rebalance事件之前,Poller 会通知 Executor 并等待其响应。Executor 回滚其正在进行的事务并返回到 Poller。Poller 然后取消工作队列并返回等待rebalance。
我们分析了 loop-then-process 循环的各种问题,并提出了一个更合适的模型来理解和实现 Kafka Consumer。缺点是它要复杂得多,对于初学者来说可能并不容易。我们将这种复杂性归咎于 Kafka 及其低级 API。
在实践中,我们可能不会自己做,而是使用一个现成的库,它可能基于也可能不基于类似模型:Alpakka Kafka、Spring for Kafka、zio-kafka 等......即便如此,所提出的模型对于评估这些解决方案或实施新的解决方案也很有用。
来源:
https://www.toutiao.com/article/7095235111150879262/?log_from=f5c5aad449665_1652927718906
“IT大咖说”欢迎广大技术人员投稿,投稿邮箱:aliang@itdks.com
来都来了,走啥走,留个言呗~
IT大咖说 | 关于版权
由“IT大咖说(ID:itdakashuo)”原创的文章,转载时请注明作者、出处及微信公众号。投稿、约稿、转载请加微信:ITDKS10(备注:投稿),茉莉小姐姐会及时与您联系!
感谢您对IT大咖说的热心支持!