前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《深入理解Spark-核心思想与源码分析》读书笔记(1)

《深入理解Spark-核心思想与源码分析》读书笔记(1)

作者头像
用户1148523
发布2018-01-09 10:30:33
9290
发布2018-01-09 10:30:33
举报
文章被收录于专栏:FishFishFish

前两章

第一章主要是讲如何安装和配置spark,以及如何导入spark源码调试运行;第二章主要讲的是上次那本书《Spark快速大数据分析》的内容,科普一下spark的知识。

第三章 SparkContext的初始化

1. 概述

这章的主要内容就是讲解SparkContext的初始化。SparkContext就是所有Spark应用基础环境而配置Spark任务则是由SparkConf来完成。SparkContext的初始化一共有以下几步 1)创建 Spark 执行环境 SparkEnv; 2)创建 RDD 清理器 metadataCleaner; 3)创建并初始化 Spark UI; 4)Hadoop 相关配置及 Executor 环境变量的设置; 5)创建任务调度 TaskScheduler; 6)创建和启动 DAGScheduler; 7)TaskScheduler 的启动; 8)初始化块管理器 BlockManager(BlockManager 是存储体系的主要组件之一,将在第 4章介绍); 9)启动测量系统 MetricsSystem; 10)创建和启动 Executor 分配管理器 ExecutorAllocationManager; 11)ContextCleaner 的创建与启动; 12)Spark 环境更新; 13)创建 DAGSchedulerSource 和 BlockManagerSource; 14)将 SparkContext 标记为激活。 SparkContext构造器的参数就是SparkConf

    class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient

2. 创建执行环境SparkEnv

SparkEnv包含众多和Executor(执行器)相关的对象。Executor就是Worker(工作节点)的一个进程。包括以下内容 1)创建安全管理器 SecurityManager; 2)创建基于 Akka 的分布式消息系统 ActorSystem; 3)创建 Map 任务输出跟踪器 mapOutputTracker; 4)实例化 ShuffleManager; 5)创建 ShuffleMemoryManager; 6)创建块传输服务 BlockTransferService; 7)创建 BlockManagerMaster; 8)创建块管理器 BlockManager; 9)创建广播管理器 BroadcastManager; 10)创建缓存管理器 CacheManager; 11)创建 HTTP 文件服务器 HttpFileServer; 12)创建测量系统 MetricsSystem; 13)创建 SparkEnv。

2.1 安全管理器SecurityManager 用来管理系统的口令

            //Set our own authenticator鉴别器 to properly专有的 negotiate协商
            //userpassword for HTTP connections. This is needed by the HTTP client
            //fetching from the HttpServer. Put here so its only set once.
              if (authOn) {
                Authenticator.setDefault(
                  new Authenticator() {
                override def getPasswordAuthentication(): PasswordAuthentication = {
                  var passAuth: PasswordAuthentication = null
                  val userInfo = getRequestingURL().getUserInfo()
                  if (userInfo != null) {
                    val  parts = userInfo.split(":", 2)
                    passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
                }
                  return passAuth
                }
                  }
                )
              }

2.2 基于Akka的分布式消息系统ActorSystem ActorSystem是Akka提供的用于创建分布式消息通信系统的基础类。SparkEnv使用了AkkaUtils.createActorSystem方法完成,而createActorSystem实际上使用了doCreaterActorSystem来创造ActorSystem。

2.3 map任务输出跟踪器mapOutputTracker 跟踪map阶段任务的输出状态,便于reduce阶段任务获取地址和中间输出结果。所以这个mapOutputTracker就是用来管下面这些map和shuffle的,比如知道map输出block之类的,让reduce能找得到map的结果。

mapOutputTracker
mapOutputTracker

下面的代码是创建MapOutputTrackerMasterActor的。map任务的状态是由Executor像持有的MapOutputTrackerMasterActor发送消息,讲map任务状态同步到mapOutputTracker的mapStatuses和cachedSerializedStatuses。

2.4 实例化ShuffleManager 这个就是用来管理上面那个图里的shuffle的

2.5 块传输服务BlockTransferService 获取远程节点上的block的,第四章讲

2.6 BlockManagerMaster介绍 这个负责对block进行管理,具体操作借助BlockManagerMasterActor,在初始化之后,创建BlockManager

2.7 创建广播管理器BroadcastManager BroadcastManager是用于配置信息和序列化后的RDD、Job以及ShuffleDEpendency等信息在本地储存。

2.8 创建缓存管理器CacheManager 用于缓存RDDM某个分区计算后的中间结果,第四章解释。

2.9 HTTP文件服务器HttpFileServer 提供对文件的HTTP访问。开始时要初始化,创建文件服务器的根目录和临时目录。创建jar包及其他文件的文件目录。用start()方法启动,而这个方法用了doStart方法,doStart方法就是各种配置server对象,然后启动它。

def initialize() {
    baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
    fileDir = new File(baseDir, "files")
    jarDir = new File(baseDir, "jars")
    fileDir.mkdir()
    jarDir.mkdir()
    logInfo("HTTP File server directory is " + baseDir)
    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
    httpServer.start()
    serverUri = httpServer.uri
    logDebug("HTTP file server started at: " + serverUri)
}

2.10 创建测量系统MetricsSystem MetricsSystem是Spark的测量系统,其作用是定期将数据指标从数据源(source)拉到数据汇(sink)。

2.11 创建SparkEnv 当所有基础组件准备好后,使用new Spark(……)来创建执行环境SparkEnv。

3. 创建metadataCleaner

metadataCleaner的功能是清楚过期的持久化RDD。

            /**
             * Runs a timer task to periodically定期地 clean up metadata (e.g. old files or hashtable entries)
             */
            private[spark] class MetadataCleaner(
                cleanerType: MetadataCleanerType.MetadataCleanerType,
                cleanupFunc: (Long) => Unit,
                conf: SparkConf)
              extends Logging
            {
              val name = cleanerType.toString

              private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
              private val periodSeconds = math.max(10, delaySeconds / 10)
              private val timer = new Timer(name + " cleanup timer", true)


              private val task = new TimerTask {
                override def run() {
                  try {
                  //就这,定期清理,用cleanupFunc
                    cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
                    logInfo("Ran metadata cleaner for " + name)
                  } catch {
                    case e: Exception => logError("Error running cleanup task for " + name, e)
                  }
                }
              }

              if (delaySeconds > 0) {
                logDebug(
                  "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
                  "and period of " + periodSeconds + " secs")
                timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
              }

              def cancel() {
                timer.cancel()
              }
            }
而clean函数如下

            private[spark] def cleanup(cleanupTime:Long){                       persistentRdds.clearOldValues(cleanupTime)
            }

4. SparkUI详解

SparkUI
SparkUI

DAGScheduler是主要的产生各种Event的源头,它将各种SparkListenerEvent发送到listenerBus的时间队列中,然后BUS把事件和具体的sparklistener匹配,最终由sparkUI展示。

4.1 listenerBus详解 由三个部分组成。

  • 事件阻塞队列
  • 监听器数组
  • 事件匹配监听器线程

事件阻塞队列相当于排队上车的人,而线程就是公交车,不停地拉去排事件阻塞队列里的事件与监听器数组匹配,然后对事件进行操作。

private val EVENT_QUEUE_CAPACITY = 10000
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
  private var queueFullErrorMessageLogged = false
  private var started = false
  // A counter that represents the number of events produced and consumed in the queue
  private val eventLock = new Semaphore(0)

  private val listenerThread = new Thread("SparkListenerBus") {
    setDaemon(true)
    override def run(): Unit = Utils.logUncaughtExceptions {
      while (true) {
        eventLock.acquire()
        // Atomically remove and process this event
        LiveListenerBus.this.synchronized {
          val event = eventQueue.poll
          if (event == SparkListenerShutdown) {
            // Get out of the while loop and shutdown the daemon thread
            return
          }
          Option(event).foreach(postToAll)
        }
      }
    }
  }

  def start() {
    if (started) {
      throw new IllegalStateException("Listener bus already started!")
    }
    listenerThread.start()
    started = true
  }
def post(event: SparkListenerEvent) {
    val eventAdded = eventQueue.offer(event)
    if (eventAdded) {
      eventLock.release()
    } else {
      logQueueFullErrorMessage()
    }
  }

  def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }

  def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }

  def stop() {
    if (!started) {
      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
    }
    post(SparkListenerShutdown)
    listenerThread.join()
  }

4.2 构造JobProgressListener JobProgressListener用来统计Job的信息和状态。并在它们发生变化时进行反应。

4.3 SparkUI的创建与初始化 用create()方法加入一些listener,然后initialize()连接tab。再之后就是利用render()方法对页面进布局,实现显示。

5.Hadoop相关配置及Executor环境变量

获取Hadoop相关配置信息和对Executor的环境变量进行配置

6.创建任务调度器TaskScheduler

z这个就是用来负责任务的提交,并且请求集群管理器对任务调度。可以看做是任务调度的客户端。首先createTaskScheduler要创建TaskSchedulerImpl。 6.1 创建TaskSchedulerImpl

  • 从SparkConf中读取配置信息,包括每个任务分配的CPU数,调度模式(分为FAIR和FIFO两种,默认为FIFO)
  • 创建TaskResultGetter,它的作用是通过线程池对Worker上的Executor发送的Task的执行结果进行处理。 调度方式最终落实到接口SchedulerBackend上实现

6.2 TaskSchedulerImpl的初始化

1)使 TaskSchedulerImpl 持有 LocalBackend 的引用。 2)创建 Pool,Pool 中缓存了调度队列、调度算法及 TaskSetManager 集合等信息。 3)创建 FIFOSchedulableBuilder,FIFOSchedulableBuilder 用来操作 Pool 中的调度队列。

7.创建和启动DAGScheduler

DAGScheduler 主要用于在任务正式交给 TaskSchedulerImpl 提交之前做一些准备工作,包 括: 创 建 Job, 将 DAG 中 的 RDD 划 分 到 不 同 的 Stage, 提 交 Stage, 等 等。 此节留以后详细说明

8.TashScheduler的启动

此节留以后详细说明

9.启动测量系统MetricsSystem

这个测量系统有三个概念。

  • Instance:指定了谁在使用测量系统
  • Source:指定了从哪里收集测量数据
  • Sink:指定了往哪里输出测量数据

启动过程包括 1)注册Source 2)注册Sinks 3)给Sinks增加Jetty的ServletContextHandler

  private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

    // Register all the sources related to instance
    sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
        val source = Utils.classForName(classPath).newInstance()
        registerSource(source.asInstanceOf[Source])
      } catch {
        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
      }
    }
  }

给Sinks增加Jetty的ServletContextHandler主要是为了和SparkUI同步,用到了getServletHandler方法,最终生成处理 /metrics/json请求的ServletContextHandler。

10.启动和创建ExecutorAllocationManager

ExecutorAllocationManager用与对已经分配的Executor进行管理。

  def start(): Unit = {
    listenerBus.addListener(listener)

    val scheduleTask = new Runnable() {
      override def run(): Unit = {
        try {
          schedule()
        } catch {
          case ct: ControlThrowable =>
            throw ct
          case t: Throwable =>
            logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
        }
      }
    }
    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  }

其中start方法就将listener加入bus中,通过监听事件,动态添加、删除Executor。

11.ContextCleaner的创建和启动

用于清理那些超出应用范围的RDD、ShuffleDepency和Broadcast对象。

/** Keep cleaning RDD, shuffle, and broadcast state. */
  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])

// Synchronize here to avoid being interrupted on stop()
        synchronized {
          reference.map(_.task).foreach { task =>
            logDebug("Got cleaning task " + task)
            referenceBuffer -= reference.get
            task match {
              case CleanRDD(rddId) =>
                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
              case CleanShuffle(shuffleId) =>
                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
              case CleanBroadcast(broadcastId) =>
                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
              case CleanAccum(accId) =>
                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
              case CleanCheckpoint(rddId) =>
                doCleanCheckpoint(rddId)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }

12.Spark环境更新

在SparkContext的初始化过程中,可能对其环境造成影响,所以需要更新环境,代码如下。 postEnviromentUpdate() postApplicationStart()

13.创建DAGSchedulerSource和BlockManagerSource

private def initDriveMetrics(){
 SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
 SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
initDriveMetrics()

14.将SparkContext标记为激活

SparkContext.setActiveContext(this,allowMultipleContexts)

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年02月26日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前两章
  • 第三章 SparkContext的初始化
    • 1. 概述
      • 2. 创建执行环境SparkEnv
        • 3. 创建metadataCleaner
          • 4. SparkUI详解
            • 5.Hadoop相关配置及Executor环境变量
              • 6.创建任务调度器TaskScheduler
                • 7.创建和启动DAGScheduler
                  • 8.TashScheduler的启动
                    • 9.启动测量系统MetricsSystem
                      • 10.启动和创建ExecutorAllocationManager
                        • 11.ContextCleaner的创建和启动
                          • 12.Spark环境更新
                            • 13.创建DAGSchedulerSource和BlockManagerSource
                              • 14.将SparkContext标记为激活
                              相关产品与服务
                              大数据
                              全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档