我在我的笔记本电脑上设置了一个小测试,它执行以下操作:
我创建了一个包含1000条消息的Kafka主题,其中每条消息包含几行,每行大约有100列。
在ListColumn中创建300个相当复杂的Spark列。无聚合。
在设置来自Kafka的流时,我设置了.option("maxOffsetsPerTrigger",1),以便在每个小批量中只处理一条消息。
然后,我将这些列应用于只包含一条消息的迷你批处理。
val testStream = myStream
.select(manyTestCols :_*)
.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()Spark大约需要10秒来处理每条消息。
然后,我将maxOffsetsPerTrigger更改为.option("maxOffsetsPerTrigger",1000),以便在每个小批量中处理1000条消息。
Spark大约需要11秒来处理每个小批量中的全部1000条消息。
所以,看起来Spark做了一些“设置工作”,一旦开始运行,它就会非常快地处理每个小批处理。
这项“设置工作”是否通过查询计划一直到每个小批量的物理计划?
如果是这样的话,Spark每个小批量都这样做有意义吗?
或者完全是其他的事情在发生?我正在看Spark的源代码,但如果有人已经通过这个练习的反馈,我会很感激。
Tx以获取任何见解。
发布于 2019-11-04 23:00:51
对于每个小批量,这项“设置工作”是通过查询计划一直到物理计划吗?
对于要在运行时填充的流查询的查询计划的执行特定部分,部分是肯定的,如下所示(具有到相应代码部分的链接):
如果是这样的话,
在每个小批量中都这样做有意义吗?
绝对一点儿没错。在结构化流式传输中没有其他方法来短路无数据源,跟踪当前时间和水印。
这也是当水印发生变化时使用extra no-data micro-batch for stateful operators的原因。
正在看Spark的源代码,但如果有人已经做过这个练习的话,我会很感激的。
https://stackoverflow.com/questions/58640012
复制相似问题