WHAT
CacheManager 是 Spark SQL 中内存缓存的管理者,在 Spark SQL 中提供对缓存查询结果的支持,并在执行后续查询时自动使用这些缓存结果。
数据使用 InMemoryRelation 中存储的字节缓冲区进行缓存。
这个关系是自动替换的查询计划,逻辑计划返回与最初缓存的查询相同的结果。
CacheManager 只能在 Spark SQL 内部使用。
CacheManager 通过 SharedState 在 SparkSessions 之间共享。
val spark: SparkSession = ...
spark.sharedState.cacheManager
CacheManager 可以是空的。
通过在 Spark 的 conf/log4j.properties 添加下面的配置可以查看 CacheManager 内部发生了什么?
log4j.logger.org.apache.spark.sql.execution.CacheManager=ALL
在触发缓存并且日志打印级别符合的情况下,会出现下面的打印日志:
Asked to cache already cached data.
CacheManager 使用 CachedData 数据结构使用 LogicalPlan(结构化查询)和相应的 InMemoryRelation 逻辑运算符管理缓存结构化查询。
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()
可以看到缓存本质上是一个 IndexedSeq。
IndexedSeq表示保证不可变的索引序列。
索引序列支持恒定时间或接近恒定时间的元素访问和长度计算。
它们是根据用于索引和长度的抽象方法定义的。
索引序列不会给Seq添加任何新方法,但可以有效实现随机访问模式
IndexedSeq 的默认实现是一个 scala.Vector
如果说IndexedSeq是一个容器的话,那么CachedData就是容器里面存放的数据。
我们看看CachedData的类定义。
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
可以看到CachedData底层就是一个LogicalPlan 和InMemoryRelation。
InMemoryRelation 封装了一个缓存构建器,使用它,当我们使用缓存数据的时候,就不会触发 job,并且可以实现缓存 RDD 的懒加载。
final def sameResult(other: PlanType): Boolean = this.canonicalized == other.canonicalized
可以看到,Spark 通过比较两个查询计划的canonicalized 是否相等来决定是否启用缓存。
那么,canonicalized 到底是什么呢?
我们知道实现同一种功能,不同开发人员使用的 SQL 语法都可能存在差异,此时,为了保证能够充分利用到已有的查询计划,我们需要针对不同的查询计划做一个规范化的处理,这就是canonicalized存在的意义。
canonicalized 是在 QueryPlan.scala 中被定义的
/**
* 返回一个计划,在该计划中,已尽最大努力以一种保留
* 结果但消除表面变化(区分大小写、交换操作顺序、表
* 达式id等)的方式对此进行转换。
* 计划`this.canonicalized == other.canonicalized` 总是会得到相同的结果。
* 需要特殊规范化的计划节点应覆盖 [[doCanonicalize()]] 方法。
* 他们应该自己消除表达式表面变化。
*/
@transient final lazy val canonicalized: PlanType = {
var plan = doCanonicalize()
// 如果计划没有因规范化而更改,请复制一份,这样我们就不会更改原始计划的_isCanonicalizedPlan标志。
if (plan eq this) {
plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
}
plan._isCanonicalizedPlan = true
plan
}
/**
* 定义规范化如何适用于当前计划。
*/
protected def doCanonicalize(): PlanType = {
val canonicalizedChildren = children.map(_.canonicalized)
var id = -1
mapExpressions {
case a: Alias =>
id += 1
// 作为表达式的根,Alias将始终采用任意的exprId,我们需要递增地从 0 开始分配 exprId,将其规范化以进行相等性测试。这个别名无关紧要,应该删除。
val normalizedChild = QueryPlan.normalizeExpressions(a.child, allAttributes)
Alias(normalizedChild, "")(ExprId(id), a.qualifier)
case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 =>
// 顶层的“AttributeReference”也可以用于像“Alias”这样的输出,我们应该也要使 exprId 正常化。
id += 1
ar.withExprId(ExprId(id)).canonicalized
case other => QueryPlan.normalizeExpressions(other, allAttributes)
}.withNewChildren(canonicalizedChildren)
}
/**
* 通过使用输入属性中引用的序号更新AttributeReference中的exprId,规范化给定表达式中的exprId。
* 它类似于BindReferences,但我们在这里不使用BindReferences,因为计划可能会将表达式作为带有type属性的参数,并用BoundReference替换它将导致错误。
*/
def normalizeExpressions[T <: Expression](e: T, input: AttributeSeq): T = {
e.transformUp {
case s: PlanExpression[QueryPlan[_] @unchecked] =>
// 规范化子查询计划中的外部引用。
val normalizedPlan = s.plan.transformAllExpressionsWithPruning(
_.containsPattern(OUTER_REFERENCE)) {
case OuterReference(r) => OuterReference(QueryPlan.normalizeExpressions(r, input))
}
s.withNewPlan(normalizedPlan)
case ar: AttributeReference =>
val ordinal = input.indexOf(ar.exprId)
if (ordinal == -1) {
ar
} else {
ar.withExprId(ExprId(ordinal))
}
}.canonicalized.asInstanceOf[T]
}
通过上面的源码阅读可以得到以下的结论:
Spark 3.3.0 版本总共有 21 个特殊的 QueryPlan 重写了QueryPlan.doCanonicalize 方法。
HiveTableScanExec
RowDataSourceScanExec
SubqueryExec
ReusedExchangeExec
FileSourceScanExec
InMemoryTableScanExec
AdaptiveSparkPlanExec
ReusedSubqueryExec
SubqueryAlias
SubqueryAdaptiveBroadcastExec
SubqueryBroadcastExec
InMemoryRelation
HiveTableRelation
View
RangeExec
QueryStageExec
BroadcastExchangeExec
Join
LogicalRelation
ResolvedHint
BatchScanExec
遍历了上面 21 种特殊查询计划的源码后,可以很明显的得出下面的结论: