# 1. Metrics

### 1.1. Resource Usage

#### 1.1.2. Example

```
Consider a job with:
4 mappers with runtime {12, 15, 20, 30} mins.
4 reducers with runtime {10 , 12, 15, 18} mins.
Container size of 4 GB
Then,
Resource used by all mappers: 4 * (( 12 + 15 + 20 + 30 ) / 60 ) GB Hours = 5.133 GB Hours
Resource used by all reducers: 4 * (( 10 + 12 + 15 + 18 ) / 60 ) GB Hours = 3.666 GB Hours
Total resource used by the job = 5.133 + 3.666 = 8.799 GB Hours
```
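The arithmetic above can be sketched in Python. The function name and structure are illustrative, not part of Dr-Elephant's actual code:

```python
# Resource usage = container size (GB) * total task runtime (converted to hours).

def resource_usage_gb_hours(runtimes_mins, container_gb):
    """Container size times the summed task runtimes, converted to hours."""
    return container_gb * sum(runtimes_mins) / 60.0

mapper_usage = resource_usage_gb_hours([12, 15, 20, 30], 4)   # ~5.133 GB Hours
reducer_usage = resource_usage_gb_hours([10, 12, 15, 18], 4)  # ~3.667 GB Hours
total_usage = mapper_usage + reducer_usage                    # 8.8 GB Hours
```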

### 1.2. Wasted Resources

#### 1.2.1. Calculation

```
To calculate the resources wasted, we calculate the following:
The minimum memory wasted by the tasks (Map and Reduce)
The runtime of the tasks (Map and Reduce)
The minimum memory wasted by a task is equal to the difference between the container size and the task's peak memory usage. The resource wasted by a task is then its minimum wasted memory multiplied by the duration of the task. The total resource wasted by the job is the sum of the wasted resources of all tasks.

Let us define the following for each task:

peak_memory_used := The upper bound on the memory used by the task.
runtime := The run time of the task.

The peak_memory_used for a task is the maximum of the physical memory (max_physical_memory) used by the task and its virtual memory (virtual_memory) scaled down by the cluster memory factor. For each task:

peak_memory_used = max(max_physical_memory, virtual_memory / 2.1)

where 2.1 is the cluster memory factor.

The minimum memory wasted by each task can then be calculated as:

wasted_memory = container_size - peak_memory_used

The minimum resource wasted by each task can then be calculated as:

wasted_resource = wasted_memory * runtime
```
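A minimal Python sketch of the wasted-resource formulas above, assuming memory in MB and runtime in hours (names are illustrative):

```python
CLUSTER_MEMORY_FACTOR = 2.1  # cluster memory factor from the formula above

def wasted_resource(container_size, max_physical_memory, virtual_memory, runtime):
    """Minimum memory wasted by one task, times its runtime (MB Hours)."""
    peak_memory_used = max(max_physical_memory, virtual_memory / CLUSTER_MEMORY_FACTOR)
    wasted_memory = container_size - peak_memory_used
    return wasted_memory * runtime

# Total waste for the job is the sum of wasted_resource over all map and reduce tasks.
```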

### 1.4. Wait Time

#### 1.4.1. Calculation

```
For each task, let us define the following:

ideal_start_time := The ideal time when all the tasks should have started
finish_time := The time when the task finished
task_runtime := The runtime of the task

For map tasks, we have

ideal_start_time := The job submission time

We will find the mapper task with the longest runtime ( task_runtime_max) and the task which finished last ( finish_time_last )
The total wait time of the job due to mapper tasks would be:

mapper_wait_time = finish_time_last - ( ideal_start_time + task_runtime_max)

For reducer tasks, we have

ideal_start_time := This is computed by looking at the reducer slow start percentage (mapreduce.job.reduce.slowstart.completedmaps) and finding the finish time of the map task after which first reducer should have started
We will find the reducer task with the longest runtime ( task_runtime_max) and the task which finished last ( finish_time_last )

The total wait time of the job due to reducer tasks would be:
reducer_wait_time = finish_time_last - ( ideal_start_time + task_runtime_max)
```
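The wait-time computation above, sketched in Python (names illustrative; all times in the same unit, e.g. minutes):

```python
def wait_time(ideal_start_time, runtimes, finish_times):
    """Wait time attributable to one task type (map or reduce).

    Ideally the longest task starts at ideal_start_time and nothing
    finishes after it; any extra elapsed time is waiting.
    """
    task_runtime_max = max(runtimes)
    finish_time_last = max(finish_times)
    return finish_time_last - (ideal_start_time + task_runtime_max)

# e.g. mappers submitted at t=0, runtimes {12, 15, 20, 30}, last finish at t=45:
# wait_time(0, [12, 15, 20, 30], [20, 25, 40, 45]) == 45 - (0 + 30) == 15
```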

## 2. Heuristics

### 2.1. Map-Reduce

#### 2.1.1. Mapper Data Skew

The Mapper data skew heuristic shows whether the job suffers from data skew. The heuristic splits all mappers into two groups, where the average of the first group is smaller than that of the second.

##### 2.1.1.1. Calculation

```
Let us define the following variables,

deviation: the deviation in input bytes between two groups
file_size: the average input size of the larger group

num_tasks_severity: List of severity thresholds for the number of tasks. e.g., num_tasks_severity = {10, 20, 50, 100}
deviation_severity: List of severity threshold values for the deviation of input bytes between two groups. e.g., deviation_severity: {2, 4, 8, 16}
files_severity: The severity threshold values for the fraction of HDFS block size. e.g. files_severity = { ⅛, ¼, ½, 1}

Let us define the following functions,

func avg(x): returns the average of a list x
func len(x): returns the length of a list x
func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

We’ll compute two groups recursively based on average memory consumed by them.

Let us call the two groups: group_1 and group_2

Without loss of generality, let us assume that,
avg(group_1) > avg(group_2) and len(group_1) < len(group_2). Then,

deviation = (avg(group_1) - avg(group_2)) / min(avg(group_1), avg(group_2))
file_size = avg(group_1)

The overall severity of the heuristic can be computed as,
severity = min(
getSeverity(deviation, deviation_severity)
, getSeverity(file_size,files_severity)
)

```
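The grouping and severity logic can be sketched as follows. Here getSeverity is modeled as counting how many ascending thresholds the value has reached (0 = none, 4 = critical); the actual threshold-to-severity mapping in Dr-Elephant is configurable, so treat this as an assumption:

```python
def get_severity(value, thresholds):
    # Number of ascending thresholds the value has reached (0..4) - assumed model.
    return sum(1 for t in thresholds if value >= t)

def mapper_skew_severity(group_1, group_2, deviation_severity, files_severity):
    avg_1 = sum(group_1) / len(group_1)
    avg_2 = sum(group_2) / len(group_2)
    # Relative gap between the two groups' average input sizes.
    deviation = abs(avg_1 - avg_2) / min(avg_1, avg_2)
    file_size = max(avg_1, avg_2)  # average of the group with larger inputs
    return min(get_severity(deviation, deviation_severity),
               get_severity(file_size, files_severity))
```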

#### 2.1.2. Mapper GC

Mapper GC analyzes the GC efficiency of the tasks: it computes the percentage of total CPU time that is spent in garbage collection.

##### 2.1.2.1. Calculation

```
Let us define the following variables:

avg_gc_time: average time spent garbage collecting
avg_cpu_time: average cpu time of all the tasks
avg_runtime: average runtime of all the tasks
gc_cpu_ratio: avg_gc_time/ avg_cpu_time

gc_ratio_severity: List of severity threshold values for the ratio of  avg_gc_time to avg_cpu_time.
runtime_severity: List of severity threshold values for the avg_runtime.

Let us define the following functions,

func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

The overall severity of the heuristic can then be computed as,

severity = min(getSeverity(avg_runtime, runtime_severity), getSeverity(gc_cpu_ratio, gc_ratio_severity))

```
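A sketch of the min() combination above: a high GC ratio is only flagged when the tasks also run long enough. The getSeverity model (count of ascending thresholds reached) is an assumption, as before:

```python
def get_severity(value, thresholds):
    # Number of ascending thresholds the value has reached (0..4) - assumed model.
    return sum(1 for t in thresholds if value >= t)

def gc_severity(avg_runtime, gc_cpu_ratio, runtime_severity, gc_ratio_severity):
    # min(): both a long runtime AND a high GC ratio are required for severity.
    return min(get_severity(avg_runtime, runtime_severity),
               get_severity(gc_cpu_ratio, gc_ratio_severity))
```

The same min-of-two-severities pattern appears in the memory, speed, and spill heuristics below.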

#### 2.1.3. Mapper Memory Usage

##### 2.1.3.1. Calculation
```
Let us define the following variables,

avg_physical_memory: Average of the physical memories of all tasks.
container_memory: Container memory

container_memory_severity: List of threshold values for the average container memory of the tasks.
memory_ratio_severity: List of threshold values for the ratio of avg_physical_memory to container_memory

Let us define the following functions,

func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

The overall severity can then be computed as,

severity = min(getSeverity(avg_physical_memory/container_memory, memory_ratio_severity)
, getSeverity(container_memory,container_memory_severity)
)
```

#### 2.1.4. Mapper Speed

##### 2.1.4.1. Calculation

```
Let us define the following variables,

median_speed: median of speeds of all the mappers. The speeds of mappers are found by taking the ratio of input bytes to runtime.
median_size: median of size of all the mappers
median_runtime: median of runtime of all the mappers.

disk_speed_severity: List of threshold values for the median_speed.
runtime_severity: List of severity threshold values for median_runtime.

Let us define the following functions,

func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

The overall severity of the heuristic can then be computed as,

severity = min(getSeverity(median_speed, disk_speed_severity), getSeverity(median_runtime, runtime_severity))

```

#### 2.1.5. Mapper Spill

##### 2.1.5.1. Calculation
```
Let us define the following parameters,

total_spills: The sum of spills from all the map tasks.
total_output_records: The sum of output records from all the map tasks.
ratio_spills: total_spills/ total_output_records

spill_severity: List of the threshold values for ratio_spills
num_tasks_severity: List of threshold values for total number of tasks.

Let us define the following functions,

func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

The overall severity of the heuristic can then be computed as,

severity = min(getSeverity(ratio_spills, spill_severity), getSeverity(num_tasks, num_tasks_severity))

where num_tasks is the total number of map tasks.

```

#### 2.1.6. Mapper Runtime

• The `mapper` runtime is very short. This usually happens when the job has:
• too many mappers
• a very short average mapper runtime
• too many small files
• Large or unsplittable files. This usually happens when the job has:
• too few mappers
• a very long average mapper runtime
• very large files (individual files of several GB)

##### 2.1.6.1. Calculation
```
Let us define the following variables,
avg_size: average size of input data for all the mappers
avg_time: average of runtime of all the tasks.

short_runtime_severity: The list of threshold values for tasks with short runtime
long_runtime_severity: The list of threshold values for tasks with long runtime.
num_tasks_severity: The list of threshold values for number of tasks.

Let us define the following functions,
func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

The overall severity of the heuristic can then be computed as,

short_task_severity = min(getSeverity(num_tasks, num_tasks_severity), getSeverity(avg_time, short_runtime_severity))
severity = max(getSeverity(avg_time, long_runtime_severity), short_task_severity)

where num_tasks is the total number of map tasks.
```
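The combined short/long-runtime severity can be sketched as below. The short-runtime branch is only severe when there are also many tasks, hence the min(). Threshold handling is the same assumed model as above, with short-runtime thresholds treated as "smaller is worse":

```python
def get_severity(value, thresholds):
    # "Larger is worse": count of ascending thresholds reached - assumed model.
    return sum(1 for t in thresholds if value >= t)

def get_severity_reversed(value, thresholds):
    # "Smaller is worse": count of thresholds the value is at or below.
    return sum(1 for t in thresholds if value <= t)

def mapper_time_severity(avg_time, num_tasks, short_runtime_severity,
                         long_runtime_severity, num_tasks_severity):
    # Short tasks only matter when there are many of them.
    short_task_severity = min(get_severity(num_tasks, num_tasks_severity),
                              get_severity_reversed(avg_time, short_runtime_severity))
    # Long tasks are flagged directly by their average runtime.
    return max(get_severity(avg_time, long_runtime_severity), short_task_severity)
```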

#### 2.1.7. Reducer Data Skew

##### 2.1.7.1. Calculation

```
Let us define the following variables:
deviation: deviation in input bytes between two groups
file_size: average of larger group
num_tasks_severity: List of severity threshold values for the number of tasks.
e.g. num_tasks_severity = {10,20,50,100}
deviation_severity: List of severity threshold values for the deviation of input bytes between two groups.
e.g. deviation_severity = {2,4,8,16}
files_severity: The severity threshold values for the fraction of HDFS block size
e.g. files_severity = { ⅛, ¼, ½, 1}

Let us define the following functions:
func avg(x): returns the average of a list x
func len(x): returns the length of a list x
func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

We’ll compute two groups recursively based on average memory consumed by them.
Let us call the two groups: group_1 and group_2
Without loss of generality, let us assume that:
avg(group_1) > avg(group_2) and len(group_1) < len(group_2) then,
deviation = (avg(group_1) - avg(group_2)) / min(avg(group_1), avg(group_2))
file_size = avg(group_1)

The overall severity of the heuristic can be computed as,

severity = min(getSeverity(deviation, deviation_severity), getSeverity(file_size, files_severity))
```

#### 2.1.8. Reducer GC

##### 2.1.8.1. Calculation

```
Let us define the following variables:

avg_gc_time: average time spent garbage collecting
avg_cpu_time: average cpu time of all the tasks
avg_runtime: average runtime of all the tasks
gc_cpu_ratio: avg_gc_time/ avg_cpu_time

gc_ratio_severity: List of severity threshold values for the ratio of  avg_gc_time to avg_cpu_time.
runtime_severity: List of severity threshold values for the avg_runtime.

Let us define the following functions,

func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

The overall severity of the heuristic can then be computed as,
severity = min(getSeverity(avg_runtime, runtime_severity), getSeverity(gc_cpu_ratio, gc_ratio_severity))

```

#### 2.1.9. Reducer Memory Usage

##### 2.1.9.1. Calculation
```
Let us define the following variables,

avg_physical_memory: Average of the physical memories of all tasks.
container_memory: Container memory

container_memory_severity: List of threshold values for the average container memory of the tasks.
memory_ratio_severity: List of threshold values for the ratio of avg_physical_memory to container_memory

Let us define the following functions,

func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

The overall severity can then be computed as,

severity = min(getSeverity(avg_physical_memory/container_memory, memory_ratio_severity)
, getSeverity(container_memory,container_memory_severity)
)
```

#### 2.1.10. Reducer Runtime

• Too many `reducers`. The job may show:
• too many reducers
• very short reducer runtimes
• Too few `reducers`. The job may show:
• too few reducers
• very long reducer runtimes

##### 2.1.10.1. Calculation
```
Let us define the following variables,

avg_size: average size of input data for all the reducers
avg_time: average runtime of all the tasks.

short_runtime_severity: The list of threshold values for tasks with short runtime
long_runtime_severity: The list of threshold values for tasks with long runtime.
num_tasks_severity: The list of threshold values for the number of tasks.

Let us define the following functions,

func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

The overall severity of the heuristic can then be computed as,

short_task_severity = min(getSeverity(num_tasks, num_tasks_severity), getSeverity(avg_time, short_runtime_severity))
severity = max(getSeverity(avg_time, long_runtime_severity), short_task_severity)
```

#### 2.1.11. Shuffle & Sort

##### 2.1.11.1. Calculation
```
Let’s define the following variables,

avg_exec_time: average time spent in execution by all the tasks.
avg_shuffle_time: average time spent in shuffling.
avg_sort_time: average time spent in sorting.

runtime_ratio_severity: List of threshold values for the ratio of twice the average shuffle or sort time to the average execution time.
runtime_severity: List of threshold values for the runtime for shuffle or sort stages.

The overall severity can then be found as,

severity = max(shuffle_severity, sort_severity)

where shuffle_severity and sort_severity can be found as:

shuffle_severity = min(getSeverity(avg_shuffle_time, runtime_severity), getSeverity(avg_shuffle_time*2/avg_exec_time, runtime_ratio_severity))

sort_severity = min(getSeverity(avg_sort_time, runtime_severity), getSeverity(avg_sort_time*2/avg_exec_time, runtime_ratio_severity))
```
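A sketch of the shuffle/sort combination above (same assumed getSeverity model as in the earlier heuristics):

```python
def get_severity(value, thresholds):
    # Number of ascending thresholds the value has reached (0..4) - assumed model.
    return sum(1 for t in thresholds if value >= t)

def shuffle_sort_severity(avg_exec_time, avg_shuffle_time, avg_sort_time,
                          runtime_severity, runtime_ratio_severity):
    def stage_severity(stage_time):
        # Both the absolute stage time and its ratio to execution time must be high.
        return min(get_severity(stage_time, runtime_severity),
                   get_severity(stage_time * 2 / avg_exec_time, runtime_ratio_severity))
    return max(stage_severity(avg_shuffle_time), stage_severity(avg_sort_time))
```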

### 2.2. Spark

#### 2.2.1. Spark Event Log Limit

The `Spark` event-log processor currently cannot handle very large log files: `Dr-Elephant` takes a long time to process a large `Spark` event log, which can affect the stability of `Dr-Elephant` itself. For now, a log size limit (100MB) is therefore enforced; logs above this limit are handled by a separately spawned process.

#### 2.2.2. Spark Executor Load Balance

Unlike the execution model of `Map/Reduce` jobs, a `Spark` application allocates all the resources it needs at once after startup and only releases them when the whole job finishes. Under this mechanism, balancing the load across a `Spark` application's executors is very important, so that individual nodes in the cluster are not overloaded.

##### 2.2.2.1. Calculation
```
Let us define the following variables:

peak_memory: List of peak memories for all executors
durations: List of durations of all executors
inputBytes: List of input bytes of all executors
outputBytes: List of output bytes of all executors.

looser_metric_deviation_severity: List of threshold values for deviation severity, loose bounds.
metric_deviation_severity: List of threshold values for deviation severity, tight bounds.

Let us define the following functions:

func getDeviation(x): returns max(|maximum-avg|, |minimum-avg|)/avg, where
x = list of values
maximum = maximum of values in x
minimum = minimum of values in x
avg = average of values in x

func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.
func max(x,y): returns the maximum value of x and y.
func Min(l): returns the minimum of a list l.

The overall severity can be found as,

severity = Min( getSeverity(getDeviation(peak_memory), looser_metric_deviation_severity),
getSeverity(getDeviation(durations),  metric_deviation_severity),
getSeverity(getDeviation(inputBytes), metric_deviation_severity),
getSeverity(getDeviation(outputBytes), looser_metric_deviation_severity)
)
```
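getDeviation from the definition above, written directly in Python:

```python
def get_deviation(values):
    """max(|maximum - avg|, |minimum - avg|) / avg for a list of values."""
    avg = sum(values) / len(values)
    return max(abs(max(values) - avg), abs(min(values) - avg)) / avg
```

A perfectly balanced list gives 0; larger values mean one executor deviates further from the average.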

#### 2.2.3. Spark Job Runtime

##### 2.2.3.1. Calculation
```
Let us define the following variables,

avg_job_failure_rate: Average job failure rate
avg_job_failure_rate_severity: List of threshold values for average job failure rate

Let us define the following variables for each job,

single_job_failure_rate: Failure rate of a single job
single_job_failure_rate_severity: List of threshold values for single job failure rate.

The severity of the job can be found as maximum of single_job_failure_rate_severity for all jobs and avg_job_failure_rate_severity.

i.e. severity = max(getSeverity(single_job_failure_rate, single_job_failure_rate_severity),
getSeverity(avg_job_failure_rate, avg_job_failure_rate_severity)
)

where single_job_failure_rate is computed for all the jobs.
```

#### 2.2.4. Spark Memory Limit

##### 2.2.4.1. Calculation
```
Let us define the following variables,

total_executor_memory: total memory of all the executors
total_storage_memory: total memory allocated for storage by all the executors
total_driver_memory: total driver memory allocated
peak_memory: total memory used at peak

mem_utilization_severity: The list of threshold values for the memory utilization.
total_memory_severity_in_tb: The list of threshold values for total memory.

Let us define the following functions,

func max(x,y): Returns maximum of x and y.
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

The overall severity can then be computed as,

severity = max(getSeverity(total_executor_memory,total_memory_severity_in_tb),
getSeverity(peak_memory/total_storage_memory, mem_utilization_severity)
)
```

#### 2.2.5. Spark Stage Runtime

As with `Spark` job runtime, a `Spark` application can be divided into multiple jobs, and each job can in turn be divided into multiple stages.

##### 2.2.5.1. Calculation
```
Let us define the following variable for each Spark job,

stage_failure_rate: The stage failure rate of the job
stage_failure_rate_severity: The list of threshold values for stage failure rate.

Let us define the following variables for each stage of a spark job,

task_failure_rate: The task failure rate of the stage
runtime: The runtime of a single stage

single_stage_tasks_failure_rate_severity: The list of threshold values for task failure of a stage
stage_runtime_severity_in_min: The list of threshold values for stage runtime.

Let us define the following functions,

func max(x,y): returns the maximum value of x and y.
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

The overall severity can be found as:

severity_stage = max(getSeverity(task_failure_rate, single_stage_tasks_failure_rate_severity),
getSeverity(runtime, stage_runtime_severity_in_min)
)
severity_job = getSeverity(stage_failure_rate, stage_failure_rate_severity)

severity = max(severity_stage, severity_job)

where task_failure_rate is computed for all the tasks.
```
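A sketch combining the per-stage and per-job severities described above. Stage data is passed as (task_failure_rate, runtime_min) pairs, and getSeverity uses the same assumed threshold-counting model as the earlier sketches:

```python
def get_severity(value, thresholds):
    # Number of ascending thresholds the value has reached (0..4) - assumed model.
    return sum(1 for t in thresholds if value >= t)

def stage_runtime_heuristic(stage_failure_rate, stages,
                            stage_failure_rate_severity,
                            single_stage_tasks_failure_rate_severity,
                            stage_runtime_severity_in_min):
    # Worst offending stage, flagged either by task failures or by runtime.
    severity_stage = max(
        max(get_severity(task_failure_rate, single_stage_tasks_failure_rate_severity),
            get_severity(runtime, stage_runtime_severity_in_min))
        for task_failure_rate, runtime in stages)
    severity_job = get_severity(stage_failure_rate, stage_failure_rate_severity)
    return max(severity_stage, severity_job)
```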
