首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >结构化流如何为每个微批次规划流查询的逻辑计划?

结构化流如何为每个微批次规划流查询的逻辑计划?
EN

Stack Overflow用户
提问于 2019-10-31 17:13:23
回答 1查看 229关注 0票数 3

我在我的笔记本电脑上设置了一个小测试,它执行以下操作:

我创建了一个包含1000条消息的Kafka主题,其中每条消息包含几行,每行大约有100列。

在ListColumn中创建300个相当复杂的Spark列。无聚合。

在设置来自Kafka的流时,我设置了.option("maxOffsetsPerTrigger",1),以便在每个小批量中只处理一条消息。

然后,我将这些列应用于只包含一条消息的迷你批处理。

代码语言:javascript
运行
复制
val testStream = myStream
  .select(manyTestCols :_*)
  .writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

Spark大约需要10秒来处理每条消息。

然后,我将maxOffsetsPerTrigger更改为.option("maxOffsetsPerTrigger",1000),以便在每个小批量中处理1000条消息。

Spark大约需要11秒来处理每个小批量中的全部1000条消息。

所以,看起来Spark做了一些“设置工作”,一旦开始运行,它就会非常快地处理每个小批处理。

这项“设置工作”是否通过查询计划一直到每个小批量的物理计划?

如果是这样的话,Spark每个小批量都这样做有意义吗?

或者完全是其他的事情在发生?我正在看Spark的源代码,但如果有人已经通过这个练习的反馈,我会很感激。

Tx以获取任何见解。

EN

回答 1

Stack Overflow用户

发布于 2019-11-04 23:00:51

对于每个小批量,这项“设置工作”是通过查询计划一直到物理计划吗?

对于要在运行时填充的流查询的查询计划的执行特定部分,部分是肯定的,如下所示(具有到相应代码部分的链接):

  1. Proper relations for data sources (例如,用于无数据sources)
  2. Event-time watermark
  3. Current (micro-batch) time

LocalRelation

如果是这样的话,

在每个小批量中都这样做有意义吗?

绝对一点儿没错。在结构化流式传输中没有其他方法来短路无数据源,跟踪当前时间和水印。

这也是当水印发生变化时使用extra no-data micro-batch for stateful operators的原因。

正在看Spark的源代码,但如果有人已经做过这个练习的话,我会很感激的。

参见MicroBatchExecutionIncrementalExecution

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58640012

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档