前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Task 的执行流程③ - 执行 task

Spark Task 的执行流程③ - 执行 task

作者头像
codingforfun
发布2018-08-24 16:03:02
4390
发布2018-08-24 16:03:02
举报
文章被收录于专栏:牛肉圆粉不加葱

本文为 Spark 2.0 源码分析笔记,其他版本可能稍有不同

创建、分发 Task一文中我们提到 TaskRunner(继承于 Runnable) 对象最终会被提交到 Executor 的线程池中去执行,本文就将对该执行过程进行剖析。

该执行过程封装在 TaskRunner#run() 中,搞懂该函数就搞懂了 task 是如何执行的,按照本博客惯例,这里必定要来一张该函数的核心实现:

需要注意的是,上图的流程都是在 Executor 的线程池中的某条线程中执行的。上图中最复杂和关键的是 task.run(...) 以及任务结果的处理,也即怎么把各个 partition 计算结果汇报到 driver 端。

task 结果处理这一块内容将另写一篇文章进行说明,下文主要对 task.run(...) 进行分析。Task 类共有两种实现:

  • ResultTask:对于 DAG 图中最后一个 Stage(也就是 ResultStage),会生成与该 DAG 图中哦最后一个 RDD (DAG 图中最后边)partition 个数相同的 ResultTask
  • ShuffleMapTask:对于非最后的 Stage(也就是 ShuffleMapStage),会生成与该 Stage 最后的 RDD partition 个数相同的 ShuffleMapTask

Task#run(...) 方法中最重要的是调用了 Task#runTask(context: TaskContext) 方法,来分别看看 ResultTask 和 ShuffleMapTask 的实现:

ResultTask#runTask(context: TaskContext)

代码语言:javascript
复制
  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    //< 反序列化得到 rdd 及 func
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    //< 对 rdd 指定 partition 的迭代器执行 func 函数
    func(context, rdd.iterator(partition, context))
  }

实现代码如上,主要做了两件事:

  1. 反序列化得到 rdd 及 func
  2. 对 rdd 指定 partition 的迭代器执行 func 函数并返回结果

func 函数是什么呢?我举几个例子就很容易明白:

  • 对于 RDD#count() 的 ResultTask 这里的 func 真正执行的是 def getIteratorSize[T](iterator: Iterator[T]): Long,即计算该 partition 对应的迭代器的数据条数
  • 对于 RDD#take(num: Int): Array[T] 的 ResultTask 这里的 func 真正执行的是 (it: Iterator[T]) => it.take(num).toArray,即取该 partition 对应的迭代器的前 num 条数据

也就是说,func 是对已经计算获得的 RDD 的某个 partition 的迭代器执行在 RDD action 中预定义好的操作,具体的操作根据不同的 action 不同而不同。而这个 partition 对应的迭代器的获取是通过调动 RDD#iterator(split: Partition, context: TaskContext): Iterator[T] 去获取的,会通过计算或从 cache 或 checkpoint 中获取。

ShuffleMapTask#runTask(context: TaskContext)

与 ResultTask 对 partition 数据进行计算得到计算结果并汇报给 driver 不同,ShuffleMapTask 的职责是为下游的 RDD 计算出输入数据。更具体的说,ShuffleMapTask 要计算出 partition 数据并通过 shuffle write 写入磁盘(由 BlockManager 来管理)来等待下游的 RDD 通过 shuffle read 读取,其核心流程如下:

共分为四步:

  1. 从 SparkEnv 中获取 ShuffleManager 对象,当前支持 Hash、Sort Based、Tungsten-sort Based 以及自定义的 Shuffle(关于 shuffle 之后会专门写文章说明)
  2. 从 ShuffleManager 中获取 ShuffleWriter 对象 writer
  3. 得到对应 partition 的迭代器后,通过 writer 将数据写入文件系统中
  4. 停止 writer 并返回结果

参考:《Spark 技术内幕》


本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016.12.04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ResultTask#runTask(context: TaskContext)
  • ShuffleMapTask#runTask(context: TaskContext)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档