Spark程序可以快如闪电⚡️,也可以慢如蜗牛?。
它的性能取决于用户使用它的方式。
一般来说,如果有可能,用户应当尽可能多地使用SparkSQL以取得更好的性能。
主要原因是SparkSQL是一种声明式编程风格,背后的计算引擎会自动做大量的性能优化工作。
基于RDD的Spark的性能调优属于坑非常深的领域,并且很容易踩到。
我们将介绍Spark调优原理,Spark任务监控,以及Spark调优案例。
本文参考了以下文章:
《Spark性能优化指南——基础篇》:https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
《Spark性能优化指南——高级篇》:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html
《spark-调节executor堆外内存》:https://www.cnblogs.com/colorchild/p/12175328.html
import findspark
#指定spark_home为刚才的解压路径,指定python路径
spark_home = "/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"
python_path = "/Users/liangyun/anaconda3/bin/python"
findspark.init(spark_home,python_path)
import pyspark
from pyspark.sql import SparkSession
#SparkSQL的许多功能封装在SparkSession的方法接口中
spark = SparkSession.builder \
.appName("test") \
.config("master","local[4]") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext可以用下面三个公式来近似估计spark任务的执行时间。
可以用下面二个公式来说明spark在executor上的内存分配。
如果程序执行太慢,调优的顺序一般如下:
1,首先调整任务并行度,并调整partition分区。
2,尝试定位可能的重复计算,并优化之。
3,尝试定位数据倾斜问题或者计算倾斜问题并优化之。
4,如果shuffle过程提示堆外内存不足,考虑调高堆外内存。
5,如果发生OOM或者GC耗时过长,考虑提高executor-memory或降低executor-core。
以下是对上述公式中涉及到的一些概念的初步解读。
Spark任务启动后,可以在浏览器中输入 http://localhost:4040/ 进入到spark web UI 监控界面。
该界面中可以从多个维度以直观的方式非常细粒度地查看Spark任务的执行情况,包括任务进度,耗时分析,存储分析,shuffle数据量大小等。
最常查看的页面是 Stages页面和Excutors页面。
Jobs:每一个Action操作对应一个Job,以Job粒度显示Application进度。有时间轴Timeline。

Stages:Job在遇到shuffle切开Stage,显示每个Stage进度,以及shuffle数据量。

可以点击某个Stage进入详情页,查看其下面每个Task的执行情况以及各个partition执行的费时统计。

Storage:
监控cache或者persist导致的数据存储大小。
Environment: 显示spark和scala版本,依赖的各种jar包及其版本。
Excutors : 监控各个Excutors的存储和shuffle情况。

SQL: 显示各种SQL命令在那些Jobs中被执行。
下面介绍几个调优的典型案例:
1,资源配置优化
2,利用缓存减少重复计算
3,数据倾斜调优
4,broadcast+map代替join
5,reduceByKey/aggregateByKey代替groupByKey
1,资源配置优化
下面是一个资源配置的例子:
优化前:
#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 8 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files data.csv,profile.txt
--py-files pkg.py,tqdm.py
pyspark_demo.py 优化后:
#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 2 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.default.parallelism=1600 \
--conf spark.sql.shuffle.partitions=1600 \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=2g\
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files data.csv,profile.txt
--py-files pkg.py,tqdm.py
pyspark_demo.py 这里主要减小了 executor-cores数量,一般设置为1~4,过大的数量可能会造成每个core计算和存储资源不足产生OOM,也会增加GC时间。此外也将默认分区数调到了1600,并设置了2G的堆外内存。
2, 利用缓存减少重复计算
%%time
# 优化前:
import math
rdd_x = sc.parallelize(range(0,2000000,3),3)
rdd_y = sc.parallelize(range(2000000,4000000,2),3)
rdd_z = sc.parallelize(range(4000000,6000000,2),3)
rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x))
s = rdd_data.reduce(lambda a,b:a+b+0.0)
n = rdd_data.count()
mean = s/n
print(mean)-1.889935655259299
CPU times: user 40.2 ms, sys: 12.4 ms, total: 52.6 ms
Wall time: 2.76 s%%time
# 优化后:
import math
from pyspark.storagelevel import StorageLevel
rdd_x = sc.parallelize(range(0,2000000,3),3)
rdd_y = sc.parallelize(range(2000000,4000000,2),3)
rdd_z = sc.parallelize(range(4000000,6000000,2),3)
rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)
s = rdd_data.reduce(lambda a,b:a+b+0.0)
n = rdd_data.count()
mean = s/n
rdd_data.unpersist()
print(mean)-1.889935655259299
CPU times: user 40.5 ms, sys: 11.5 ms, total: 52 ms
Wall time: 2.18 s3, 数据倾斜调优
%%time
# 优化前:
rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda x:(x,1))
rdd_count = rdd_one.reduceByKey(lambda a,b:a+b+0.0)
print(rdd_count.collect()) [('good', 10000.0), ('hello', 1000000.0), ('spark', 10000.0), ('world', 1000000.0), ('love', 10000.0), ('morning', 10000.0), ('I', 10000.0)]
CPU times: user 285 ms, sys: 27.6 ms, total: 313 ms
Wall time: 2.74 s%%time
# 优化后:
import random
rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda x:(x,1))
rdd_mid_key = rdd_one.map(lambda x:(x[0]+"_"+str(random.randint(0,999)),x[1]))
rdd_mid_count = rdd_mid_key.reduceByKey(lambda a,b:a+b+0.0)
rdd_count = rdd_mid_count.map(lambda x:(x[0].split("_")[0],x[1])).reduceByKey(lambda a,b:a+b+0.0)
print(rdd_count.collect())
#作者按:此处仅示范原理,单机上该优化方案难以获得性能优势[('good', 10000.0), ('hello', 1000000.0), ('spark', 10000.0), ('world', 1000000.0), ('love', 10000.0), ('morning', 10000.0), ('I', 10000.0)]
CPU times: user 351 ms, sys: 51 ms, total: 402 ms
Wall time: 7 s4, broadcast+map代替join
该优化策略一般限于有一个参与join的rdd的数据量不大的情况。
%%time
# 优化前:
rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")])
rdd_students = rdd_age.join(rdd_gender).map(lambda x:(x[0],x[1][0],x[1][1]))
print(rdd_students.collect())[('LiLy', 20, 'female'), ('LiLei', 18, 'male'), ('HanMeimei', 19, 'female'), ('Jim', 17, 'male')]
CPU times: user 43.9 ms, sys: 11.6 ms, total: 55.6 ms
Wall time: 307 ms%%time
# 优化后:
rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")],2)
ages = rdd_age.collect()
broads = sc.broadcast(ages)
def get_age(it):
result = []
ages = dict(broads.value)
for x in it:
name = x[0]
age = ages.get(name,0)
result.append((x[0],age,x[1]))
return iter(result)
rdd_students = rdd_gender.mapPartitions(get_age)
print(rdd_students.collect())[('LiLei', 18, 'male'), ('HanMeimei', 19, 'female'), ('Jim', 17, 'male'), ('LiLy', 20, 'female')]
CPU times: user 14.3 ms, sys: 7.43 ms, total: 21.7 ms
Wall time: 86.3 ms5,reduceByKey/aggregateByKey代替groupByKey
groupByKey算子是一个低效的算子,其会产生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替,通过在每个partition内部先做一次数据的合并操作,大大减少了shuffle的数据量。
%%time
# 优化前:
rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names = rdd_students.groupByKey().map(lambda t:(t[0],list(t[1])))
names = rdd_names.collect()
print(names)[('class1', ['LiLei', 'Lucy', 'Ann', 'Jim']), ('class2', ['HanMeimei', 'Lily'])]
CPU times: user 25.3 ms, sys: 7.32 ms, total: 32.6 ms
Wall time: 164 ms%%time
# 优化后:
rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names = rdd_students.aggregateByKey([],lambda arr,name:arr+[name],lambda arr1,arr2:arr1+arr2)
names = rdd_names.collect()
print(names)[('class1', ['LiLei', 'Lucy', 'Ann', 'Jim']), ('class2', ['HanMeimei', 'Lily'])]
CPU times: user 21.6 ms, sys: 6.63 ms, total: 28.3 ms
Wall time: 118 ms