首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkConf 配置与传播

SparkConf 配置与传播

作者头像
Tim在路上
发布2022-03-23 14:19:38
1940
发布2022-03-23 14:19:38
举报

在spark分布式程序中,sparkConf 主要起着Spark程序进行资源配置,性能调优,功能开关,参数传递的能力。在Spark的Driver和Executor中都存在着SparkConf。

在大多数的时候,我们可以通过new SparkConf() 来创建Spark配置。传统的Spark的入口是SparkContext的创建就是可以通过创建SparkConf, 并传入SparkContext()。

val conf = new SparkConf().setAppName("app").setMaster("master")
val sparkContext = new SparkContext(conf)

首先,我们总结下Spark的配置通过以下3种方式获取:

  1. 来源于系统参数 (System.getProperties)中以 spark. 作为前缀的属性;
  2. 使用SparkConf 的Api进行设置;
  3. 从其他的SparkConf 中进行克隆继承。
private val settings = new ConcurrentHashMap[String, String]()

从源码中可以看出,Spark中的所有配置,其key,value都是String类型,同时其都是存储在ConcurrentHashMap中的。

1. 来源于系统参数

从源码中可以看出,默认是会加载来自于系统的参数(JVM参数和系统虚拟机的参数),这些参数需要是spark. 开头的系统参数,会默认加载到sparkConf中。

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  import SparkConf._

/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)

  private val settings= new ConcurrentHashMap[String, String]()

  ...
  // 如果loadDefaults为false则不会加载系统参数,可以方便测试。
  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemPropertiesif key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }
...

2. 使用SparkConf API进行设置

通过set api 进行设置参数, 从源码可以看出, 默认对废弃的参数进行打印警告。默认会将配置放置到ConcurrentHashMap中。

/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
  set(key, value, false)
}

private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
  if (key == null) {
    throw new NullPointerException("null key")
  }
  if (value == null) {
    throw new NullPointerException("null value for " + key)
  }
  // 对废弃的参数进行打印警告
  if (!silent) {
    logDeprecationWarning(key)
  }
    settings.put(key, value)
  this
}

另外,在创建SparkSession时,必须指定的appName和master。

val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Spark schedule mode")
      .getOrCreate()

从源码可以看出,其实就是设置配置的spark.master[spark.app.name](http://spark.app.name) 的键值对。

*/
def setMaster(master: String): SparkConf = {
  set("spark.master", master)
}

/** Set a name for your application. Shown in the Spark web UI. */
def setAppName(name: String): SparkConf = {
  set("spark.app.name", name)
}

另外,如果在设置spark配置以spark.hadoop开头,则会将后面的参数拷贝到hadoop的配置中。

private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
  // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
  for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
    hadoopConf.set(key.substring("spark.hadoop.".length), value)
  }
}

给SparkExecutor传参可以通过调用setExecutorEnv方法:

def setExecutorEnv(variable: String, value: String): SparkConf = {
  set("spark.executorEnv." + variable, value)
}

3. 从其他系统克隆继承

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  import SparkConf._
...

/** Copy this object */
  override def clone: SparkConf = {
    val cloned = new SparkConf(false)
    settings.entrySet().asScala.foreach { e =>
      cloned.set(e.getKey(), e.getValue(), true)
    }
    cloned
  }

从SparkConf的定义可以看出,其定义之初就实现了cloneable, 可以方便配置的拷贝与传递

/**
 * Return a copy of this SparkContext's configuration. The configuration''cannot''be
 * changed at runtime.
 */
def getConf: SparkConf = conf.clone()

可以看出在SparkContext中获取SparkConf的方式就是采用的conf.clone() 的方式。

总结:SparkConf 是定制和调配Spark程序的灵魂,在spark中它被多个组件共用。在SparkConf中的键值对配置都被保存在ConcurrentHashMap中,它可以保证在高并发下保证一定的性能。SparkConf继承了Cloneable特质的clone方式,并通过其进行复用和传播。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022.02.26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 来源于系统参数
  • 2. 使用SparkConf API进行设置
  • 3. 从其他系统克隆继承
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档