我有直接流的火花流,我使用下面的配置
批间隔60 spark.streaming.kafka.maxRatePerPartition 42 auto.offset.reset最早
当我使用最早的选项启动流批时,为了更快地使用Kafka的消息并减少延迟,我将spark.streaming.kafka.maxRatePerPartition保持为42。因此,它应该消耗42x60x60分区=每批151200条记录。
我有两个问题
是否有可能实现以下场景,我们的批处理间隔为60,如果每批运行在60,下一批可以准时开始。如果一批花费的时间超过60,我们不希望下一批来排队。当现有的运行完成后,下一次运行可以从选择记录开始,直到那个时候。这样,我们就不会有滞后,也不会有成批排队。
发布于 2021-03-16 17:11:17
你所观察到的是星火的背压机制的预期行为。
您已经将配置spark.streaming.kafka.maxRatePerPartition
设置为42,并且在计算后,作业将开始获取。
42 * 60 partitions * 60 seconds batch interval = 151200 records per batch
看看你所附截图中的时间(处理时间),这份工作就从这么多的记录开始。
然而,由于处理所有这151200条记录所需的时间超过60秒,背压机制将减少后续批中的输入记录。这只在几批之后才会发生,因为背压机制(也称为"PID控制器“)需要等到第一批完成之后,才能使用这种经验来估计下一次间隔的输入记录数。如前所述,处理前151200的时间超过了一个间隔,这意味着随后的两个间隔已经与maxRatePerPartition一起调度,而没有完成批处理间隔的经验。
这就是为什么只在第四批中看到输入记录下降的原因。然后,输入记录的数量仍然很高,无法在60秒内进行处理,因此输入记录的延时越来越大,PID控制器(背压)最终意识到它落后于许多记录,并且正在将输入记录的数量大幅度地减少到spark.streaming.backpressure.pid.minRate
设置的最小值。在您的示例中,此值似乎设置为2,从而导致每批间隔2* 60 * 60 = 7200条记录。
总之,您所观察到的是预期的和有意的行为。流作业需要一些批次来理解和了解它应该从Kafka获取多少数据,以适应给定的(非灵活的)60秒批处理间隔。无论处理时间在一批中花费多长时间,您的流作业都将提前计划下一批处理,每60秒执行一次。。
你能做的是:
maxRatePerPartition
设置为实际容量的150-200%左右.只要让工作运行的时间再长一点,你就会看到100%的估计结果。https://stackoverflow.com/questions/66658032
复制相似问题