
A Brief Look at SparkContext's Components and Creation Flow

tyrantlucifer · 2022-03-23 · Column: Tyrant Lucifer

Preface

In the Spark framework, submitting an application depends on the Spark Driver, and initializing the Driver revolves around initializing the SparkContext. SparkContext is the engine of a Spark program: nothing runs without it. Within spark-core, SparkContext is central and provides much of the user-facing functionality, such as creating RDDs and broadcast variables. Studying its components and startup flow is therefore a good entry point for dissecting the architecture of the Spark core.
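To make this concrete, here is a minimal sketch of driving these capabilities from user code; the master URL and application name are placeholder values, not from the original article:

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder local-mode configuration.
    val conf = new SparkConf().setMaster("local[2]").setAppName("context-demo")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 10)   // create an RDD
    val factor = sc.broadcast(3)        // create a broadcast variable
    println(rdd.map(_ * factor.value).sum())

    sc.stop()
  }
}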

Overview of SparkContext's Components

SparkContext contains several components that are central to the whole framework:

  • SparkEnv: the Spark runtime environment. Executors rely on it to run their assigned tasks; it exists not only in Executors but also in the Driver, so that local-mode tasks can run as well.
  • SparkUI: the monitoring page for Spark jobs. It is implemented purely on the backend, without a front-end framework, and is used to monitor and tune the current Spark job: you can inspect each Executor's JVM information, every job's stage and task breakdown, and the amount of data each task processes, which helps reveal data skew.
  • DAGScheduler: the DAG scheduler, one of the key components of the Spark job scheduling system. It creates jobs, splits them into stages according to RDD dependencies (organizing the work into a directed acyclic graph), and submits those stages.
  • TaskScheduler: the task scheduler, another key component of the job scheduling system. Following a scheduling algorithm, it dispatches the tasks created by the DAGScheduler to Executors; the DAGScheduler is its upstream scheduler.
  • SparkStatusTracker: provides monitoring of jobs and stages.
  • ConsoleProgressBar: uses the monitoring information supplied by SparkStatusTracker to print task progress to the terminal as log output.
  • HeartbeatReceiver: the heartbeat receiver. All Executors periodically send heartbeats to it, which it uses to track which Executors are alive; this information is continuously relayed to the TaskScheduler so that it picks suitable Executors when dispatching tasks.
  • ContextCleaner: the context cleaner, which asynchronously cleans up RDDs, ShuffleDependencies, and Broadcast variables that have gone out of the application's scope.
  • LiveListenerBus: the event bus inside SparkContext. It receives events from the various components and asynchronously matches them to the appropriate listener callbacks (see the sketch after this list).
  • ShutdownHookManager: the manager of shutdown hooks, used for cleanup work such as releasing resources.
  • AppStatusStore: stores application status data; introduced in version 2.3.0.
  • EventLoggingListener (optional): a listener that persists events to storage, controlled by spark.eventLog.enabled.
  • ExecutorAllocationManager (optional): the dynamic Executor allocation manager, which adjusts the number of Executors according to the current workload; controlled by the properties spark.dynamicAllocation.enabled and spark.dynamicAllocation.testing.
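To make the LiveListenerBus concrete, the sketch below registers a custom listener through the public SparkListener API; the listener class and its log messages are illustrative, not from the article:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Events posted on the LiveListenerBus are delivered to this listener asynchronously.
class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}

// Register it on an existing SparkContext:
// sc.addSparkListener(new JobLoggingListener)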

SparkContext Initialization Flow

Before digging into the initialization flow, let's look at the fields of the class; they hint at the work done during initialization:

/* The SparkConf object */
private var _conf: SparkConf = _
/* Directory where event logs are stored */
private var _eventLogDir: Option[URI] = None
/* Compression codec for event logs */
private var _eventLogCodec: Option[String] = None
/* The SparkContext event bus */
private var _listenerBus: LiveListenerBus = _
/* The SparkEnv runtime environment */
private var _env: SparkEnv = _
/* SparkStatusTracker, the job status monitor */
private var _statusTracker: SparkStatusTracker = _
/* ConsoleProgressBar, prints job progress to the terminal */
private var _progressBar: Option[ConsoleProgressBar] = None
/* The Spark UI */
private var _ui: Option[SparkUI] = None
/* Hadoop configuration */
private var _hadoopConfiguration: Configuration = _
/* Executor memory size (in MB) */
private var _executorMemory: Int = _
/* The backend that submits tasks to Executors */
private var _schedulerBackend: SchedulerBackend = _
/* The task scheduler */
private var _taskScheduler: TaskScheduler = _
/* The heartbeat receiver */
private var _heartbeatReceiver: RpcEndpointRef = _
/* The DAG scheduler */
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
/* The event logging listener */
private var _eventLogger: Option[EventLoggingListener] = None
/* The executor allocation manager */
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
/* The context cleaner */
private var _cleaner: Option[ContextCleaner] = None
/* Whether the event bus has been started */
private var _listenerBusStarted: Boolean = false
/* Extra jars passed with the job submission */
private var _jars: Seq[String] = _
/* Extra files passed with the job submission */
private var _files: Seq[String] = _
/* Reference to the registered shutdown hook */
private var _shutdownHookRef: AnyRef = _
/* The app status store */
private var _statusStore: AppStatusStore = _

Essentially, initializing SparkContext amounts to initializing the components above. Let's walk through the startup flow in detail:

  1. Assign the private _conf field by cloning the config passed to the constructor, and validate that the required SparkConf settings are present and correct
class SparkContext(config: SparkConf) extends Logging {}

    _conf = config.clone()
    _conf.validateSettings()

    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

    // log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")

    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }

    if (_conf.getBoolean("spark.logConf", false)) {
      logInfo("Spark configuration:\n" + _conf.toDebugString)
    }

    // Set Spark driver host and port system properties. This explicitly sets the configuration
    // instead of relying on the default value of the config constant.
    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
    _conf.setIfMissing("spark.driver.port", "0")

    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten
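As a sketch of the validation behaviour shown above (the app name is a placeholder), constructing a SparkContext without a master fails fast:

    val badConf = new SparkConf().setAppName("demo") // spark.master deliberately missing
    // new SparkContext(badConf)
    // => SparkException: A master URL must be set in your configuration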
  2. Initialize the event log directory and compression codec according to the configuration
    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }

    _eventLogCodec = {
      val compress = _conf.getBoolean("spark.eventLog.compress", false)
      if (compress && isEventLogEnabled) {
        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
      } else {
        None
      }
    }
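For reference, a hedged example of the settings that drive this step; the directory path is a placeholder:

    conf.set("spark.eventLog.enabled", "true")           // turns event logging on
    conf.set("spark.eventLog.dir", "hdfs:///spark-logs") // placeholder path
    conf.set("spark.eventLog.compress", "true")          // use the codec resolved by CompressionCodec.getCodecName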
  3. Initialize the event bus
_listenerBus = new LiveListenerBus(_conf)
  4. Initialize the AppStatusStore
    // Initialize the app status store and listener before SparkEnv is created so that it gets
    // all events.
    _statusStore = AppStatusStore.createLiveStore(conf)
    listenerBus.addToStatusQueue(_statusStore.listener.get)
  5. Initialize SparkEnv
    // Create the Spark execution environment (cache, map output tracker, etc)
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
  6. Initialize the SparkStatusTracker
_statusTracker = new SparkStatusTracker(this, _statusStore)
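The tracker is exposed on SparkContext once initialization completes; an illustrative usage sketch:

    // Query live job and executor status through the public API.
    val tracker = sc.statusTracker
    val activeJobs = tracker.getActiveJobIds()   // ids of currently running jobs
    val executors = tracker.getExecutorInfos     // one entry per live executor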
  7. Initialize the ConsoleProgressBar (created only when console progress is enabled in the configuration and the log level is above INFO)
    _progressBar =
      if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
        Some(new ConsoleProgressBar(this))
      } else {
        None
      }
  8. Initialize the SparkUI
    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())
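A hedged sketch of the related configuration knobs; both values here are examples, with 4040 being the customary default UI port:

    conf.set("spark.ui.enabled", "true") // default is true; tests turn the UI off
    conf.set("spark.ui.port", "4040")    // example value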
  9. Initialize the hadoopConfiguration
    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
  10. Initialize executorMemory
    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)
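The chain above resolves Executor memory in order of precedence: spark.executor.memory, then the SPARK_EXECUTOR_MEMORY and SPARK_MEM environment variables, falling back to 1024 MB. An illustrative example:

    conf.set("spark.executor.memory", "2g")
    // Utils.memoryStringToMb("2g") == 2048, so _executorMemory becomes 2048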
  11. Initialize the HeartbeatReceiver
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
  12. Initialize and start the task scheduler
    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()
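createTaskScheduler chooses the SchedulerBackend/TaskScheduler pair by pattern-matching on the master URL. Below is a heavily simplified, self-contained sketch of that dispatch; the real method lives in SparkContext, covers more masters, and performs extra validation:

    // Simplified illustration of master-URL dispatch (not Spark's actual code).
    def describeMaster(master: String): String = {
      val LocalN   = """local\[([0-9]+|\*)\]""".r
      val SparkUrl = """spark://(.+)""".r
      master match {
        case "local"       => "single-threaded local mode"
        case LocalN(n)     => s"local mode with $n worker threads"
        case SparkUrl(url) => s"standalone cluster at $url"
        case other         => s"delegated to an external cluster manager: $other"
      }
    }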
  13. Initialize the applicationId
    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    }
    _ui.foreach(_.setAppId(_applicationId))
  14. Initialize the BlockManager in SparkEnv
    _env.blockManager.initialize(_applicationId)
  15. Start the metricsSystem in SparkEnv
    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    _env.metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
  16. Initialize the EventLoggingListener
    _eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addToEventLogQueue(logger)
        Some(logger)
      } else {
        None
      }
  17. Initialize the ExecutorAllocationManager
    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
              _env.blockManager.master))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())
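A hedged example of turning dynamic allocation on; the executor counts are placeholders, and YARN/standalone deployments additionally need the external shuffle service:

    conf.set("spark.dynamicAllocation.enabled", "true")
    conf.set("spark.dynamicAllocation.minExecutors", "1")  // placeholder
    conf.set("spark.dynamicAllocation.maxExecutors", "10") // placeholder
    conf.set("spark.shuffle.service.enabled", "true")      // required outside local mode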
  18. Initialize the ContextCleaner
    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())
  19. Register the shutdown hook with ShutdownHookManager
    logDebug("Adding shutdown hook") // force eager creation of logger
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      try {
        stop()
      } catch {
        case e: Throwable =>
          logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
      }
    }