专栏首页SAMshare用PySpark开发时的调优思路(上)

用PySpark开发时的调优思路(上)

这一小节的内容算是对pyspark入门的一个ending了,全文主要是参考学习了美团Spark性能优化指南的基础篇和高级篇内容,主体脉络和这两篇文章是一样的,只不过是基于自己学习后的理解进行了一次总结复盘,而原文中主要是用Java来举例的,我这边主要用pyspark来举例。文章主要会从4个方面(或者说4个思路)来优化我们的Spark任务,主要就是下面的图片所示:(本小节只写了开发习惯调优哈)

1. 开发习惯调优

1)尽可能复用同一个RDD,避免重复创建,并且适当持久化数据

这种开发习惯是需要我们对于即将要开发的应用逻辑有比较深刻的思考,并且可以通过code review来发现的,讲白了就是要记得我们创建过啥数据集,可以复用的尽量广播(broadcast)下,能很好提升性能。

# 最低级写法,相同数据集重复创建。
rdd1 = sc.textFile("./test/data/hello_samshare.txt", 4) # 这里的 4 指的是分区数量
rdd2 = sc.textFile("./test/data/hello_samshare.txt", 4) # 这里的 4 指的是分区数量
print(rdd1.take(10))
print(rdd2.map(lambda x:x[0:1]).take(10))

# 稍微进阶一些,复用相同数据集,但因中间结果没有缓存,数据会重复计算
rdd1 = sc.textFile("./test/data/hello_samshare.txt", 4) # 这里的 4 指的是分区数量
print(rdd1.take(10))
print(rdd1.map(lambda x:x[0:1]).take(10))

# 相对比较高效,使用缓存来持久化数据
rdd = sc.parallelize(range(1, 11), 4).cache()  # 或者persist()
rdd_map = rdd.map(lambda x: x*2)
rdd_reduce = rdd.reduce(lambda x, y: x+y)
print(rdd_map.take(10))
print(rdd_reduce)

下面我们就来对比一下使用缓存能给我们的Spark程序带来多大的效率提升吧,我们先构造一个程序运行时长测量器。

import time
# 统计程序运行时间
def time_me(info="used"):
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.time()
            fn(*args, **kwargs)
            print("%s %s %s" % (fn.__name__, info, time.time() - start), "second")
        return _wrapper
    return _time_me

下面我们运行下面的代码,看下使用了cache带来的效率提升:

@time_me()
def test(types=0):
    if types == 1:
        print("使用持久化缓存")
        rdd = sc.parallelize(range(1, 10000000), 4)
        rdd1 = rdd.map(lambda x: x*x + 2*x + 1).cache()  # 或者 persist(StorageLevel.MEMORY_AND_DISK_SER)
        print(rdd1.take(10))
        rdd2 = rdd1.reduce(lambda x, y: x+y)
        rdd3 = rdd1.reduce(lambda x, y: x + y)
        rdd4 = rdd1.reduce(lambda x, y: x + y)
        rdd5 = rdd1.reduce(lambda x, y: x + y)
        print(rdd5)
    else:
        print("不使用持久化缓存")
        rdd = sc.parallelize(range(1, 10000000), 4)
        rdd1 = rdd.map(lambda x: x * x + 2 * x + 1)
        print(rdd1.take(10))
        rdd2 = rdd1.reduce(lambda x, y: x + y)
        rdd3 = rdd1.reduce(lambda x, y: x + y)
        rdd4 = rdd1.reduce(lambda x, y: x + y)
        rdd5 = rdd1.reduce(lambda x, y: x + y)
        print(rdd5)

        
test()   # 不使用持久化缓存
time.sleep(10)
test(1)  # 使用持久化缓存
# output:
# 使用持久化缓存
# [4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
# 333333383333334999999
# test used 26.36529278755188 second
# 使用持久化缓存
# [4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
# 333333383333334999999
# test used 17.49532413482666 second

同时我们打开YARN日志来看看:http://localhost:4040/jobs/

因为我们的代码是需要重复调用RDD1的,当没有对RDD1进行持久化的时候,每次当它被action算子消费了之后,就释放了,等下一个算子计算的时候要用,就从头开始计算一下RDD1。代码中需要重复调用RDD1 五次,所以没有缓存的话,差不多每次都要6秒,总共需要耗时26秒左右,但是,做了缓存,每次就只需要3s不到,总共需要耗时17秒左右。

另外,这里需要提及一下一个知识点,那就是持久化的级别,一般cache的话就是放入内存中,就没有什么好说的,需要讲一下的就是另外一个 persist(),它的持久化级别是可以被我们所配置的:

持久化级别

含义解释

MEMORY_ONLY

将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。使用cache()方法时,实际就是使用的这种持久化策略,性能也是最高的。

MEMORY_AND_DISK

优先尝试将数据保存在内存中,如果内存不够存放所有的数据,会将数据写入磁盘文件中。

MEMORY_ONLY_SER

基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。

MEMORY_AND_DISK_SER

基本含义同MEMORY_AND_DISK。唯一的区别是会先序列化,节约内存。

DISK_ONLY

使用未序列化的Java对象格式,将数据全部写入磁盘文件中。一般不推荐使用。

MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.

对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。一般也不推荐使用。

2)尽量避免使用低性能算子

shuffle类算子算是低性能算子的一种代表,所谓的shuffle类算子,指的是会产生shuffle过程的操作,就是需要把各个节点上的相同key写入到本地磁盘文件中,然后其他的节点通过网络传输拉取自己需要的key,把相同key拉到同一个节点上进行聚合计算,这种操作必然就是有大量的数据网络传输与磁盘读写操作,性能往往不是很好的。

那么,Spark中有哪些算子会产生shuffle过程呢?

操作类别

shuffle类算子

备注

分区操作

repartition()、repartitionAndSortWithinPartitions()、coalesce(shuffle=true)

重分区操作一般都会shuffle,因为需要对所有的分区数据进行打乱。

聚合操作

reduceByKey、groupByKey、sortByKey

需要对相同key进行操作,所以需要拉到同一个节点上。

关联操作

join类操作

需要把相同key的数据shuffle到同一个节点然后进行笛卡尔积

去重操作

distinct等

需要对相同key进行操作,所以需要shuffle到同一个节点上。

排序操作

sortByKey等

需要对相同key进行操作,所以需要shuffle到同一个节点上。

这里进一步介绍一个替代join的方案,因为join其实在业务中还是蛮常见的。

# 原则2:尽量避免使用低性能算子
rdd1 = sc.parallelize([('A1', 211), ('A1', 212), ('A2', 22), ('A4', 24), ('A5', 25)])
rdd2 = sc.parallelize([('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)])
# 低效的写法,也是传统的写法,直接join
rdd_join = rdd1.join(rdd2)
print(rdd_join.collect())
# [('A4', (24, 14)), ('A2', (22, 12)), ('A1', (211, 11)), ('A1', (212, 11))]
rdd_left_join = rdd1.leftOuterJoin(rdd2)
print(rdd_left_join.collect())
# [('A4', (24, 14)), ('A2', (22, 12)), ('A5', (25, None)), ('A1', (211, 11)), ('A1', (212, 11))]
rdd_full_join = rdd1.fullOuterJoin(rdd2)
print(rdd_full_join.collect())
# [('A4', (24, 14)), ('A3', (None, 13)), ('A2', (22, 12)), ('A5', (25, None)), ('A1', (211, 11)), ('A1', (212, 11))]

# 高效的写法,使用广播+map来实现相同效果
# tips1: 这里需要注意的是,用来broadcast的RDD不可以太大,最好不要超过1G
# tips2: 这里需要注意的是,用来broadcast的RDD不可以有重复的key的
rdd1 = sc.parallelize([('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)])
rdd2 = sc.parallelize([('A1', 211), ('A1', 212), ('A2', 22), ('A4', 24), ('A5', 25)])

# step1: 先将小表进行广播,也就是collect到driver端,然后广播到每个Executor中去。
rdd_small_bc = sc.broadcast(rdd1.collect())

# step2:从Executor中获取存入字典便于后续map操作
rdd_small_dict = dict(rdd_small_bc.value)

# step3:定义join方法
def broadcast_join(line, rdd_small_dict, join_type):
    k = line[0]
    v = line[1]
    small_table_v = rdd_small_dict[k] if k in rdd_small_dict else None
    if join_type == 'join':
        return (k, (v, small_table_v)) if k in rdd_small_dict else None
    elif join_type == 'left_join':
        return (k, (v, small_table_v if small_table_v is not None else None))
    else:
        print("not support join type!")

# step4:使用 map 实现 两个表join的功能
rdd_join = rdd2.map(lambda line: broadcast_join(line, rdd_small_dict, "join")).filter(lambda line: line is not None)
rdd_left_join = rdd2.map(lambda line: broadcast_join(line, rdd_small_dict, "left_join")).filter(lambda line: line is not None)
print(rdd_join.collect())
print(rdd_left_join.collect())
# [('A1', (211, 11)), ('A1', (212, 11)), ('A2', (22, 12)), ('A4', (24, 14))]
# [('A1', (211, 11)), ('A1', (212, 11)), ('A2', (22, 12)), ('A4', (24, 14)), ('A5', (25, None))]

上面的RDD join被改写为 broadcast+map的PySpark版本实现,不过里面有两个点需要注意:

  • tips1: 用来broadcast的RDD不可以太大,最好不要超过1G
  • tips2: 用来broadcast的RDD不可以有重复的key
3)尽量使用高性能算子

上一节讲到了低效算法,自然地就会有一些高效的算子。

原算子

高效算子(替换算子)

说明

map

mapPartitions

直接map的话,每次只会处理一条数据,而mapPartitions则是每次处理一个分区的数据,在某些场景下相对比较高效。(分区数据量不大的情况下使用,如果有数据倾斜的话容易发生OOM)

groupByKey

reduceByKey/aggregateByKey

这类算子会在原节点先map-side预聚合,相对高效些。

foreach

foreachPartitions

同第一条记录一样。

filter

filter+coalesce

当我们对数据进行filter之后,有很多partition的数据会剧减,然后直接进行下一步操作的话,可能就partition数量很多但处理的数据又很少,task数量没有减少,反而整体速度很慢;但如果执行了coalesce算子,就会减少一些partition数量,把数据都相对压缩到一起,用更少的task处理完全部数据,一定场景下还是可以达到整体性能的提升。

repartition+sort

repartitionAndSortWithinPartitions

直接用就是了。

4)广播大变量

如果我们有一个数据集很大,并且在后续的算子执行中会被反复调用,那么就建议直接把它广播(broadcast)一下。当变量被广播后,会保证每个executor的内存中只会保留一份副本,同个executor内的task都可以共享这个副本数据。如果没有广播,常规过程就是把大变量进行网络传输到每一个相关task中去,这样子做,一来频繁的网络数据传输,效率极其低下;二来executor下的task不断存储同一份大数据,很有可能就造成了内存溢出或者频繁GC,效率也是极其低下的。

# 原则4:广播大变量
rdd1 = sc.parallelize([('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)])
rdd1_broadcast = sc.broadcast(rdd1.collect())
print(rdd1.collect())
print(rdd1_broadcast.value)
# [('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)]
# [('A1', 11), ('A2', 12), ('A3', 13), ('A4', 14)]

?学习资源推荐:

1)《Spark性能优化指南——基础篇》

https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

2)《Spark性能优化指南——高级篇》

https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

本文分享自微信公众号 - SAMshare(gh_8528ce7b7e80),作者:SamShare

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-06-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 用PySpark开发时的调优思路(下)

    下面我们就来讲解一些常用的Spark资源配置的参数吧,了解其参数原理便于我们依据实际的数据情况进行配置。

    Sam Gor
  • PySpark入门级学习教程,框架思维(上)

    为什么要学习Spark?作为数据从业者多年,个人觉得Spark已经越来越走进我们的日常工作了,无论是使用哪种编程语言,Python、Scala还是Java,都会...

    Sam Gor
  • [源码解析] 深度学习分布式训练框架 horovod (8) --- on spark

    Horovod 是Uber于2017年发布的一个易于使用的高性能的分布式训练框架,在业界得到了广泛应用。

    罗西的思考
  • Eat pyspark 1st day | 快速搭建你的Spark开发环境

    下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-213...

    超哥的杂货铺
  • Pyspark学习笔记(四)弹性分布式数据集 RDD(上)

    RDD(弹性分布式数据集) 是 PySpark 的基本构建块,它是容错、不可变的 分布式对象集合。

    TeeyoHuang
  • 【原】Spark之机器学习(Python版)(二)——分类

      写这个系列是因为最近公司在搞技术分享,学习Spark,我的任务是讲PySpark的应用,因为我主要用Python,结合Spark,就讲PySpark了。然而...

    Charlotte77
  • Jupyter在美团民宿的应用实践

    做算法的同学对于Kaggle应该都不陌生,除了举办算法挑战赛以外,它还提供了一个学习、练习数据分析和算法开发的平台。Kaggle提供了Kaggle Kernel...

    美团技术团队
  • 金色传说,开源教程!属于算法的大数据工具-pyspark

    spark是目前大数据领域的核心技术栈,许多从事数据相关工作的小伙伴都想驯服它,变成"驯龙高手",以便能够驾驭成百上千台机器组成的集群之龙来驰骋于大数据之海。

    Sam Gor
  • PySpark——开启大数据分析师之路

    近日由于工作需要,突击学了一下PySpark的简单应用。现分享其安装搭建过程和简单功能介绍。

    luanhz
  • PySpark简介

    Apache Spark是一个大数据处理引擎,与MapReduce相比具有多个优势。通过删除Hadoop中的大部分样板代码,Spark提供了更大的简单性。此外,...

    双愚
  • Spark vs Dask Python生态下的计算引擎

    对于 Python 环境下开发的数据科学团队,Dask 为分布式分析指出了非常明确的道路,但是事实上大家都选择了 Spark 来达成相同的目的。Dask 是一个...

    Ewdager
  • PySpark SQL 相关知识介绍

    大数据是这个时代最热门的话题之一。但是什么是大数据呢?它描述了一个庞大的数据集,并且正在以惊人的速度增长。大数据除了体积(Volume)和速度(velocity...

    foochane
  • 3万字长文,PySpark入门级学习教程,框架思维

    关于PySpark,我们知道它是Python调用Spark的接口,我们可以通过调用Python API的方式来编写Spark程序,它支持了大多数的Spark功能...

    Sam Gor
  • 想学习Spark?先带你了解一些基础的知识

    之前也学习过一阵子的Spark了,是时候先输出一些知识内容了,一来加深印象,二来也可以分享知识,一举多得,今天这篇主要是在学习实验楼的一门课程中自己记下来的笔记...

    Sam Gor
  • 使用Python写spark 示例

    个人GitHub地址: https://github.com/LinMingQiang

    py3study
  • PySpark源码解析,教你用Python调用高效Scala接口,搞定大规模数据分析

    众所周知,Spark 框架主要是由 Scala 语言实现,同时也包含少量 Java 代码。Spark 面向用户的编程接口,也是 Scala。然而,在数据科学领域...

    机器之心
  • 在HTML5上开发音视频应用的五种思路

    无论是实时视频监控还是直播点播等应用场景,最起码的一个操作就是播放视频。其中最基本的思路就是利用OS的API在PC开发桌面应用、在移动端开发Native App...

    音视频开发进阶
  • 总要到最后关头才肯重构代码,强如spark也不例外

    用过Python做过机器学习的同学对Python当中pandas当中的DataFrame应该不陌生,如果没做过也没有关系,我们简单来介绍一下。DataFrame...

    TechFlow-承志
  • 一起揭开 PySpark 编程的神秘面纱

    Spark 是 UC Berkeley AMP lab 开发的一个集群计算的框架,类似于 Hadoop,但有很多的区别。最大的优化是让计算任务的中间结果可以存储...

    Sam Gor

扫码关注云+社区

领取腾讯云代金券