前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划13 | 度量系统MetricsSystem的建立

Spark Core源码精读计划13 | 度量系统MetricsSystem的建立

作者头像
大数据真好玩
发布2019-08-14 16:22:02
7510
发布2019-08-14 16:22:02
举报
文章被收录于专栏:暴走大数据暴走大数据

推荐阅读

《Spark源码精度计划1 | SparkConf》

《Spark Core源码精读计划2 | SparkContext组件初始化》

《Spark Core源码精读计划3 | SparkContext辅助属性及后初始化》

《Spark Core源码精读计划4 | SparkContext提供的其他功能》

《Spark Core源码精读计划5 | 事件总线及ListenerBus》

《Spark Core源码精读计划6 | AsyncEventQueue与LiveListenerBus》

《Spark Core源码精读计划7 | Spark执行环境的初始化》

《Spark Core源码精读计划8 | SparkEnv中RPC环境的基础构建》

《Spark Core源码精读计划9 | Spark RPC环境中的消息调度逻辑》

《Spark Core源码精读计划10 | NettyRpcEnv客户端消息发送逻辑》

《Spark Core源码精读计划11 | Spark广播机制的实现》

《Spark Core源码精读计划12 | Spark序列化及压缩机制浅析》

目录

  • 前言
  • 度量系统MetricsSystem类
    • 实例化
    • 类中的属性成员
    • 注册度量来源
    • 注册度量目的地
  • 度量配置MetricsConfig类
    • 初始化
    • 设置默认配置及从文件加载配置
    • 分拆各实例的配置
  • 度量来源Source与目的地Sink
    • Source实现类与示例
    • Sink实现类与示例
  • 总结

前言

前文讲完序列化管理器SerializerManager和广播管理器BroadcastManager之后,SparkEnv紧接着就会初始化Spark存储及任务调度部分的几个重要组件,如MapOutputTracker、ShuffleManager、MemoryManager、BlockManager等。由于它们的实现都相当复杂,且需要一定的背景知识,因此个人认为之后讲到相应内容时,再对它们进行分析比较自然。现在就暂时略过它们,来看看度量系统MetricsSystem。

我们已经知道,Spark Web UI是直观地展示运行状况、资源状态等监控数据的前端,而MetricsSystem就负责收集、存储和输出度量指标。对一个优秀的框架而言,监控与功能实现同等重要,因此了解MetricsSystem的相关细节很有意义。

度量系统MetricsSystem类

实例化

在代码#7.11中调用了MetricsSystem.createMetricsSystem()方法来实例化MetricsSystem类,逻辑很简单。

代码#13.1 - o.a.s.metrics.MetricsSystem.createMetricsSystem()方法

代码语言:javascript
复制
  def createMetricsSystem(
      instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
    new MetricsSystem(instance, conf, securityMgr)
  }

由此可见,MetricsSystem类有三个主构造方法参数,分别是:

  • instance,表示该度量系统对应的实例名称,可取的值如"driver"、"executor"、"master"、"worker"、"applications"、"*"(表示默认实例)等。
  • conf,即SparkConf配置项。
  • securityMgr,即安全性管理器SecurityManager的实例。
类中的属性成员

代码#13.2 - o.a.s.metrics.MetricsSystem类的属性成员

代码语言:javascript
复制
  private[this] val metricsConfig = new MetricsConfig(conf)

  private val sinks = new mutable.ArrayBuffer[Sink]
  private val sources = new mutable.ArrayBuffer[Source]
  private val registry = new MetricRegistry()

  private var running: Boolean = false

  private var metricsServlet: Option[MetricsServlet] = None

这些属性涉及到了度量系统中的几个基础角色,来看一看。

  • metricsConfig:度量系统的配置,它是MetricsConfig类的实例,MetricsConfig类提供了设置和加载度量配置的基础功能。
  • sources:度量来源的缓存数组。所谓度量来源,就是产生及收集监控指标的组件,都继承自Source特征。
  • sinks:度量目的地的缓存数组。所谓度量目的地,就是输出及表现监控指标的组件,都继承自Sink特征。
  • registry:度量注册中心,是com.codahale.metrics.MetricRegistry类的实例,Source和Sink都是通过它注册到度量仓库的。这里“度量仓库”并不是Spark内部的东西,而是Codahale提供的度量组件Metrics,Spark以它为基础来构建度量系统。
  • running:表示当前MetricsSystem是否在运行。
  • metricsServlet:本质上是一个特殊的Sink,专门供Spark Web UI使用。

关于MetricsConfig、Source和Sink,稍后会讲述。

注册度量来源
MetricsSystem提供了registerSource()方法来注册单个度量来源。

代码#13.3 - o.a.s.metrics.MetricsSystem.registerSource()与buildRegistryName()方法

代码语言:javascript
复制
  def registerSource(source: Source) {
    sources += source
    try {
      val regName = buildRegistryName(source)
      registry.register(regName, source.metricRegistry)
    } catch {
      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
  }

  private[spark] def buildRegistryName(source: Source): String = {
    val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
    val executorId = conf.getOption("spark.executor.id")
    val defaultName = MetricRegistry.name(source.sourceName)

    if (instance == "driver" || instance == "executor") {
      if (metricsNamespace.isDefined && executorId.isDefined) {
        MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
      } else {
        if (metricsNamespace.isEmpty) {
          logWarning(s"Using default name $defaultName for source because neither " +
            s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
        }
        if (executorId.isEmpty) {
          logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
            s"not set.")
        }
        defaultName
      }
    } else { defaultName }
  }

registerSource()方法首先将度量来源加入缓存数组,调用buildRegistryName()方法来构造Source的注册名称,然后调用MetricRegistry.register()方法注册到度量仓库。Source的注册名称取决于度量的命名空间(由spark.metrics.namespace参数控制,默认值为Application ID),以及Executor ID。其默认注册名称则由MetricRegistry.name()方法来生成。

在MetricsSystem初始化时,会根据MetricsConfig来初始化所有对应的Source。这个方法的实现如下。

代码#13.4 - o.a.s.metrics.MetricsSystem.registerSources()方法

代码语言:javascript
复制
  private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

    sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
        val source = Utils.classForName(classPath).newInstance()
        registerSource(source.asInstanceOf[Source])
      } catch {
        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
      }
    }
  }

其具体步骤是:先调用MetricsConfig.getInstance()方法取得实例名称下的配置,然后用MetricsConfig.subProperties()方法,根据正则表达式^source\.(.+)\.(.+)匹配出该实例所有与Source相关的参数,返回类型为HashMap[String, Properties]。最后,根据配置的class属性,利用反射构造出Source实现类的对象实例,调用代码#13.3中的方法将Source注册到度量仓库。

注册度量目的地

MetricsSystem并没有提供注册单个度量目的地的方法,而只提供了registerSinks()方法在初始化时批量注册度量目的地。

代码#13.5 - o.a.s.metrics.MetricsSystem.registerSinks()方法

代码语言:javascript
复制
  private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

    sinkConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      if (null != classPath) {
        try {
          val sink = Utils.classForName(classPath)
            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
            .newInstance(kv._2, registry, securityMgr)
          if (kv._1 == "servlet") {
            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
          } else {
            sinks += sink.asInstanceOf[Sink]
          }
        } catch {
          case e: Exception =>
            logError("Sink class " + classPath + " cannot be instantiated")
            throw e
        }
      }
    }
  }

它前半部分的处理方式与registerSources()方法一致,不过是改用了正则表达式^sink\.(.+)\.(.+)匹配出该实例所有与Sink相关的参数而已。然后同样利用反射构造出Sink实现类的对象实例,如果度量实例名称为servlet,说明是Web UI使用的那个Sink,将它赋值给metricsServlet属性。否则,就将其加入sinks缓存数组。在MetricsSystem初始化的最后,会调用Sink.start()方法分别启动每个Sink。

度量配置MetricsConfig类

简单来讲,MetricsConfig主要负责玩“文字游戏”,也就是度量系统配置的设置与解析。我们由它的初始化方法initialize()入手,这个方法在MetricsSystem的构造方法中也有调用。

初始化

代码#13.6 - o.a.s.metrics.MetricsConfig.initialize()方法

代码语言:javascript
复制
  def initialize() {
    setDefaultProperties(properties)
    loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))

    val prefix = "spark.metrics.conf."
    conf.getAll.foreach {
      case (k, v) if k.startsWith(prefix) =>
        properties.setProperty(k.substring(prefix.length()), v)
      case _ =>
    }

    perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
    if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
      val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
      for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
           (k, v) <- defaultSubProperties if (prop.get(k) == null)) {
        prop.put(k, v)
      }
    }
  }

存在perInstanceSubProperties属性(其数据类型为HashMap[String, Properties])中。

设置默认配置及从文件加载配置

默认属性一共有4个,代码如下。

代码#13.7 - o.a.s.metrics.MetricsConfig.setDefaultProperties()方法

代码语言:javascript
复制
  private def setDefaultProperties(prop: Properties) {
    prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
    prop.setProperty("*.sink.servlet.path", "/metrics/json")
    prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
    prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
  }

loadPropertiesFromFile()方法会首先从给定的路径中加载配置文件。如果没有提供配置文件,就会从classpath下的metrics.properties文件中读取。代码比较简单,为节省篇幅,不再贴出。

分拆各实例的配置

代码#13.8 - o.a.s.metrics.MetricsConfig.subProperties()方法

代码语言:javascript
复制
  def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
    val subProperties = new mutable.HashMap[String, Properties]
    prop.asScala.foreach { kv =>
      if (regex.findPrefixOf(kv._1.toString).isDefined) {
        val regex(prefix, suffix) = kv._1.toString
        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2.toString)
      }
    }
    subProperties
  }

度量配置的格式是:[instance].[sink|source].[name].[options]=value,比如上面默认配置中的master.sink.servlet.path=/metrics/master/json。这个方法的作用就是将原key的instance部分正则匹配出来作为HashMap的key,原key的其余部分作为Properties的key,原value作为Properties的value,以达到根据instance名称分组的效果。

度量来源Source与目的地Sink

由上面的分析,我们可以知道Spark的度量系统是由Instance、Source、Metrics、Sink四个部分组成的,它们之间的关系可以用下面的框图来表示。

图#13.1 - 度量系统主要组件的关系

接下来就看一看与Source和Sink相关的细节。

Source实现类与示例

Source是一个非常简单的特征,其中只定义了2个方法,分别用来获取度量来源的名称,以及其对应的注册中心。它有非常多种实现,如下图所示。

图#13.2 - Source继承体系

其中我们也会发现很多之前耳熟能详的名词,比如DAGScheduler、LiveListenerBus等,这说明度量系统在Spark内部有着广泛的应用。下面我们以ExecutorSource的部分代码为例来简单看看Source的具体实现。

代码#13.9 - o.a.s.executor.ExecutorSource类的部分代码

代码语言:javascript
复制
private[spark]
class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source {
  override val metricRegistry = new MetricRegistry()
  override val sourceName = "executor"

  private def registerFileSystemStat[T](
        scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
    })
  }

  metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
    override def getValue: Int = threadPool.getActiveCount()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
    override def getValue: Long = threadPool.getCompletedTaskCount()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
    override def getValue: Int = threadPool.getPoolSize()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
    override def getValue: Int = threadPool.getMaximumPoolSize()
  })

  for (scheme <- Array("hdfs", "file")) {
    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
  }

  val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
  val METRIC_RUN_TIME = metricRegistry.counter(MetricRegistry.name("runTime"))
  val METRIC_JVM_GC_TIME = metricRegistry.counter(MetricRegistry.name("jvmGCTime"))
  // ......
}

由此可见,ExecutorSource向注册中心中注册了很多指标,包括与线程池(threadpool)相关的Gauge、与文件系统(filesystem)相关的Gauge(Gauge是Metrics体系内提供的估计度量值的工具),以及大量的计数器,如GC、Shuffle、序列化方面的计数值。这些指标覆盖了整个Executor运行期的方方面面,看官也可以寻找其他Source的实现来进一步参考。

Sink实现类与示例

Sink也是一个非常简单的特征,其中定义了3个方法:start()/stop()方法分别用来启动和停止Sink,report()方法用于输出度量值。Sink的实现类如下图所示。

图#13.3 - Sink继承体系

这其中有些名称是可以顾名思义的,比如ConsoleSink输出到控制台,CsvSink输出到CSV文件,Slf4jSink输出到符合SLF4J规范的日志。另外,JmxSink可以将监控数据输出到JMX中,从而通过JVM可视化工具(如VisualVM)进行观察。MetricsServlet在前面已经说过,它可以利用Spark UI内置的Jetty服务将监控数据输出到浏览器页面。

下面以Slf4jSink为例简单看看Sink的具体实现。

代码#13.10 - o.a.s.metrics.sink.Slf4jSink类

代码语言:javascript
复制
private[spark] class Slf4jSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {
  val SLF4J_DEFAULT_PERIOD = 10
  val SLF4J_DEFAULT_UNIT = "SECONDS"
  val SLF4J_KEY_PERIOD = "period"
  val SLF4J_KEY_UNIT = "unit"

  val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => SLF4J_DEFAULT_PERIOD
  }

  val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
    case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
  }

  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

  val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()

  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop() {
    reporter.stop()
  }

  override def report() {
    reporter.report()
  }
}

可见,Slf4jSink实际上就是Codehale Metrics中Slf4jReporter类的简单封装。Slf4jReporter启动之后,会按照pollPeriod和pollUnit规定的时间定期去轮询度量值并输出。

总结

本文首先介绍了Spark度量系统的概念,通过阅读MetricsSystem类的相关源码,明确了度量系统是如果运作及发挥作用的。然后对度量配置MetricsConfig做了简单了解,最后简述了度量来源Source及目的地Sink的实现方式。由于度量和监控在Spark各主要功能模块中都是不可或缺的,因此今后在深入阅读Spark Core的其他源码时,我们会非常频繁地见到度量系统相关的方法调用。

— THE END —

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 度量系统MetricsSystem类
    • 实例化
      • 类中的属性成员
        • 注册度量来源
          • MetricsSystem提供了registerSource()方法来注册单个度量来源。
            • 注册度量目的地
            • 度量配置MetricsConfig类
              • 初始化
                • 设置默认配置及从文件加载配置
                  • 分拆各实例的配置
                  • 度量来源Source与目的地Sink
                    • Source实现类与示例
                      • Sink实现类与示例
                      • 总结
                      相关产品与服务
                      微服务引擎 TSE
                      微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档