Spark调优

因为Spark是内存当中的计算框架,集群中的任何资源都会让它处于瓶颈,CPU、内存、网络带宽。通常,内存足够的情况之下,网络带宽是瓶颈,这时我们就需要进行一些调优,比如用一种序列化的方式来存储RDD来减少内存使用,这边文章就讲两种方式,数据序列化和内存调优,接下来我们会分几个主题来谈论这个调优问题。

1、数据序列化

(1) Spark默认是使用Java的ObjectOutputStream框架,它支持所有的继承于java.io.Serializable序列化,如果想要进行调优的话,可以通过继承java.io.Externalizable。这种格式比较大,而且速度慢。

(2)Spark还支持这种方式Kryo serialization,它的速度快,而且压缩比高于Java的序列化,但是它不支持所有的Serializable格式,并且需要在程序里面注册。它需要在实例化SparkContext之前进行注册,下面是它的使用例子:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass1])
    kryo.register(classOf[MyClass2])
  }
}

// Make sure to set these properties *before* creating a SparkContext!
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")
val sc = new SparkContext(...)

如果对象很大,需要设置这个参数spark.kryoserializer.buffer.mb,默认是2。

想了解更多关于这个格式的,可以查看这个网址https://github.com/EsotericSoftware/kryo

2、内存调化

这里面需要考虑3点,对象使用的内存、访问这些对象的开销、垃圾回收器的管理开销。

通常,对象访问的速度都很快,但是需要2-5x的空间来存储,因为下面的原因:

1)每一个独立的Java对象,都有一个16字节的“object header”和关于这个对象的信息,比如指针。

2)Java String类型有40字节的“object header”,然后因为Unicode,每个字符要存储2个字节,这样10个字符要消耗掉大概60个字节。

3)普通的容器类,比如HashMap和LinkedList,它们采用的是链式的数据结构,它需要封装每个实体,不仅需要头信息,还要有个指针指向下一个实体。

4)原始容器类型通常存储它们为装箱类型,比如java.lang.Integer。

下面我们就来讨论如何确定这些对象的内存开销并且如何进行调优,比如改变数据结构或者序列化存储数据。下面我们讲谈论如何调优Spark的Cache大小以及Java的垃圾回收器。

(1)确定内存使用情况

首先我们要确定内存使用情况,确定数据集的内存使用情况,最好的方法就是创建一个RDD,然后缓存它,然后查看日志,日志会记录出来它的每个分片使用的大小,然后我们可以找个这些分片的大小计算出总大小,如下:

INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)

(2)数据结构调优

  1) 优先使用数组和原生类型来替代容器类,或者使用fastutil找个包提供的容器类型,fastutil的官方链接是http://fastutil.di.unimi.it/。

  2)避免大量的小对象的嵌套结构。

  3)使用数字的ID来表示,而不是使用字符串的ID。

  4)如果内存小于32GB,设置JVM参数 -XX:+UseCompressedOops为4个字节而不是8个字节;在Java7或者之后的,尝试使用-XX:+UseCompressedStrings存储ASCII字符串8个比特一个字符。这些参数可以添加到spark-env.sh,根据我的观察,应该是设置到SPARK_JAVA_OPTS这个参数上。

(3)序列化RDD存储

 强烈建议使用Kryo进行序列化,这也是降低内存使用最简单的方式。

(4)垃圾回收器调优

 当我们只使用一次RDD的时候,不会存在这方面的问题。当java需要清除旧的对象给新的对象腾出空间的时候,它需要遍历所有对象,然后找出那些没有使用的。这里最中要的一点是记住,垃圾回收器的代价是和它里面的对象的数量相关的。查看GC是不是一个问题,第一件事就是使用序列化的缓存方式。

  GC还可以出现的问题就是执行任务所需要的内存大小,下面我们讲讨论如何控制分配给RDD缓存的空间大小来减轻这个问题。

  1)确定GC的影响

  添加这些参数到-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps到SPARK_JAVA_OPTS这个参数,让它出书GC的信息,然后运行任务。

  2)缓存大小调优

 影响GC的一个重要配置参数是分配给缓存RDD的内存大小,Spark默认是使用 66%的可配置内存大小(通过spark.executor.memory or SPARK_MEM来配置)来存储RDD,也即是说,只有33%是给任务执行过程当中执行过程当中创建的对象的。

 当你的程序慢下来,你发现GC很频繁或内存不够等现象,降低它的值会起到一些效果,我们可以通过这个参数System.setProperty("spark.storage.memoryFraction", "0.5")来达到这个效果。

 3)高级内存调优

  java的堆内存是分为两个区间,Young和Old,Young是用来存储短生命周期的对象,Old是用来存储长生命周期的对象。Young又可以进一步细分为 [Eden, Survivor1, Survivor2]。 一个简单的垃圾过程可以描述为:当Eden满的时候,一个简单的GC会运行在Eden和依赖它的对象,Survivor1被复制到Survivor2。Survivor区域进行了交换。如果一个对象足够老或者Survivor2满了,它就会被移到Old区。当Old区也满的时候,一个完整的GC就会触发。

Spark里面的GC调优目标是确保RDD存储在Old区间,并且Young区有足够的空间去存储那些短生命周期的对象。这样可以减少完全的GC去回收那些任务执行中的临时对象。 下面的的这些步骤可能是有用的:

  1)检查GC的统计信息,查看在任务执行完成之前是不是执行过多次的GC,这意味着内存不足以执行任务。

      2)当Old区快满的时候,我们可以通过调整这个参数spark.storage.memoryFraction来减少缓存使用的内存量,少缓存一点对象比拖慢作业执行更好一些。

      3)当发生了很多次小的GC,而不是重要的GC时候,我们可以考虑多分配点内存给Eden,假设一个任务需要使用E大小的内存,我们可以分配给Eden的内存大小为: -Xmn=4/3*E,这个大小同样适用于survivor区间。

  4)当从HDFS上读取数据的时候,任务的所需内容可以估计为block的大小,一个反压缩的快是2-3倍的大小,我们考虑用3-4个任务来执行,这样我们可以考虑设置Eden的大小为4*3*64MB。

3、其它的考虑

(1)并行的水平

  建议是1个CPU核心2-3个任务,可以通过程序的函数的时候传入numPartitions参数,或者通过系统变量spark.default.parallelism来改变。

(2)Reduce任务的内存使用情况

  有时候出现OutOfMemoryError并不是因为RDD太大内存装不下,而是因为执行Reduce任务执行的groupByKey的结果太大。Spark的shuffle操作(sortByKeygroupByKeyreduceByKeyjoin, etc)它会为每一个任务建立一个hash表来执行grouping操作,简答的处理方式就是增加并行水平,这样每个任务的输入集变小。Spark能够支持每个任务200ms的速度,因为它在所有任务共享了JVMs,减小了发布任务的开销,所有可以安全的增加并行水平超过核心数。

 (3)使用broadcast存储大的变量

 使用Spark里面的broadcast的变量来存储大的变量可以大大减少每个序列化任务的大小和集群发布任务的开销。任务大对象的任务都可以考虑使用broadcast变量,Spark在master上会打印每个序列化任务的大小,当大小超过20KB的时候,可以考虑调优。

4、总结

  这里简短的指出了我们调优的时候需要注意的一些重要的点,通常我们把序列化方式调整为Kryo并且缓存方式改为序列化存储方式就可以解决大部分的问题了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Python、Flask、Django

使用字典映射实现python的switch case 这个问题很久以前遇到过,记录过,但是记录的不全

1473
来自专栏CSDN技术头条

Spark Block存储管理分析

Apache Spark中,对Block的查询、存储管理,是通过唯一的Block ID来进行区分的。所以,了解Block ID的生成规则,能够帮助我们了解Blo...

23810
来自专栏FreeBuf

VMware更新 | 修复Apache Flex BlazeDS中的漏洞

VMware发布了数个产品的版本更新,目的是修复Apache Flex BlazeDS中的一个漏洞。 据VMware介绍,Flex BlazeDS组件应用在数个...

2235
来自专栏肖力涛的专栏

Spark踩坑记:共享变量

如果我们想在节点之间共享一份变量,比如一份公共的配置项,该怎么办呢?Spark为我们提供了两种特定的共享变量,来完成节点间变量的共享。 本文首先简单的介绍spa...

1.1K1
来自专栏java一日一条

JavaScript 内存泄露的4种方式及如何避免

本文将探索常见的客户端 JavaScript 内存泄露,以及如何使用 Chrome 开发工具发现问题。

1273
来自专栏LhWorld哥陪你聊算法

Hive篇---Hive使用优化

本节主要描述Hive的优化使用,Hive的优化着重强调一个 把Hive SQL 当做Mapreduce程序去优化 二.主要优化点

8011
来自专栏牛肉圆粉不加葱

揭开Spark Streaming神秘面纱③ - 动态生成 job

JobScheduler有两个重要成员,一是上文介绍的 ReceiverTracker,负责分发 receivers 及源源不断地接收数据;二是本文将要介绍的 ...

903
来自专栏Spark学习技巧

spark调优系列之内存和GC调优

本文基于spark1.6讲解。 一,基本概述 调优内存的使用主要有三个方面的考虑:对象的内存占用量(你可能希望整个数据集都适合内存),访问这些数据的开销,垃圾...

9209
来自专栏CDA数据分析师

专栏 | Learning Spark (Python版) 学习笔记(二)----键值对、数据读取与保存、共享特性

本来应该上周更新的,结果碰上五一,懒癌发作,就推迟了 = =。以后还是要按时完成任务。废话不多说,第四章-第六章主要讲了三个内容:键值对、数据读取与保存与Spa...

2249
来自专栏微服务生态

从0到1起步-跟我进入堆外内存的奇妙世界

堆外内存一直是Java业务开发人员难以企及的隐藏领域,究竟他是干什么的,以及如何更好的使用呢?那就请跟着我进入这个世界吧。

962

扫码关注云+社区

领取腾讯云代金券