前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >3.4 RDD的计算

3.4 RDD的计算

作者头像
Albert陈凯
发布2018-04-08 10:22:09
6940
发布2018-04-08 10:22:09
举报
文章被收录于专栏:Albert陈凯

3.4 RDD的计算

3.4.1 Ta s k简介

原始的RDD经过一系列转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算。计算节点执行计算逻辑的部分称为Executor。Executor在准备好Task的运行时环境后,会通过调用org.apache.spark.scheduler.Task#run来执行计算。Spark的Task分为两种:

1)org.apache.spark.scheduler.ShuffleMapTask

2)org.apache.spark.scheduler.ResultTask

简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShuffleMapTask。生成的Task会被发送到已经启动的Executor上,由Executor来完成计算任务的执行,执行过程的实现在org.apache. spark.executor.Executor.TaskRunner#run。第6章会介绍这一部分的实现原理和设计思想。

3.4.2 Task的执行起点

org.apache.spark.scheduler.Task#run会 调 用ShuffleMapTask或 者ResultTask的runTask;runTask会调用RDD的org.apache.spark.rdd.RDD#iterator。计算由此开始。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {

if(storageLevel != StorageLevel.NONE) {

//如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算

SparkEnv.get.cacheManager.getOrCompute(this, split, context,storageLevel)

} else {

//如果有checkpoint,那么直接读取结果;否则直接进行计算

computeOrReadCheckpoint(split, context)

}

}

其中,SparkEnv中包含了一个运行时节点所需要的所有的环境信息。cache-Manager是org.apache.spark.CacheManager,它负责调用BlockManager来管理RDD的缓存,如果当前RDD原来计算过并且把结果缓存起来,那么接下来的运算都可以通过BlockManager来直接读取缓存后返回。SparkEnv除了cacheManager,还包括以下重要的成员变量:

1)akka.actor.ActorSystem:运行在该节点的Actor System,其中运行在Driver上的名字是sparkDriver;运行在Executor上的是sparkExecutor。

2)org.apache.spark.serializer.Serializer:序列化和发序列化的工具。

3)org.apache.spark.MapOutputTracker;保存Shuffle Map Task输出的位置信息。其中在Driver上的Tracer是org.apache.spark.MapOutputTrackerMaster;而在Executor上的Tracker是org.apache.spark.MapOutputTrackerWorker,它会从org.apache.spark. MapOutputTrackerMaster获取信息。

4)org.apache.spark.shuffle.ShuffleManager:Shuffle的管理者,其中Driver端会注册Shuffle的信息,而Executor端会上报和获取Shuffle的信息。现阶段内置支持Hash Based Shuffle和Sort Based Shuffle,具体实现细节请参阅第7章。

5)org.apache.spark.broadcast.BroadcastManager:广播变量的管理者。

6)org.apache.spark.network.BlockTransferService:Executor读取Shuffle数据的Client。当前支持netty和nio,可以通过spark.shuffle.blockTransferService来设置。具体详情可以参阅第7章。

7)org.apache.spark.storage.BlockManager:提供了Storage模块与其他模块的交互接口,管理Storage模块。

8)org.apache.spark.SecurityManager:Spark对于认证授权的实现。

9)org.apache.spark.HttpFileServer:可以提供HTTP服务的Server。当前主要用于Executor端下载依赖。

10)org.apache.spark.metrics.MetricsSystem:用于搜集统计信息。

11)org.apache.spark.shuffle.ShuffleMemoryManager:管理Shuffle过程中使用的内存。ExternalAppendOnlyMap 和ExternalSorter都会从ShuffleMemoryManager中申请内存,在数据spill到Disk后会释放内存。当然了,当Task退出时这个内存也会被回收。为了使得每个thread都会比较公平地获取内存资源,避免一个thread申请了大量内存后造成其他的thread需要频繁地进行spill操作,它采取的内存分配策略是:对于N个thread,每个thread可以至少申请1/(2*N)的内存,但是至多申请1/N。这个N是动态变化的,感兴趣的读者可以查阅这个类的具体实现。

在用户创建org.apache.spark.SparkContext时会创建org.apache.spark.SparkEnv。

3.4.3 缓存的处理

如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算。什么是存储级别?从用户的角度来看就是缓存保存到不同的存储位置,比如内存、硬盘、Tachyon;还有缓存的数据是否需要序列化等。详细的存储级别的介绍可以参阅第8章。

cacheManager对Storage模块进行了封装,使得RDD可以更加简单地从Storage模块读取或者写入数据。RDD的每个Partition对应Storage模块的一个Block,只不过Block是Partition经过处理后的数据。在系统实现的层面上,可以认为Partition和Block是一一对应的。cacheManager会通过getOrCompute来判断当前的RDD是否需要进行计算。

首先,cacheManager会通过RDD的ID和当前计算的Partition的ID向Storage模块的BlockManager发起查询请求,如果能够获得Block的信息,会直接返回Block的信息。否则,代表该RDD是需要计算的。这个RDD以前可能计算过并且被存储到了内存中,但是后来由于内存紧张,这部分内存被清理了。在计算结束后,计算结果会根据用户定义的存储级别,写入BlockManager中。这样,下次就可以不经过计算而直接读取该RDD的计算结果了。核心实现逻辑如下:

def getOrCompute[T](

rdd: RDD[T],

partition: Partition,

context: TaskContext,

storageLevel: StorageLevel): Iterator[T] = {

//获取RDD的BlockId

val key = RDDBlockId(rdd.id, partition.index)

logDebug(s"Looking for partition $key")

blockManager.get(key) match { //向BlockManager查询是否有缓存

case Some(blockResult) => //缓存命中

//更新统计信息,将缓存作为结果返回

context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None => //没有缓存命中,需要计算

// 判断当前是否有线程在处理当前的Partition,如果有那么等待它结束后,直接从Block

// Manager中读取处理结果如果没有线程在计算,那么storedValues就是None,否则

// 就是计算的结果

val storedValues = acquireLockForPartition[T](key)

if (storedValues.isDefined) { // 已经被其他线程处理了,直接返回结果

return new InterruptibleIterator[T](context, storedValues.get)

}

// 需要计算

try {

// 如果被checkpoint过,那么读取checkpoint的数据;否则调用rdd的compute()开始

  // 计算

        val computedValues = rdd.computeOrReadCheckpoint(partition,context)

// Task是在Driver端执行的话就不需要缓存结果,这个主要是为了first() 或者take()

// 这种仅仅有一个执行阶段的任务的快速执行。这类任务由于没有Shuffle阶段,直接运行

// 在Driver端可能会更省时间

if (context.isRunningLocally) {

return computedValues

}

// 将计算结果写入到BlockManager

val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

val cachedValues =

putInBlockManager(key, computedValues, storageLevel, updatedBlocks)

// 更新任务的统计信息

val metrics = context.taskMetrics

val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(

Seq[(BlockId, BlockStatus)]())

metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)

new InterruptibleIterator(context, cachedValues)

} finally {

loading.synchronized {

loading.remove(key)

// 如果有其他的线程在等待该Partition的处理结果,那么通知它们计算已经完成,结果已

// 经存到BlockManager中(注意前面那类不会写入BlockManager的本地任务)

// loading.notifyAll()

}

}

}

}

3.4.4 checkpoint的处理

在缓存没有命中的情况下,首先会判断是否保存了RDD的checkpoint,如果有,则读取checkpoint。为了理解checkpoint的RDD是如何读取计算结果的,需要先看一下checkpoint的数据是如何写入的。

首先在Job结束后,会判断是否需要checkpoint。如果需要,就调用org.apache. spark.rdd.RDDCheckpointData#doCheckpoint。doCheckpoint首先为数据创建一个目录;然后启动一个新的Job来计算,并且将计算结果写入新创建的目录;接着创建一个org.apache.spark.rdd.CheckpointRDD;最后,原始RDD的所有依赖被清除,这就意味着RDD的转换的计算链(compute chain)等信息都被清除。这个处理逻辑中,数据写入的实现在org.apache.spark.rdd.CheckpointRDD$#writeToFile。简要的核心逻辑如下:

// 创建一个保存checkpoint数据的目录

val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)

val fs = path.getFileSystem(rdd.context.hadoopConfiguration)

if (!fs.mkdirs(path)) {

throw new SparkException("Failed to create checkpoint path " + path)

}

// 创建广播变量

val broadcastedConf = rdd.context.broadcast(

new SerializableWritable(rdd.context.hadoopConfiguration))

//开始一个新的Job进行计算,计算结果存入路径path中

rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)

//根据结果的路径path来创建CheckpointRDD

val newRDD = new CheckpointRDD[T](rdd.context, path.toString)

//保存结果,清除原始RDD的依赖、Partition信息等

RDDCheckpointData.synchronized {

cpFile = Some(path.toString)

cpRDD = Some(newRDD) // RDDCheckpointData对应的CheckpointRDD

rdd.markCheckpointed(newRDD)      // 清除原始RDD的依赖,Partition

cpState = Checkpointed            //标记checkpoint的状态为完成

}

至此,RDD的checkpoint完成,其中checkpoint的数据可以通过checkpointRDD的readFromFile读取。但是,上述逻辑在清除了RDD的依赖后,并没有和check-pointRDD建立联系,那么Spark是如何确定一个RDD是否被checkpoint了,而且正确读取checkpoint的数据呢?

答案就在org.apache.spark.rdd.RDD#dependencies的实现,它会首先判断当前的RDD是否已经Checkpoint过,如果有,那么RDD的依赖就变成了对应的CheckpointRDD:

privatedefcheckpointRDD: Option[RDD[T]]=checkpointData.flatMap(_.checkpointRDD)

final def dependencies: Seq[Dependency[_]] = {

checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {

if (dependencies_ == null) { //没有checkpoint

dependencies_ = getDependencies

}

dependencies_

}

}

理解了Checkpoint的实现过程,接下来看一下computeOrReadCheckpoint的实现。前面提到了,它一共在两个地方被调用,org.apache.spark.rdd.RDD#iterator和org.apache. spark.CacheManager#getOrCompute。它实现的逻辑比较简单,首先检查当前RDD是否被Checkpoint过,如果有,读取Checkpoint的数据;否则开始计算。实现如下:

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext)

: Iterator[T] =

{

if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)

}

firstParent[T].iterator(split,context)会调用对应CheckpointRDD的iterator,最终调用到它的compute:

override def compute(split: Partition, context: TaskContext): Iterator[T] = {

val file=new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))

CheckpointRDD.readFromFile(file, broadcastedConf, context) //读取Checkpoint的数据

}

3.4.5 RDD的计算逻辑

RDD的计算逻辑在org.apache.spark.rdd.RDD#compute中实现。每个特定的RDD都会实现compute。比如前面提到的CheckpointRDD的compute就是直接读取checkpoint数据。HadoopRDD就是读取指定Partition的数据。MapPartitionsRDD就是将用户的转换逻辑作用到指定的Partition上。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.07.12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档