Spark Streaming Dynamic Resource Allocation

Problem Statement

DRA has already been implemented since Spark 1.2 . However the existing Spark DRA on Yarn implementation does not embody the specific property of Spark Streaming.

Spark DRA works when there are some executors being idle for removeExecutorInterval time,then they will be removed or when there is a backlog of pending tasks waiting to be rescheduled,then new executors will be added。This mechanism works fine for Long-Time stage。 Spark Streaming is known as micro batch processing,hence most of time the duration of batch is small. It will cause churn when removing executors at the end of the batch and adding executors at the beginning of next batch.

In real-time data processing, data slowly increase or slowly down,so the adjacent batches have almost the same data to process.

The perfect result of spark streaming is processing time equal to duration. So the key concept is reducing/increasing resource until processing time infinitely close to duration .

Goals

The goal is to make processing time infinitely close to duration by reducing/increasing resource in spark streaming . And we also hope having a reasonable stable time after short-time adjustment which means resource adjustment rarely happens

Design Summary

The same as spark DRA, Spark Streaming DRA is also disabled,but can be enabled by a new Spark conf property spark.streaming.dynamicAllocation.enabled. All related Spark Streaming DRA properties introduced will bepackaged in the spark.streaming.dynamicAllocation.* namespace

Adding Executors

The job in Spark Streaming is time-sensitive . Once there is a batch delayed it triggers Adding-Executors action. This Action will request the number of spark.streaming.dynamicAllocation.maxExecutors executors from Yarn immediately in greedy way since we hope we can eliminate the delay as soon as possible ,at the same time,we also should take care of the situation when requesting resource with multi rounds may have resource scarcity in shared cluster.

Removing Executors

As mentioned before, Spark Streaming, time-sensitive , so the action removing executors is not allowed to consume too much time of duration. So executors marked as removed will be added to pendingToRemove set so new tasks will not be launched in them ,then we acknowledge yarn to kill them asynchronously.

Algorithm of DRA

To calculate the number of executor should be removed every round,the formula is used as following

val totalRemoveExecutorNum = Math.round(
currentExecutors * (
(duration.toDouble - processDuration) / duration - reserveRate)
)
val actualShouldRemoveExecutorNumInThisRound = totalRemoveExecutorNum / releaseRounds 

In this formula, we suppose processing time has a strong relationship with executor number,however,they are not linear relationship. To fix this, we add some new parameters like reserveRate and releaseRounds to make sure we fit this non-linear relationship .

We also provide a heuristic strategy to reduce executor number. This formula will be calculate in every round, so every round the number will be readjusted circularly ,and finally actualShouldRemoveExecutorNumInThisRound will converge to zero .

Implementation of DRA

New classes should be added in spark streaming module:

package org.apache.spark.streaming
private[spark] class StreamingExecutorAllocationManager(client: ExecutorAllocationClient,
                                                        duration: Long,
                                                        steamingListenerBus: StreamingListenerBus,
                                                        listenerBus: LiveListenerBus,
                                                        conf: SparkConf) extends Logging {
  allocationManager =>
......
}

private class StreamingExecutorAllocationListener extends SparkListener {
......
}

private class StreamingSchedulerListener extends StreamingListener {
.....
}

StreamingExecutorAllocationManager is initialed in StreamingContext

Class affected in spark core module:

package org.apache.spark
private[spark] trait ExecutorAllocationClient {
   //blackList who can remove executors immediately make Yarn remove them asynchronously possible
  def addExecutorToPendingStatus(executorId: String): Unit = {}
  
  //cause spark streaming is initial late then spark core module,
  // we need to know how many executors we already have when 
  // spark streaming is initaled
  def executors(): List[String] = { List() }
}

SparkContext and CoarseGrainedSchedulerBackend both are affected because they are extend from ExecutorAllocationClient.

JobScheduler is also should be modified so before batch job submitted we can adjust resource.

package org.apache.spark.streaming.scheduler
private[streaming]class JobScheduler(val ssc: StreamingContext) extends Logging {

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)

      //before job submitted,we should compute resource actually required
      ssc.executorAllocationManager match {
        case Some(eam)=>  eam.run()
        case None => //do nothing
      }
   .........
    }
  }
......}

DRA Available Properties

Enable DRA:

spark.streaming.dynamicAllocation.enabled=true

Upper/Lower bound for the number of executors if dynamic allocation is enabled.

spark.streaming.dynamicAllocation.minExecutors=0
spark.streaming.dynamicAllocation.maxExecutors=50

More message will be printed in log if DRA is enabled. Default is false

spark.streaming.dynamicAllocation.debug=true

Rounds(Batches) will be used to release the resource calculated in current round. Default is 5

spark.streaming.dynamicAllocation.releaseRounds=5

The number of rounds(Batches) should be remembered. We can increase this number if batch processing time is unstable . this number affects processDuration in (duration.toDouble - processDuration) .Default is 1

spark.streaming.dynamicAllocation.rememberBatchSize=1

DRA delays to work when the specific number of rounds has been submitted. Default is 10

spark.streaming.dynamicAllocation.delay.rounds=10

Make sure the resource is more than reserveRate * current number of Executors. More useful than minExecutorNumber. Default is 0.2

spark.streaming.dynamicAllocation.reserveRate=0.2

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Ken的杂谈

【系统设置】CentOS 修改机器名

18130
来自专栏儿童编程

儿童创造力教育与编程教育的碰撞——MIT雷斯尼克教授最新理论梗概

儿童编程教育已经在我国各一线二线城市疯狂出现,颇有“烂大街”的趋势。我们不禁要问很多很多问题:

22370
来自专栏儿童编程

声音功能让儿童编程更有创造性

导读:Scratch中声音功能非常强大,除了常规的音效,你甚至可以模拟各种乐器的各个发音、设置节拍、休止……如果你愿意,甚至可以用它创作一个交响乐。我们可以引导...

13840
来自专栏儿童编程

《动物魔法学校》儿童学编程Scratch之“外观”部分

导读:本文通过一个案例《动物魔法学校》来学习Scratch语言的“外观”部分。之后通过一系列其他功能的综合运用对作品功能进行了扩展。

19240
来自专栏儿童编程

天干地支五行八卦的对应关系

19690
来自专栏儿童编程

我不是算命先生,却对占卜有了疑惑——如何论证“占卜前提”的正确与否

事出有因,我对《周易》感兴趣了很多年。只是觉得特别有趣,断断续续学习了一些皮毛。这几天又偶然接触到了《梅花易数》,觉得很是精彩,将五行八卦天干地支都串联了起来。...

15310
来自专栏儿童编程

一张图理清《梅花易数》梗概

学《易经》的目的不一定是为了卜卦,但是了解卜卦绝对能够让你更好地了解易学。今天用一张思维导图对《梅花易数》的主要内容进行概括,希望能够给学友们提供帮助。

32240
来自专栏haifeiWu与他朋友们的专栏

复杂业务下向Mysql导入30万条数据代码优化的踩坑记录

从毕业到现在第一次接触到超过30万条数据导入MySQL的场景(有点low),就是在顺丰公司接入我司EMM产品时需要将AD中的员工数据导入MySQL中,因此楼主负...

29840
来自专栏FSociety

SQL中GROUP BY用法示例

GROUP BY我们可以先从字面上来理解,GROUP表示分组,BY后面写字段名,就表示根据哪个字段进行分组,如果有用Excel比较多的话,GROUP BY比较类...

5.2K20
来自专栏儿童编程

什么样的人生才是有意义的人生——没有标准的标准答案

【导读】其实我们可以跳出这个小圈圈去更加科客观地看一下这个世界。在夜晚的时候我们仰望天空,浩瀚的宇宙中整个地球只是一粒浮尘,何况地球上一个小小的人类?在漫长的历...

1.8K50

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励