前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Opeartor的指标体系

Spark Opeartor的指标体系

作者头像
runzhliu
发布2020-08-06 10:36:03
8090
发布2020-08-06 10:36:03
举报
文章被收录于专栏:容器计算

1 Overview

spark-on-k8s-operator,下文简称 Spark Operator, 背景知识就不介绍太多了,本文主要分享一下 Spark Operator 的指标系统是如何构建的,之后可以按照 Spark Operator 的方法,给自己创建的 Operator 配上指标系统。

image_1e1l81vpp51d7kh1jeqnk1o3um.png-95kB
image_1e1l81vpp51d7kh1jeqnk1o3um.png-95kB

2 Metrics

2.1 Spark Metrics

Spark Operator 目前提供几种自定义的指标,这里自定义的意思是以 Spark Application 这个自定义资源对象为监控对象,围绕 Spark Application 创建的一些监控指标,来让 Spark Operator 的维护者,更好的监控 Operator 中 CRD 对象的情况。

下面是目前 Spark Operator 的指标。自定义指标基本都在 sparkapp_metrics.go 里定义。

代码语言:javascript
复制
type sparkAppMetrics struct {
    ...
	sparkAppSubmitCount           *prometheus.CounterVec
	sparkAppSuccessCount          *prometheus.CounterVec
	sparkAppFailureCount          *prometheus.CounterVec
	sparkAppFailedSubmissionCount *prometheus.CounterVec
	sparkAppRunningCount          *util.PositiveGauge
	sparkAppSuccessExecutionTime  *prometheus.SummaryVec
	sparkAppFailureExecutionTime  *prometheus.SummaryVec
	sparkAppExecutorRunningCount  *util.PositiveGauge
	sparkAppExecutorFailureCount  *prometheus.CounterVec
	sparkAppExecutorSuccessCount  *prometheus.CounterVec
}

可以看到 Spark Operator 记录了,Spark App 的提交数、成功数、失败数、提交 失败数、当前运行数、运行成功的时间统计、运行失败的时间统计、运行的 Executor 数、失败的 Executor 数以及成功的 Executor 数。

newSparkAppMetrics new 实际就是去注册的意思。按照指标的类型,CounterVec 或者 GaugeVec 等,配置好 TYPEHELP,或者 LABEL 等。

熟悉 Prometheus 的同学应该知道,Counter, Gauge, Summary, Histogram 几种类型,但是我们从上面的 sparkAppMetrics 上还看到了 PositiveGauge

有两个比较特殊的指标。一个是 sparkAppRunningCountsparkAppExecutorRunningCount,因为他们都是 Gauge 类型的,但是是不会降低到0以下的,所以这里注册的类型,是自定义的 PostiveGauge

代码语言:javascript
复制
sparkAppRunningCount := util.NewPositiveGauge(util.CreateValidMetricNameLabel(prefix, "spark_app_running_count"), "Spark App Running Count via the Operator", validLabels)
sparkAppExecutorRunningCount := util.NewPositiveGauge(util.CreateValidMetricNameLabel(prefix, "spark_app_executor_running_count"), "Spark App Running Executor Count via the Operator", validLabels)

这个是 Spark Operator 自实现的一个只用于正数的 Gauge,因为大家都知道 Gauge 其实是可以亦正亦负的。这个指标收集一些会增长会减少,但是不会跌破0的类型。

代码语言:javascript
复制
type PositiveGauge struct {
	mux         sync.RWMutex
	name        string
	gaugeMetric *prometheus.GaugeVec
}

实现的原理很简单,就是一个读写锁 mux 和一个指标名 name,以及一个正常的可以亦正亦负的 gaugeMetrics。下面是其构造方法 NewPositiveGauge

代码语言:javascript
复制
func NewPositiveGauge(name string, description string, labels []string) *PositiveGauge {
	validLabels := make([]string, len(labels))
	for i, label := range labels {
		validLabels[i] = CreateValidMetricNameLabel("", label)
	}

	gauge := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: name,
			Help: description,
		},
		validLabels,
	)

	return &PositiveGauge{
		gaugeMetric: gauge,
		name:        name,
	}
}

2.2 Workqueue Metrics

这里的 Workqueue Metrics 是指 client-go 库里的 workqueue 包里的 metrics.go。因为 Spark Operator 实现的 Controller 里,用到了 rate limiting workqueue 这个工作队列。

https://github.com/kubernetes/client-go/blob/master/util/workqueue/metrics.go

Spark Operator 里自定义的 WorkQueueMetrics 主要是用于暴露这个工作队列的指标,其实就是给原来的 workqueque 的指标加上一个 prefix,这样后面收集指标和使用指标时候会更方便。

WorkQueueMetrics 的指标的结构体。

代码语言:javascript
复制
type WorkQueueMetrics struct {
	prefix string
}

下面是 Workqueque Metrics 的几种类型。

代码语言:javascript
复制
// Depth Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewDepthMetric(name string) workqueue.GaugeMetric {
    ...
}

// Adds Count Metrics for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewAddsMetric(name string) workqueue.CounterMetric {
    ...
}

// Latency Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewLatencyMetric(name string) workqueue.SummaryMetric {
    ...
}

// WorkDuration Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewWorkDurationMetric(name string) workqueue.SummaryMetric {
    ...
}

// Retry Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewRetriesMetric(name string) workqueue.CounterMetric {
    ...
}

func (p *WorkQueueMetrics) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
    ...
}

func (p *WorkQueueMetrics) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
    ...
}

2.3 指标初始化

Spark Metrics 和 Workqueue Metrics 两部分指标的初始化都在 InitializeMetrics 方法里。Spark Metrics 初始化,是普通的 Prometheus 指标收集初始化的方式,Workqueue Metrics 则是通过 Workqueue 的 Provider 来填充。

代码语言:javascript
复制
func InitializeMetrics(metricsConfig *MetricConfig) {
	// Start the metrics endpoint for Prometheus to scrape
	http.Handle(metricsConfig.MetricsEndpoint, promhttp.Handler())
	go http.ListenAndServe(fmt.Sprintf(":%s", metricsConfig.MetricsPort), nil)
	glog.Infof("Started Metrics server at localhost:%s%s", metricsConfig.MetricsPort, metricsConfig.MetricsEndpoint)

	workQueueMetrics := WorkQueueMetrics{prefix: metricsConfig.MetricsPrefix}
	workqueue.SetProvider(&workQueueMetrics)
}

2.4 其他

代码语言:javascript
复制
func CreateValidMetricNameLabel(prefix, name string) string {
	// "-" is not a valid character for prometheus metric names or labels.
	return strings.Replace(prefix+name, "-", "_", -1)
}

根据 Prometheus 的指引,Metric Name 除了可以用大小写字母和数字以为,还可以用下划线_,但是不能是中划线。CreateValidMetricNameLabel 方法是用来矫正指标名的,以防制造不符合规范的指标名,导致指标无法被 Prometheus 拉取。

The metric name specifies the general feature of a system that is measured (e.g. http_requests_total - the total number of HTTP requests received). It may contain ASCII letters and digits, as well as underscores and colons. It must match the regex [a-zA-Z_:][a-zA-Z0-9_:]*

代码语言:javascript
复制
func RegisterMetric(metric prometheus.Collector) {
	if err := prometheus.Register(metric); err != nil {
		// Ignore AlreadyRegisteredError.
		if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
			return
		}
		glog.Errorf("failed to register metric: %v", err)
	}
}

构建指标体系,需要有一个注册 registry 的过程。传入的就是一个 metrics 类型,然后通过注册接口注册即可。

代码语言:javascript
复制
type MetricConfig struct {
	MetricsEndpoint string
	MetricsPort     string
	MetricsPrefix   string
	MetricsLabels   []string
}

MeticsConfig 是 Spark Operator 的指标配置信息类型,包括 Operator 应用暴露的指标 Endpoint,指标端口 Port,指标的前缀(可以用于快速过滤)以及指标的 Labels(注意是一个数组,意思是指标会被打上这个数组的里的名字作为 Label)。

代码语言:javascript
复制
func fetchGaugeValue(m *prometheus.GaugeVec, labels map[string]string) float64 {
	// Hack to get the current value of the metric to support PositiveGauge
	pb := &prometheusmodel.Metric{}

	m.With(labels).Write(pb)
	return pb.GetGauge().GetValue()
}

2.5 工作时的指标

很重要的,就是根据 Spark App 的 CRD 对象的状态来输出指标了,这是指标体系最重要的部分。这里看不懂,前面都白看。

代码语言:javascript
复制
func (sm *sparkAppMetrics) exportMetrics(oldApp, newApp *v1beta2.SparkApplication) {
	oldState := oldApp.Status.AppState.State
	newState := newApp.Status.AppState.State
	if newState != oldState {
		switch newState {
		case v1beta2.SubmittedState:
		    // sparkAppSubmitCount + 1
		case v1beta2.RunningState:
		    // sparkAppRunningCount + 1
		case v1beta2.SucceedingState:
		    // sparkAppSuccessCount + 1
   		    // sparkAppRunningCount - 1
		case v1beta2.FailingState:
		    // sparkAppFailureCount + 1
		    // sparkAppRunningCount - 1
		case v1beta2.FailedSubmissionState:
		    // sparkAppFailedSubmissionCount + 1
		}
	}

	// Potential Executor status updates
	// 不赘述了
	for executor, newExecState := range newApp.Status.ExecutorState {
		switch newExecState {
		case v1beta2.ExecutorRunningState:
		case v1beta2.ExecutorCompletedState:
		case v1beta2.ExecutorFailedState:
		}
	}
}

指标构建和初始化完成了,也配置好什么时候该输出指标的方法了,然后看看 exportMetrics 这个方法是在什么时候被调用的。

代码语言:javascript
复制
// updateStatusAndExportMetrics updates the status of the SparkApplication and export the metrics.
func (c *Controller) updateStatusAndExportMetrics(oldApp, newApp *v1beta2.SparkApplication) error {
	// Skip update if nothing changed.
	if equality.Semantic.DeepEqual(oldApp, newApp) {
		return nil
	}

	updatedApp, err := c.updateApplicationStatusWithRetries(oldApp, func(status *v1beta2.SparkApplicationStatus) {
		*status = newApp.Status
	}, c.k8sMinorVersion)

	// Export metrics if the update was successful.
	if err == nil && c.metrics != nil {
	    // 调用
		c.metrics.exportMetrics(oldApp, updatedApp)
	}

	return err
}

当 Spark App CRD 对象在对 status 字段进行更新的时候,除了更新 status 以外,还会调用 exportMetics 方法来输出指标。

显然 updateStatusAndExportMetrics 这个方法是 Controller 在同步 CRD 对象的时候调用的。

代码语言:javascript
复制
if appToUpdate != nil {
	glog.V(2).Infof("Trying to update SparkApplication %s/%s, from: [%v] to [%v]", app.Namespace, app.Name, app.Status, appToUpdate.Status)
	// 当 appToUpdate 不为 nil,证明需要去更新 CRD 对象的 status 了
	// 这个时候同时输出指标
	err = c.updateStatusAndExportMetrics(app, appToUpdate)
	if err != nil {
		glog.Errorf("failed to update SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
		return err
	}
}

3 Summary

给 Operator 加指标,总体来说不是非常难的事情,难的地方在于判断自己到底需要收集什么指标,做什么样的监控,如果需要自定义指标系统,可以参考 Spark Operator 的做法,自定义 CRD 层面的 Metrics,如果还需要监控工作队列,直接通过 client-go 的 workqueue 的接口去做即可。

下面是一份真实的 Spark Operator 输出的指标,供参考。

代码语言:javascript
复制
...
# HELP spark_app_executor_failure_count Spark App Failed Executor Count via the Operator
# TYPE spark_app_executor_failure_count counter
spark_app_executor_failure_count{project="Unknown"} 7
# HELP spark_app_executor_running_count Spark App Running Executor Count via the Operator
# TYPE spark_app_executor_running_count gauge
spark_app_executor_running_count{project="Unknown"} 7
spark_app_executor_running_count{project="demo"} 22
# HELP spark_app_executor_success_count Spark App Successful Executor Count via the Operator
# TYPE spark_app_executor_success_count counter
spark_app_executor_success_count{project="demo"} 65
# HELP spark_app_running_count Spark App Running Count via the Operator
# TYPE spark_app_running_count gauge
spark_app_running_count{project="demo"} 0
# HELP spark_app_submit_count Spark App Submits via the Operator
# TYPE spark_app_submit_count counter
spark_app_submit_count{project="demo"} 1
# HELP spark_app_success_count Spark App Success Count via the Operator
# TYPE spark_app_success_count counter
spark_app_success_count{project="demo"} 1
# HELP spark_app_success_execution_time_microseconds Spark App Successful Execution Runtime via the Operator
# TYPE spark_app_success_execution_time_microseconds summary
spark_app_success_execution_time_microseconds{project="demo",quantile="0.5"} NaN
spark_app_success_execution_time_microseconds{project="demo",quantile="0.9"} NaN
spark_app_success_execution_time_microseconds{project="demo",quantile="0.99"} NaN
spark_app_success_execution_time_microseconds_sum{project="demo"} 5.83e+08
spark_app_success_execution_time_microseconds_count{project="demo"} 1
# HELP spark_application_controller_adds Total number of adds handled by workqueue: spark-application-controller
# TYPE spark_application_controller_adds counter
spark_application_controller_adds 120
# HELP spark_application_controller_depth Current depth of workqueue: spark-application-controller
# TYPE spark_application_controller_depth gauge
spark_application_controller_depth 0
# HELP spark_application_controller_latency Latency for workqueue: spark-application-controller
# TYPE spark_application_controller_latency summary
spark_application_controller_latency{quantile="0.5"} NaN
spark_application_controller_latency{quantile="0.9"} NaN
spark_application_controller_latency{quantile="0.99"} NaN
spark_application_controller_latency_sum 6.150365e+06
spark_application_controller_latency_count 120
# HELP spark_application_controller_longest_running_processor_microseconds Longest running processor microseconds: spark-application-controller
# TYPE spark_application_controller_longest_running_processor_microseconds gauge
spark_application_controller_longest_running_processor_microseconds 0
# HELP spark_application_controller_retries Total number of retries handled by workqueue: spark-application-controller
# TYPE spark_application_controller_retries counter
spark_application_controller_retries 472
# HELP spark_application_controller_unfinished_work_seconds Unfinished work seconds: spark-application-controller
# TYPE spark_application_controller_unfinished_work_seconds gauge
spark_application_controller_unfinished_work_seconds 0
# HELP spark_application_controller_work_duration How long processing an item from workqueue spark-application-controller takes.
# TYPE spark_application_controller_work_duration summary
spark_application_controller_work_duration{quantile="0.5"} NaN
spark_application_controller_work_duration{quantile="0.9"} NaN
spark_application_controller_work_duration{quantile="0.99"} NaN
spark_application_controller_work_duration_sum 1.2046801e+07
spark_application_controller_work_duration_count 120
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/02/22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 Overview
  • 2 Metrics
    • 2.1 Spark Metrics
      • 2.2 Workqueue Metrics
        • 2.3 指标初始化
          • 2.4 其他
            • 2.5 工作时的指标
            • 3 Summary
            相关产品与服务
            Prometheus 监控服务
            Prometheus 监控服务(TencentCloud Managed Service for Prometheus,TMP)是基于开源 Prometheus 构建的高可用、全托管的服务,与腾讯云容器服务(TKE)高度集成,兼容开源生态丰富多样的应用组件,结合腾讯云可观测平台-告警管理和 Prometheus Alertmanager 能力,为您提供免搭建的高效运维能力,减少开发及运维成本。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档