前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一篇文章搞懂 Spark 3.x 的 CacheManager

一篇文章搞懂 Spark 3.x 的 CacheManager

作者头像
王知无-import_bigdata
发布2023-04-07 18:51:15
6290
发布2023-04-07 18:51:15
举报

WHAT

CacheManager 是 Spark SQL 中内存缓存的管理者,在 Spark SQL 中提供对缓存查询结果的支持,并在执行后续查询时自动使用这些缓存结果。

数据使用 InMemoryRelation 中存储的字节缓冲区进行缓存。

这个关系是自动替换的查询计划,逻辑计划返回与最初缓存的查询相同的结果。

CacheManager 只能在 Spark SQL 内部使用。

CacheManager 通过 SharedState 在 SparkSessions 之间共享。

代码语言:javascript
复制
val spark: SparkSession = ...
spark.sharedState.cacheManager

CacheManager 可以是空的。

通过在 Spark 的 conf/log4j.properties 添加下面的配置可以查看 CacheManager 内部发生了什么?

代码语言:javascript
复制
log4j.logger.org.apache.spark.sql.execution.CacheManager=ALL

在触发缓存并且日志打印级别符合的情况下,会出现下面的打印日志:

代码语言:javascript
复制
Asked to cache already cached data.

怎么触发 CacheManager 管理缓存?

  1. Spark 开发人员可以使用 Spark SQL 的 cache 或者 persist 算子 或者 SQL 的cache table 来通过 CacheManager 管理缓存。
  2. Spark Core 的cache 或者 persist 算子和 CacheManager 没有关系。

缓存怎么卸载?

  1. 使用 Dataset.unpersist 算子。
  2. 执行 DropTableCommand 和 TruncateTableCommand 逻辑命令。
  3. CatalogImpl 请求 uncache 和 refresh 表或视图,dropTempView/dropGlobalTempView

缓存到底长啥样?

CacheManager 使用 CachedData 数据结构使用 LogicalPlan(结构化查询)和相应的 InMemoryRelation 逻辑运算符管理缓存结构化查询。

代码语言:javascript
复制
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()

可以看到缓存本质上是一个 IndexedSeq。

IndexedSeq

IndexedSeq表示保证不可变的索引序列。

索引序列支持恒定时间或接近恒定时间的元素访问和长度计算。

它们是根据用于索引和长度的抽象方法定义的。

索引序列不会给Seq添加任何新方法,但可以有效实现随机访问模式

IndexedSeq 的默认实现是一个 scala.Vector

CachedData

如果说IndexedSeq是一个容器的话,那么CachedData就是容器里面存放的数据。

我们看看CachedData的类定义。

代码语言:javascript
复制
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)

可以看到CachedData底层就是一个LogicalPlan 和InMemoryRelation。

InMemoryRelation 封装了一个缓存构建器,使用它,当我们使用缓存数据的时候,就不会触发 job,并且可以实现缓存 RDD 的懒加载。

InMemoryRelation 还缓存了哪些配置?

  • spark.sql.inMemoryColumnarStorage.compressed (默认 enabled)
  • spark.sql.inMemoryColumnarStorage.batchSize (默认 10000)
  • 输入数据的存储级别 (默认 MEMORY_AND_DISK)。
  • 优化过的物理查询计划 (在请求 SessionState 执行 analyzed logical plan 之后)。
  • 输入的表名。
  • analyzed 查询计划的统计信息。

怎么判断查询是否已缓存?

代码语言:javascript
复制
final def sameResult(other: PlanType): Boolean = this.canonicalized == other.canonicalized

可以看到,Spark 通过比较两个查询计划的canonicalized 是否相等来决定是否启用缓存。

那么,canonicalized 到底是什么呢?

canonicalized

我们知道实现同一种功能,不同开发人员使用的 SQL 语法都可能存在差异,此时,为了保证能够充分利用到已有的查询计划,我们需要针对不同的查询计划做一个规范化的处理,这就是canonicalized存在的意义。

canonicalized 是在 QueryPlan.scala 中被定义的

代码语言:javascript
复制
/**
 * 返回一个计划,在该计划中,已尽最大努力以一种保留
 * 结果但消除表面变化(区分大小写、交换操作顺序、表
 * 达式id等)的方式对此进行转换。
 * 计划`this.canonicalized == other.canonicalized` 总是会得到相同的结果。
 * 需要特殊规范化的计划节点应覆盖 [[doCanonicalize()]] 方法。
 * 他们应该自己消除表达式表面变化。
 */
@transient final lazy val canonicalized: PlanType = {
    var plan = doCanonicalize()
    // 如果计划没有因规范化而更改,请复制一份,这样我们就不会更改原始计划的_isCanonicalizedPlan标志。  
    if (plan eq this) {
      plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
    }
    plan._isCanonicalizedPlan = true
    plan
  }
代码语言:javascript
复制
/**
 * 定义规范化如何适用于当前计划。
 */
protected def doCanonicalize(): PlanType = {
    val canonicalizedChildren = children.map(_.canonicalized)
    var id = -1
    mapExpressions {
      case a: Alias =>
        id += 1
        // 作为表达式的根,Alias将始终采用任意的exprId,我们需要递增地从 0 开始分配 exprId,将其规范化以进行相等性测试。这个别名无关紧要,应该删除。
        val normalizedChild = QueryPlan.normalizeExpressions(a.child, allAttributes)
        Alias(normalizedChild, "")(ExprId(id), a.qualifier)

      case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 =>
        // 顶层的“AttributeReference”也可以用于像“Alias”这样的输出,我们应该也要使 exprId 正常化。
        id += 1
        ar.withExprId(ExprId(id)).canonicalized

      case other => QueryPlan.normalizeExpressions(other, allAttributes)
    }.withNewChildren(canonicalizedChildren)
  }
代码语言:javascript
复制
/**
 * 通过使用输入属性中引用的序号更新AttributeReference中的exprId,规范化给定表达式中的exprId。
 * 它类似于BindReferences,但我们在这里不使用BindReferences,因为计划可能会将表达式作为带有type属性的参数,并用BoundReference替换它将导致错误。
 */
def normalizeExpressions[T <: Expression](e: T, input: AttributeSeq): T = {
    e.transformUp {
      case s: PlanExpression[QueryPlan[_] @unchecked] =>
        // 规范化子查询计划中的外部引用。
        val normalizedPlan = s.plan.transformAllExpressionsWithPruning(
          _.containsPattern(OUTER_REFERENCE)) {
          case OuterReference(r) => OuterReference(QueryPlan.normalizeExpressions(r, input))
        }
        s.withNewPlan(normalizedPlan)

      case ar: AttributeReference =>
        val ordinal = input.indexOf(ar.exprId)
        if (ordinal == -1) {
          ar
        } else {
          ar.withExprId(ExprId(ordinal))
        }
    }.canonicalized.asInstanceOf[T]
  }

通过上面的源码阅读可以得到以下的结论:

  1. 规范化重点在于消除表面变化(区分大小写、交换操作顺序、ExprId 等)
  2. 默认情况下规范化主要处理的是 ExprId。
  3. 特殊情况下规范化需要重写 QueryPlan.doCanonicalize 方法。

Spark 3.3.0 版本总共有 21 个特殊的 QueryPlan 重写了QueryPlan.doCanonicalize 方法。

代码语言:javascript
复制
HiveTableScanExec
RowDataSourceScanExec
SubqueryExec
ReusedExchangeExec
FileSourceScanExec
InMemoryTableScanExec
AdaptiveSparkPlanExec
ReusedSubqueryExec
SubqueryAlias
SubqueryAdaptiveBroadcastExec
SubqueryBroadcastExec
InMemoryRelation
HiveTableRelation
View
RangeExec
QueryStageExec
BroadcastExchangeExec
Join
LogicalRelation
ResolvedHint
BatchScanExec

遍历了上面 21 种特殊查询计划的源码后,可以很明显的得出下面的结论:

  1. 规范化更多的是对当前查询计划的副本进行操作
  2. 规范化在不同的场景下只会关注某些特定属性,即这些属性一致我们也会认为这些查询计划是同一个,在 CacheManager 中将会得到重用。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-12-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 怎么触发 CacheManager 管理缓存?
  • 缓存怎么卸载?
  • 缓存到底长啥样?
  • IndexedSeq
  • CachedData
  • InMemoryRelation 还缓存了哪些配置?
  • 怎么判断查询是否已缓存?
  • canonicalized
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档