下面这些关于 Spark 的性能调优项,有的是来自官方的,有的是来自别的的工程师,有的则是我自己总结的。
基本概念和原则
首先,要搞清楚 Spark 的几个基本概念和原则,否则系统的性能调优无从谈起:
下面给这样一个直观的例子,当前总的 cpu 利用率并不高:
但是经过根据上述原则的的调整之后,可以显著发现 cpu 总利用率增加了:
其次,涉及性能调优我们经常要改配置,在 Spark 里面有三种常见的配置方式,虽然有些参数的配置是可以互相替代,但是作为最佳实践,还是需要遵循不同的情形下使用不同的配置:
举一个配置的具体例子。slave、worker 和 executor 之间的比例调整。我们经常需要调整并行的 executor 的数量,那么简单说有两种方式:
对上述第 2 点补充说明一下,关于为 executor 分配内存的问题。在默认 YARN 配置下,244G 的 r3.8xlarge 机器,如果配置 spark.executor.memory 为 50(期望 50*4=200),让 4 个 executor 并行,但是实际测试却只有 3 个 executor 并行,直到这个数值配置小到 35 的时候,才观察到 4 个 executor 并行。原因在于 spark.yarn.executor.memoryOverhead 这个参数,会预留一些 overhead 的内存给每一个 executor,默认值是 10%,这点在计算内存分配的时候需要注意。
有的配置在不同的 MR 框架/工具下是不一样的,比如 YARN 下有的参数的默认取值就不同,这点需要注意。
明确这些基础的事情以后,再来一项一项看性能调优的要点。
内存
Memory Tuning,Java 对象会占用原始数据 2~5 倍甚至更多的空间。最好的检测对象内存消耗的办法就是创建 RDD,然后放到 cache 里面去,然后在 UI 上面看 storage 的变化;当然也可以使用 SizeEstimator 来估算。使用-XX:+UseCompressedOops 选项可以压缩指针(8 字节变成 4 字节)。在调用 collect 等等 API 的时候也要小心——大块数据往内存拷贝的时候心里要清楚。内存要留一些给操作系统,比如 20%,这里面也包括了 OS 的 buffer cache,如果预留得太少了,会见到这样的错误:
Required executor memory (235520+23552 MB) is above the max threshold (241664 MB) of this cluster! Please increase the value of ‘yarn.scheduler.maximum-allocation-mb’.
或者干脆就没有这样的错误,但是依然有因为内存不足导致的问题,有的会有警告,比如这个:
16/01/13 23:54:48 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
有的时候连这样的日志都见不到,而是见到一些不清楚原因的 executor 丢失信息:
Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 17.0 failed 4 times, most recent failure: Lost task 12.3 in stage 17.0 (TID 1257, ip-10-184-192-56.ec2.internal): ExecutorLostFailure (executor 79 lost)
Reduce Task 的内存使用。在某些情况下 reduce task 特别消耗内存,比如当 shuffle 出现的时候,比如 sortByKey、groupByKey、reduceByKey 和 join 等,要在内存里面建立一个巨大的 hash table。其中一个解决办法是增大 level of parallelism,这样每个 task 的输入规模就相应减小。另外,注意 shuffle 的内存上限设置,有时候有足够的内存,但是 shuffle 内存不够的话,性能也是上不去的。我们在有大量数据 join 等操作的时候,shuffle 的内存上限经常配置到 executor 的 50%。
注意原始 input 的大小,有很多操作始终都是需要某类全集数据在内存里面完成的,那么并非拼命增加 parallelism 和 partition 的值就可以把内存占用减得非常小的。我们遇到过某些性能低下甚至 OOM 的问题,是改变这两个参数所难以缓解的。但是可以通过增加每台机器的内存,或者增加机器的数量都可以直接或间接增加内存总量来解决。
在选择 EC2 机器类型的时候,对于不确定的机型性能,一定要藉由测试来明确。比如我们最初以为使用 r3.8 xlarge 和 c3.8 xlarge 选择的问题,运算能力相当,前者比后者贵 50%,但是内存是后者的 4 倍。我们曾经根据这个情形,考虑在 c3.8 集群下,跑单 executor 时的现有的 CPU 利用率,估算了一个如果切换到 r3.8 以后大致的性能。可是当我们真正拿 r3.8 来做测试的时候,却发现这个估算不正确,原来 c3.8 和 r3.8 的性能不一样,不仅仅是内存差别,在 Spark job 内存占用远不到上限的情况下,我们发现 r3.8 xlarge 要比 c3.8 xlarge 性能好 40%。
另外,有一些 RDD 的 API,比如 cache,persist,都会把数据强制放到内存里面,如果并不明确这样做带来的好处,就不要用它们。特别值得一提的是 par 这个方法,通常来说,我们在写 Spark 任务,使用 for 循环的时候要敏感,因为这意味着可能在串行执行一个任务,通过 par 这个方法可以让他们并行化。但是这也不是绝对的,因为并行化意味着可能带来额外的内存占用,而且我也遇到过出错的情况,在已经并行的操作里面再引入并行的操作,也可能引入一些不可预料的问题,需要权衡,分析具体的场景。
CPU
Level of Parallelism。指定它以后,在进行 reduce 类型操作的时候,默认 partition 的数量就被指定了。这个参数在实际工程中通常是必不可少的,一般都要根据 input 和每个 executor 内存的大小来确定。设置 level of parallelism 或者属性 spark.default.parallelism 来改变并行级别,通常来说,每一个 CPU 核可以分配 2~3 个 task。
CPU core 的访问模式是共享还是独占。即 CPU 核是被同一 host 上的 executor 共享还是瓜分并独占。比如,一台机器上共有 32 个 CPU core 的资源,同时部署了两个 executor,总内存是 50G,那么一种方式是配置 spark.executor.cores 为 16,spark.executor.memory 为 20G,这样由于内存的限制,这台机器上会部署两个 executor,每个都使用 20G 内存,并且各使用 “独占” 的 16 个 CPU core 资源;而在内存资源不变的前提下,也可以让这两个 executor“共享” 这 32 个 core。根据我的测试,独占模式的性能要略好与共享模式。
GC 调优。打印 GC 信息:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps。要记得默认 60% 的 executor 内存可以被用来作为 RDD 的缓存,因此只有 40% 的内存可以被用来作为对象创建的空间,这一点可以通过设置 spark.storage.memoryFraction 改变。如果有很多小对象创建,但是这些对象在不完全 GC 的过程中就可以回收,那么增大 Eden 区会有一定帮助。如果有任务从 HDFS 拷贝数据,内存消耗有一个简单的估算公式——比如 HDFS 的 block size 是 64MB,工作区内有 4 个 task 拷贝数据,而解压缩一个 block 要增大 3 倍大小,那么估算内存消耗就是:4*3*64MB。另外,工作中遇到过这样的一个问题:GC 默认情况下有一个限制,默认是 GC 时间不能超过 2% 的 CPU 时间,但是如果大量对象创建(在 Spark 里很容易出现,代码模式就是一个 RDD 转下一个 RDD),就会导致大量的 GC 时间,从而出现 “OutOfMemoryError: GC overhead limit exceeded”,对于这个,可以通过设置-XX:-UseGCOverheadLimit 关掉它。
序列化和传输
Data Serialization,默认使用的是 Java Serialization,这个程序员最熟悉,但是性能、空间表现都比较差。还有一个选项是 Kryo Serialization,更快,压缩率也更高,但是并非支持任意类的序列化。在 Spark UI 上能够看到序列化占用总时间开销的比例,如果这个比例高的话可以考虑优化内存使用和序列化。
Broadcasting Large Variables。在 task 使用静态大对象的时候,可以把它 broadcast 出去。Spark 会打印序列化后的大小,通常来说如果它超过 20KB 就值得这么做。有一种常见情形是,一个大表 join 一个小表,把小表 broadcast 后,大表的数据就不需要在各个 node 之间疯跑,安安静静地呆在本地等小表 broadcast 过来就好了。
Data Locality。数据和代码要放到一起才能处理,通常代码总比数据要小一些,因此把代码送到各处会更快。Data Locality 是数据和处理的代码在屋里空间上接近的程度:PROCESS_LOCAL(同一个 JVM)、NODE_LOCAL(同一个 node,比如数据在 HDFS 上,但是和代码在同一个 node)、NO_PREF、RACK_LOCAL(不在同一个 server,但在同一个机架)、ANY。当然优先级从高到低,但是如果在空闲的 executor 上面没有未处理数据了,那么就有两个选择:
默认当这种情况发生 Spark 会等一会儿(spark.locality),即策略(1),如果繁忙的 CPU 停不下来,就会执行策略(2)。
代码里对大对象的引用。在 task 里面引用大对象的时候要小心,因为它会随着 task 序列化到每个节点上去,引发性能问题。只要序列化的过程不抛出异常,引用对象序列化的问题事实上很少被人重视。如果,这个大对象确实是需要的,那么就不如干脆把它变成 RDD 好了。绝大多数时候,对于大对象的序列化行为,是不知不觉发生的,或者说是预期之外的,比如在我们的项目中有这样一段代码:
rdd.map(r => {
println(BackfillTypeIndex)
})
其实呢,它等价于这样:
rdd.map(r => {
println(this.BackfillTypeIndex)
})
不要小看了这个 this,有时候它的序列化是非常大的开销。
对于这样的问题,一种最直接的解决方法就是:
val dereferencedVariable = this.BackfillTypeIndex
rdd.map(r => println(dereferencedVariable)) // "this" is not serialized
相关地,注解 @transient 用来标识某变量不要被序列化,这对于将大对象从序列化的陷阱中排除掉是很有用的。另外,注意 class 之间的继承层级关系,有时候一个小的 case class 可能来自一棵大树。
文件读写
文件存储和读取的优化。比如对于一些 case 而言,如果只需要某几列,使用 rcfile 和 parquet 这样的格式会大大减少文件读取成本。再有就是存储文件到 S3 上或者 HDFS 上,可以根据情况选择更合适的格式,比如压缩率更高的格式。另外,特别是对于 shuffle 特别多的情况,考虑留下一定量的额外内存给操作系统作为操作系统的 buffer cache,比如总共 50G 的内存,JVM 最多分配到 40G 多一点。
文件分片。比如在 S3 上面就支持文件以分片形式存放,后缀是 partXX。使用 coalesce 方法来设置分成多少片,这个调整成并行级别或者其整数倍可以提高读写性能。但是太高太低都不好,太低了没法充分利用 S3 并行读写的能力,太高了则是小文件太多,预处理、合并、连接建立等等都是时间开销啊,读写还容易超过 throttle。
任务
Spark 的 Speculation。通过设置 spark.speculation 等几个相关选项,可以让 Spark 在发现某些 task 执行特别慢的时候,可以在不等待完成的情况下被重新执行,最后相同的 task 只要有一个执行完了,那么最快执行完的那个结果就会被采纳。
减少 Shuffle。其实 Spark 的计算往往很快,但是大量开销都花在网络和 IO 上面,而 shuffle 就是一个典型。举个例子,如果 (k, v1) join (k, v2) => (k, v3),那么,这种情况其实 Spark 是优化得非常好的,因为需要 join 的都在一个 node 的一个 partition 里面,join 很快完成,结果也是在同一个 node(这一系列操作可以被放在同一个 stage 里面)。但是如果数据结构被设计为 (obj1) join (obj2) => (obj3),而其中的 join 条件为 obj1.column1 == obj2.column1,这个时候往往就被迫 shuffle 了,因为不再有同一个 key 使得数据在同一个 node 上的强保证。在一定要 shuffle 的情况下,尽可能减少 shuffle 前的数据规模,比如这个避免 groupByKey 的例子。下面这个比较的图片来自 Spark Summit 2013 的一个演讲,讲的是同一件事情:
Repartition。运算过程中数据量时大时小,选择合适的 partition 数量关系重大,如果太多 partition 就导致有很多小任务和空任务产生;如果太少则导致运算资源没法充分利用,必要时候可以使用 repartition 来调整,不过它也不是没有代价的,其中一个最主要代价就是 shuffle。再有一个常见问题是数据大小差异太大,这种情况主要是数据的 partition 的 key 其实取值并不均匀造成的(默认使用 HashPartitioner),需要改进这一点,比如重写 hash 算法。测试的时候想知道 partition 的数量可以调用 rdd.partitions().size() 获知。
Task 时间分布。关注 Spark UI,在 Stage 的详情页面上,可以看得到 shuffle 写的总开销,GC 时间,当前方法栈,还有 task 的时间花费。如果你发现 task 的时间花费分布太散,就是说有的花费时间很长,有的很短,这就说明计算分布不均,需要重新审视数据分片、key 的 hash、task 内部的计算逻辑等等,瓶颈出现在耗时长的 task 上面。
重用资源。有的资源申请开销巨大,而且往往相当有限,比如建立连接,可以考虑在 partition 建立的时候就创建好(比如使用 mapPartition 方法),这样对于每个 partition 内的每个元素的操作,就只要重用这个连接就好了,不需要重新建立连接。
可供参考的文档:官方调优文档 Tuning Spark,Spark 配置的官方文档,Spark Programming Guide,Running Spark on YARN,JVMGC 调优文档,JVM 性能调优文档,How-to: Tune Your Apache Spark Jobs part-1 & part-2,Spark on Yarn: Where Have All the Memory Gone?,Spark Architecture。
文章未经特殊标明皆为本人原创,未经许可不得用于任何商业用途,转载请保持完整性并注明来源链接 《四火的唠叨》