前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 性能优化指南(官网文档)

Spark 性能优化指南(官网文档)

作者头像
大数据学习指南
发布2022-05-26 08:33:42
7160
发布2022-05-26 08:33:42
举报
文章被收录于专栏:同名公众号:大数据学习指南

本篇文章翻译之 Tuning Spark。

由于大多数Spark组件基于内存的特性,Spark程序可能会因为集群中的任何资源而导致出现瓶颈:CPU、网络带宽或内存。通常情况下,如果数据适合于放到内存中,那么瓶颈就是网络带宽,但有时,我们还是需要内存进行一些调优的,比如以序列化的形式保存RDDs,以便减少内存占用。

这篇调优指南主要涵盖两个主题:数据序列化和内存调优。数据序列化不仅可以优化网络性能,而且还可以减少内存的使用。

1、数据序列化 - Data Serialization

序列化在任何的分布式应用中都扮演着重要的角色。但是,如果将对象序列化成比较慢的格式,或者耗费大量字节的格式,都会大大降低计算速度。Spark在便利性(允许你使用任何Java类型)和性能之间取得平衡。它提供了两个序列化库:

  • Java serialization:默认情况下,Spark使用Java的ObjectOutputStream框架来序列化对象,而且可以使用任何你通过实现java.io.Serializable来创建的类。你还可以通过继承java.io.Externalizable来控制序列化的性能。Java序列化是灵活的,但通常很慢,而且对于很多类会导致大的序列化格式。
  • Kryo serialization:Spark也可以使用Kryo库(version 4)来更快的序列化对象。Kryo明显要比Java序列化更快,更紧凑,但不支持所有序列化类型,并且要求你提前注册你将在程序中使用的类,以获得最佳性能。

如何使用呢?

我们可以通过使用 SparkConf 初始化 job,并调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) 来使用 Kryo 序列化。这个设置配置的序列化器不仅可以用于 worker 节点之间的 shuffle 数据,还可以用于序列化到磁盘的 RDDs。Kryo 不是默认值的唯一原因是因为其要自定义注册,但是官方建议在任何大型网络密集计算应用中应该尝试使用它。

从 Spark2.0.0 开始,我们在基于基本数据类型、基本数据类型或字符串类型的数组来 shuffle RDDs 时,使用Kyro序列化器。

Spark 对于包含在 AllScalaRegistrar(Twitter chill library) 中的常用核心Scala类,都自动包含了Kryo序列化器。

使用 registerKryoClasses 方法,向 Kryo 注册您自己的自定义类。下面是示例:

代码语言:javascript
复制
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo 文档描述了更高级的注册选项,比如添加自定义的序列化代码。

如果对象很大,我们可能需要增加配置(spark.kryoserializer.buffer)的值。这个值要足够大到能容纳你要序列化的最大的对象。

最后,如果我们没有注册自定义类,Kryo 将仍然生效,但是它将不得不存储每个对象的完整类名,那将会非常浪费。

2、内存调优 - Memory Tuning

调优内存时需要考虑三个因素:对象占用的内存数(可能想要将整个dataset放到内存中),访问这些对象的成本以及垃圾收集的开销。

默认情况下,Java 对象访问速度很快,但是,所消耗的存储空间要比实际的对象多消消耗 2~5 倍的空间。这是为什么呢?有以下几个原因:

  • 不同的Java对象都有一个"对象头",大约是16个字节,并包含指向其类的指针等信息。对于一个只有很少数据的对象(比如一个Int字段),对象头可能会比数据更大。
  • Java 字符串在其原始数据上大约有40个字节的开销(因为它们是将原始数据保存在字符数组中的,并且保存长度等额外的数据),由于字符串内部使用UTF-16编码,所以每个字符都存储为两个字节。因此,一个10字符的字符串可以很容易的消耗60个字节。
  • 通用集合类,如HashMap和LinkedList,使用链式数据结构,其中每个条目(例如Map.Entry)都有一个"wrapper"对象。这个对象不仅有对象头,还有指向列表中下一个对象的指针(通常每个指针8个字节)。
  • 基本数据类型的集合通常将它们存储为装箱对象,如java.lang.Integer。

下面将首先概述 Spark 的内存管理,然后讨论用户可以采取的具体策略,以便更有效地使用应用程序中的内存。我们将描述如何确定对象的内存使用,以及如何改进内存使用——通过改变数据结构,或以序列化格式存储数据。然后,我们将概括调优Spark的缓存大小和Java垃圾收集器。

2.1 内存管理 - Memory Management Overview

Spark 中的内存使用主要分为两类:execution 和 storage。Execution memory 指的是,在 shuffle、join、sort和 aggregation 中用于计算的内存,而storage memory 指的是用来在集群中缓存和传输内部数据的内存。Spark中,execution 和storage 共享一个统一的区域(M)。当没有execution memory被使用时,storage可以获取所有可用内存,反之,如果没有storage memory被使用时,execution也可以获取所有可用的内存。如果在Execution storage不够用时,可驱逐storage区域占用Execution区域的一部分内存,但仅在总的storage memory占用低于某个阈值®之前才会这么做。换句话说,R是M中的一个子区域,是在默认情况下分配给storage的内存,阈值R内缓存的块是永远不会被驱逐的。

这种设计确保了几个重要的特性。首先,不使用缓存的应用程序可以拿整个内存空间给execution用,从而避免不必要的磁盘溢出。其次,如果应用程序确实要使用缓存,可以保留一个最小的storage空间®,这里的数据块不会被驱逐。

虽然有两个相关的配置,但由于默认值已适用于大多数情况,一般用户是不需要调整这两个参数的:

  • spark.memory.fraction 代表统一共享区域M占Java堆内存-300MB的比例(默认是0.6)。剩余40%的空间是留给用户数据结构、Spark内部元数据和防止OMM用的。
  • spark.memory.storageFraction 代表R区域占M区域的比例(默认是0.5)。R中的缓存块时不会被Execution驱逐的。

spark.memory.fraction 的值应满足JVM老年代的堆空间大小。有关详细信息,请参考下面关于高级GC调优的讨论。

2.2 确定内存占用 - Determining Memory Consumption

衡量一个 dataset 所需内存的最好的方法就是创建一个 RDD,将其放入缓存中,然后到web UI中查看"Storage"页面。这个页面会告诉你,这个RDD占用了多少内存。要估计一个特定对象的内存占用,可以使用SizeEstimator的estimate方法,这对于尝试用不同的数据设计来调整内存使用是非常有用的,还可以确定广播变量在每个 executor 上占的堆大小。

2.3 数据结构调优 - Tuning Data Structures

减少内存消耗的第一种方法是,避免那些会增加开销的Java特性,比如基于指针的数据结构和包装对象。有几种方式可以做到这一点:

  1. 设计你的数据结构以优先选择对象数组和基本类型,而不是标准的Java或Scala集合类型(比如HashMap)。fastutil库为与Java标准库兼容的基本类型提供了方便的集合类。
  2. 尽可能避免使用包含大量小对象和指针的嵌套结构。
  3. 对于主键字段,考虑使用数字类型的ID或枚举对象来代替字符串。
  4. 如果内存少于32GB,可以设置JVM参数-XX:+UseCompressedOops,来使指针由8个字节变为4个字节。您可以在spark-env.sh中添加这个选项。

2.4 序列化RDD存储 - Serialized RDD Storage

当进行了调优之后,对象太大还是无法有效地存储时,一个更简单的减少内存占用的方式就是使用RDD持久化API中的序列化存储级别(比如MEMORY_ONLY_SER)以序列化形式存储对象。Spark将每个RDD分区存储为一个大的字节数组。以序列化形式存储数据的唯一缺点就是访问时间慢,由于必须动态地反序列化对个对象。我们强烈建议使用Kryo,如果您想以序列化的形式缓存数据,因为它比Java序列化占用小的多的空间。

2.5 垃圾收集调优 - Garbage Cllection Tuning

当我们的应用程序存储了大量的RDD时,JVM垃圾收集可能会成为问题。

当Java需要驱逐旧对象来为新对象腾出空间时,它将跟踪所有Java对象,并找到未使用的对象。这里要记住的要点是,垃圾收集的成本与Java对象的数据成正比,使用更小对象的数据结构(比如,用int类型的数组代替LinkedList)可以大大降低垃圾收集的成本。

一个更好的方法是以序列化的形式持久化对象,如上所述:现在每个RDD分区只有一个对象(一个字节数组)。如果存在GC问题,在尝试使用其他技术之前,首先要尝试使用序列化缓存。

由于任务工作内存(运行task所需的内存空间)和缓存在节点上的RDD之间存在冲突,也可能会导致GC问题。我们将讨论如何控制分配给RDD的缓存空间来缓解这种问题。

2.5.1 衡量GC影响 - Measuring the Impact of GC

GC调优的第一步是收集统计垃圾收集的频率和GC所耗费的时间。这可以通过添加Java gc选项-XX:+PrintGCDetails和-XX:+PrintGCTimeStamps来实现。(有关给Spark job传递Java选项的信息,请查看configuration guide)。在下次Spark job运行时,您将在发生垃圾收集时看到被打印到work检点上的日志信息。注意,这些GC日志是打印在集群的worker节点而不是driver节点。

2.5.2 高级GC调优策略 - Advanced GC Tuning

为了更进一步地调优垃圾收集,我们首先需要了解一些关于JVM内存管理的基本信息:

  • Java堆空间被划分为年轻代和年老代两个区域。年轻代用来保存存活时间短的对象,而年老代保存寿命更长的对象。
  • 年轻代被进一步划分成Eden,Survivor1和Survivor2三个区域。
  • 垃圾收集过程的简单描述:当Eden空间已满时,会在Eden空间触发一次minor GC,然后将Eden和Survivor1中仍然存活的对象复制到Survivor2区域。如果一个对象达到了所设定的最大年龄或者Survivor2区满了,就会将对象移动到年老代。最终,当年老代空间快要满了时,将会触发一次full GC。

Spark中进行GC调优的目标是确保只有存活时间长的RDD存储在年老代,年轻代足以存储存活时间短的对象。这将有助于避免full GC去收集任务执行期间创建的临时对象。下面是一些有用的GC调优方法:

  • 通过收集GC统计信息来检查是否有太多的垃圾收集发生。如果在一个task执行完成之前,触发了多次full GC,这意味着没有足够的内存可用来执行tasks。
  • 如果触发了太多的minor GC,而没有太多major GC,那么为Eden区分配更多内存将会有所帮助。您可以将Eden区的大小设置为高于每个task预估所占用的内存。如果Eden区的大小被确定为E,那么可以使用选项-Xmn=4/3*E来这是年轻代的大小。
  • 在打印的GC统计信息中,如果发现年老代将要满了,则通过降低spark.memory.fraction来减少用于缓存的内存占用;缓存更少的对象比降低task的执行速度要更好。或者,考虑减少年轻代的大小。如果你已经设置了-Xmn的值,这意味着降低它的大小。如果没有设置-Xmn的值,尝试盖面JVM的NewRatio参数的值,许多JVM将这个参数的默认值设为2,这表明年老代占整个堆空间的2/3,它应该足够大,以超过spark.memory.fraction的值。
  • 尝试使用G1GC垃圾收集器-XX:+UseG1GC。它可以在垃圾收集成为瓶颈的情况下提高性能。注意,对于那些堆内存大的executor来说,增加G1 的region size(-XX:G1HeapRegionSize)可能很重要。
  • 举个例子,如果您的task是从HDFS读取数据,那么就可以使用从HDFS读取数据的block大小来估计这个task所使用的内存。需要注意的是,block解压缩之后的大小通常是原来的2或3倍。因此,如果我们希望有3或4个task的工作空间,并且HDFS block大小为128MB,我们就可以估算Eden区大小为43128。
  • 监视垃圾收集的频率和时间如何随着设置的变化而变化。

我们的经验表明,GC调优的效果取决于你的应用程序和可用内存的大小。网上有许多调优选项,但是管理full GC发生的频率有助于减少开销。

3、其他优化技巧 - Other Considerations

3.1 任务并行度 - Level of Parallelism

除非为每个操作设置足够高的并行度,否则集群资源不会得到充分利用。Spark根据每个文件的大小自动设置要在每个文件上运行的map task的数量。对于分布式的reduce操作,例如groupByKey和reduceByKey,它使用最大的父RDD的分区数。你可以将并行度作为第二个参数传递,或设置属性spark.default.parallelism来更改默认值。通常,我们建议集群中每个CPU xore执行2-3个task。

3.2 reduce端task内存占用 - Memory Usage of Reduce Tasks

有时候,我们的应用程序发生OOM错误并不是因为RDD无法放入内存中,而是因为其中一个task的工作集太大,例如groupByKey中的一个reduce task数据太多。Spark的shuffle操作(sortByKey,groupByKey,reduceByKey,join等)在每个task中构建了一个hash table来执行聚合分组,这通常会包含大量的数据。缓解这种情况最简单的方法就是增加并行度,这样每个task的处理的数据就会变少。Spark可以有效地支持短至200ms的task,因为它可以对许多tasks重用一个executor JVM,而且启动task成本很低,因此你可以安全将并行度增加到集群core数量以上。

3.3 广播大变量 - Broadcasting Large Variables

使用SparkContext中的广播功能可以极大地减少每个序列化task的大小和集群启动job的成本。如果你的task使用了driver端任何的大对象,可以考虑将这些对象转换为广播变量。Spark在master节点打印每个task的序列化大小,因此您可以查看来确定task是否太大,一般来说,大于20KB的task值得去优化。

3.4 数据本地性 - Data Locality

数据所在的位置对Spark作业的性能有很大的影响。如果数据和要处理数据的代码在同一个地方,那么计算速度往往就很快。但是,如果代码和数据不在同一个地方,那么其中一个必须移动到另外一个所在的地方。通常情况下,移动代码比移动数据要快得多,因为代码的大小要比数据小的多。Spark就是根据这种原则来进行调度的。

数据所在的位置就是指数据与处理数据的代码之间的距离。根据数据当前的位置,有几个级别的距离,按顺序从最近到最远:

  • PROCESS_LOCAL 数据和运行代码位于同一个JVM中。这是最好的情况。
  • NODE_LOCAL 数据和运行代码位于同一个节点。这会比PROCESS_LOCAL 慢一点,因为数据要在进程之间传输。
  • NO_PREF 从任何地方访问数据都是一样快的。
  • RACK_LOCAL 数据位于同一个服务器机架上。数据位于同一机架的不同服务器上,因此需要通过网络传输数据,通常是经过一个交换机。
  • ANY 数据位于其他机架上。

Spark会优先调度task在最佳的位置级别,但这并不总是可能的。在任何空闲executor上都没有未处理的数据的情况下,Spark会切换到更低的位置级别。有两种选择:a) 等待CPU空闲下来,在同一服务器上启动一个task,或b) 立即在远端启动一个task,并要求将数据移动到那里。 Spark通常的策略就是,先等待一段时间,希望繁忙的CPU能得到释放,一旦超过指定时间,就开始将数据从远端移动到空闲的CPU。每个位置级别之间的超时时间都可以单独配置,也可以全部配置在一个参数中。关于spark.locality参数的详细信息,请查看configuration page。如果您的tasks运行时间很长并且位置级别很差,那么可以增加配置的值,但是默认的设置通常就能满足多数的情况。

4、总结 - Summary

这篇简短的调优指南指出了在调优Spark应用程序时,应该关注的主要的点——最重要的是数据序列化和内存调优。对于大多数应用程序,切换到Kryo序列化,并以序列化的形式持久化数据就能解决大多数常见的性能问题。

参考

Tuning Spark

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据学习指南 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 本篇文章翻译之 Tuning Spark。
    • 2.1 内存管理 - Memory Management Overview
      • 2.2 确定内存占用 - Determining Memory Consumption
        • 2.3 数据结构调优 - Tuning Data Structures
          • 2.4 序列化RDD存储 - Serialized RDD Storage
            • 2.5 垃圾收集调优 - Garbage Cllection Tuning
              • 参考
              相关产品与服务
              文件存储
              文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档