首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何避免火花流中批量排队

如何避免火花流中批量排队
EN

Stack Overflow用户
提问于 2021-03-16 15:12:09
回答 1查看 1K关注 0票数 1

我有直接流的火花流,我使用下面的配置

批间隔60 spark.streaming.kafka.maxRatePerPartition 42 auto.offset.reset最早

当我使用最早的选项启动流批时,为了更快地使用Kafka的消息并减少延迟,我将spark.streaming.kafka.maxRatePerPartition保持为42。因此,它应该消耗42x60x60分区=每批151200条记录。

我有两个问题

  1. 我看到,最初的几批记录正确地消耗了151200张唱片,尽管卡夫卡有大量的唱片可供消费,但在后来的批次中逐渐减少了。请看下面的截图。可能是什么原因
  2. 我看到很多批次都在排队。我们怎么才能避免这种情况。

是否有可能实现以下场景,我们的批处理间隔为60,如果每批运行在60,下一批可以准时开始。如果一批花费的时间超过60,我们不希望下一批来排队。当现有的运行完成后,下一次运行可以从选择记录开始,直到那个时候。这样,我们就不会有滞后,也不会有成批排队。

星星之火UI -问题1的屏幕截图

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-03-16 17:11:17

你所观察到的是星火的背压机制的预期行为。

您已经将配置spark.streaming.kafka.maxRatePerPartition设置为42,并且在计算后,作业将开始获取。

代码语言:javascript
运行
复制
42 * 60 partitions * 60 seconds batch interval = 151200 records per batch

看看你所附截图中的时间(处理时间),这份工作就从这么多的记录开始。

然而,由于处理所有这151200条记录所需的时间超过60秒,背压机制将减少后续批中的输入记录。这只在几批之后才会发生,因为背压机制(也称为"PID控制器“)需要等到第一批完成之后,才能使用这种经验来估计下一次间隔的输入记录数。如前所述,处理前151200的时间超过了一个间隔,这意味着随后的两个间隔已经与maxRatePerPartition一起调度,而没有完成批处理间隔的经验。

这就是为什么只在第四批中看到输入记录下降的原因。然后,输入记录的数量仍然很高,无法在60秒内进行处理,因此输入记录的延时越来越大,PID控制器(背压)最终意识到它落后于许多记录,并且正在将输入记录的数量大幅度地减少到spark.streaming.backpressure.pid.minRate设置的最小值。在您的示例中,此值似乎设置为2,从而导致每批间隔2* 60 * 60 = 7200条记录。

总之,您所观察到的是预期的和有意的行为。流作业需要一些批次来理解和了解它应该从Kafka获取多少数据,以适应给定的(非灵活的)60秒批处理间隔。无论处理时间在一批中花费多长时间,您的流作业都将提前计划下一批处理,每60秒执行一次。

你能做的是:

  • 建议将maxRatePerPartition设置为实际容量的150-200%左右.只要让工作运行的时间再长一点,你就会看到100%的估计结果。
  • 在Kafka中使用60个分区时,需要确保数据均匀地分布在各个分区上。只有这样,maxRatePerPartition才能完成您想要做的事情。
  • 有60个分区,您可以使用您的星火集群中的60个核心来获得最大的消耗速度。
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66658032

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档