前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Pyspark学习笔记(四)弹性分布式数据集 RDD(下)

Pyspark学习笔记(四)弹性分布式数据集 RDD(下)

作者头像
TeeyoHuang
发布2021-08-18 15:56:09
2.5K0
发布2021-08-18 15:56:09
举报
在这里插入图片描述
在这里插入图片描述

文章目录


前言

本篇主要讲述了如何在执行pyspark任务时候缓存或者共享变量,以达到节约资源、计算量、时间等目的

一、PySpark RDD 持久化

参考文献:https://sparkbyexamples.com/pyspark-rdd#rdd-persistence

    我们在上一篇博客提到,RDD 的转化操作是惰性的,要等到后面执行行动操作的时候,才会真正执行计算;     那么如果我们的流程图中有多个分支,比如某一个转换操作 X 的中间结果,被后续的多个并列的流程图(a,b,c)运用,那么就会出现这么一个情况:     在执行后续的(a,b,c)不同流程的时候,遇到行动操作时,会重新从头计算整个图,即该转换操作X,会被重复调度执行:(X->a), (X->b), (X->c); 如此一来就会浪费时间和计算资源,则RDD的持久化就显得十分有用了。     PySpark 通过使用 cache()persist() 提供了一种优化机制,来存储 RDD 的中间计算,以便它们可以在后续操作中重用。当持久化或缓存一个 RDD 时,每个工作节点将它的分区数据存储在内存或磁盘中,并在该 RDD 的其他操作中重用它们。Spark 在节点上的持久数据是容错的,这意味着如果任何分区丢失,它将使用创建它的原始转换自动重新计算

cache()

    默认将 RDD 计算保存到存储级别MEMORY_ONLY ,这意味着它将数据作为未序列化对象存储在 JVM 堆中 (对于Spark DataFrame 或 Dataset 缓存将其保存到存储级别 ` MEMORY_AND_DISK’)

cachedRdd = rdd.cache()

persist()

有两种函数签名 第一个签名不接受任何参数,默认情况下将其保存到MEMORY_AND_DISK存储级别, 例:

dfPersist = df.persist()

第二个签名StorageLevel作为参数将其存储到不同的存储级别; 例:

dfPersist = df.persist(StorageLevel.MEMORY_ONLY)

该参数可选的有:MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_AND_DISK_SER,DISK_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK_2

unpersist()

PySpark 会自动监视每个persist()cache()调用,并检查每个节点上的使用情况,并在未使用或使用最近最少使用 (LRU) 算法时删除持久数据。 也使用unpersist() 方法手动删除。unpersist() 将 RDD 标记为非持久的,并从内存和磁盘中删除它的所有块:

rddPersist2 = rddPersist.unpersist()

关于 cache() 和 persist()的一些细微区别:链接

二、持久性存储级别

参考文献: ①https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistencehttps://sparkbyexamples.com/spark/spark-persistence-storage-levels/

代码如下(示例):

import org.apache.spark.storage.StorageLevel
rdd2 = rdd.persist(StorageLevel.MEMORY_ONLY_SER) 
df2 = df.persist(StorageLevel.MEMORY_ONLY_SER)

MEMORY_ONLY

这是 RDD cache() 方法的默认行为, 并将 RDD 或 DataFrame 作为反序列化对象存储到 JVM 内存中。当没有足够的可用内存时,它不会保存某些分区的 DataFrame,这些将在需要时重新计算。这需要更多的存储空间,但运行速度更快,因为从内存中读取需要很少的 CPU 周期。

MEMORY_AND_DISK

在此存储级别,RDD 将作为反序列化对象存储在 JVM 内存中。当所需的存储空间大于可用内存时,它会将一些多余的分区存储到磁盘中,并在需要时从磁盘读取数据。由于涉及 I/O,因此速度较慢。

DISK_ONLY

在此存储级别,RDD 仅存储在磁盘上,并且由于涉及 I/O,CPU 计算时间较长。

MEMORY_ONLY_2

与MEMORY_ONLY 存储级别相同, 但将每个分区复制到两个集群节点。

MEMORY_AND_DISK_2

与MEMORY_AND_DISK 存储级别相同, 但将每个分区复制到两个集群节点。

DISK_ONLY_2

与DISK_ONLY 存储级别相同, 但将每个分区复制到两个集群节点。

下面是存储级别的表格表示,通过空间、CPU 和性能的影响选择最适合的一个。 ------------------------------------------------------------------------------------------------------------------------------------ 存储级别           | 占用空间  | CPU 耗时 | 在内存中  | 在硬盘上 | 序列化  |重新计算一些分区 ------------------------------------------------------------------------------------------------------------------------------------ MEMORY_ONLY        High       Low       Y          N         N         Y MEMORY_AND_DISK   High      Medium    some     some       some      N DISK_ONLY            Low       High       N          Y         Y         N ------------------------------------------------------------------------------------------------------------------------------------ 或者参考官方文档的指导: https://spark.apache.org/docs/latest/rdd-programming-guide.html#which-storage-level-to-choose

三、共享变量

    当 PySpark 使用map()reduce()操作执行转换时,它使用任务附带的变量在远程节点上执行转换,并且这些变量不会发送回 PySpark 驱动程序,因此无法在任务之间重用和共享变量。PySpark 共享变量使用以下两种技术解决了这个问题。 ·广播变量(只读共享变量) ·累加器变量(可更新的共享变量)

1.广播变量(只读共享变量)

i 广播变量 ( broadcast variable)

广播变量是只读共享变量,它们被缓存并在集群中的所有节点上可用,以便任务访问或使用。PySpark 不是将这些数据与每个任务一起发送,而是使用高效的广播算法将广播变量分发给机器,以降低通信成本。 PySpark RDD Broadcast 的最佳用例之一是与查找数据一起使用。

ii 创建广播变量

使用SparkContext 类的方法broadcast(v)创建的。 代码如下(示例):

broadcastVar = sc.broadcast([0, 1, 2, 3])
broadcastVar.value

注意,广播变量 不会在调用 sc.broadcast(variable) 时 就发送给执行器,而是在首次使用它时发送给执行器

参考文献:https://sparkbyexamples.com/pyspark/pyspark-broadcast-variables/

2.累加器变量(可更新的共享变量)

累加器是另一种类型的共享变量,仅通过关联和交换操作“添加” ,用于执行计数器(类似于 Map-reduce 计数器)或求和操作。 这里不做详细介绍了,可参考: https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-08-12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 前言
  • 一、PySpark RDD 持久化
    • ①cache()
      • ②persist()
        • ③ unpersist()
        • 二、持久性存储级别
          • MEMORY_ONLY
            • MEMORY_AND_DISK
              • DISK_ONLY
                • MEMORY_ONLY_2
                  • MEMORY_AND_DISK_2
                    • DISK_ONLY_2
                    • 三、共享变量
                      • 1.广播变量(只读共享变量)
                        • i 广播变量 ( broadcast variable)
                        • ii 创建广播变量
                      • 2.累加器变量(可更新的共享变量)
                      相关产品与服务
                      对象存储
                      对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档