前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Pyspark学习笔记(四)弹性分布式数据集 RDD 综述(上)

Pyspark学习笔记(四)弹性分布式数据集 RDD 综述(上)

作者头像
TeeyoHuang
发布2022-11-28 19:11:30
3.7K0
发布2022-11-28 19:11:30
举报

文章目录


前言 本篇主要是对RDD做一个大致的介绍,建立起一个基本的概念

参考文献:pyspark-rdd

1、什么是 RDD - Resilient Distributed Dataset?

RDD(弹性分布式数据集) 是 PySpark 的基本构建块,是spark编程中最基本的数据对象;

    它是spark应用中的数据集,包括最初加载的数据集,中间计算的数据集,最终结果的数据集,都是RDD。

    从本质上来讲,RDD是对象分布在各个节点上的集合,用来表示spark程序中的数据。以Pyspark为例,其中的RDD就是由分布在各个节点上的python对象组成,类似于python本身的列表的对象的集合。区别在于,python集合仅在一个进程中存在和处理,而RDD分布在各个节点,指的是【分散在多个物理服务器上的多个进程上计算的】

    这里多提一句,尽管可以将RDD保存到硬盘上,但RDD主要还是存储在内存中,至少是预期存储在内存中的,因为spark就是为了支持机器学习应运而生。

一旦你创建了一个 RDD,就不能改变它。

2、PySpark RDD 的基本特性和优势

    RDD,Resilient Distributed Dataset, 弹性、分布式、数据集,可以从他的名字拆解分析一下:

  • 弹性: 有弹性,指的是可容错的,即如果一个执行任务的节点丢失了,数据集依然可以被构建出来。这是因为每个RDD都有其谱系(DAG),能够从头构建RDD。
  • 分布式:RDD是分布式的,RDD的数据至少被分到一个分区中,在集群上跨工作节点分布式地作为对象集合保存在内存中;
  • 数据集: RDD是由记录组成的数据集。所谓记录类似于表中的一“行”数据,一般由几个字段构成。记录,是数据集中唯一可以区分数据的集合,RDD 的各个分区包含不同的一部分记录,可以独立进行操作。

RDD的优势有如下:

  • 内存处理 PySpark 从磁盘加载数据并 在内存中处理数据 并将数据保存在内存中,这是 PySpark 和 Mapreduce(I/O 密集型)之间的主要区别。在转换操作过程中,我们还可以在内存中缓存/持久化 RDD 以重用之前的计算。
  • 不变性 PySpark 在 HDFS、S3 等上的容错数据存储上运行,因此任何 RDD 操作失败,它会自动从其他分区重新加载数据。此外,当 PySpark 应用程序在集群上运行时,PySpark 任务失败会自动恢复一定次数(根据配置)并无缝完成应用程序。
  • 惰性运算 PySpark 不会在驱动程序出现/遇到 RDD 转换时对其进行评估,而是在遇到(DAG)时保留所有转换,并在看到第一个 RDD 操作时评估所有转换。

3、PySpark RDD 局限

PySpark RDD 不太适合更新状态存储的应用程序,例如 Web 应用程序的存储系统。对于这些应用程序,使用执行传统更新日志记录和数据检查点的系统(例如数据库)更有效。

RDD 的目标是为批处理分析提供高效的编程模型,并离开这些异步应用程序。

4、创建 RDD

RDD 主要以两种不同的方式创建:

  • 并行化现有的集合;
  • 引用在外部存储系统中的数据集(HDFS,S3等等)

在使用pyspark时,一般都会在最开始最开始调用如下入口程序:

代码语言:javascript
复制
from pyspark.sql import SparkSession
# 创建一个spark对象
spark = SparkSession \
		.builder \
		.appName("test") \
		.getOrCreate()
sc = spark.sparkContext

①使用 sparkContext.parallelize() 创建 RDD

此函数将驱动程序中的现有集合加载到并行化 RDD 中。这是创建 RDD 的基本方法,当内存中已有从文件或数据库加载的数据时使用。并且它要求在创建 RDD 之前所有数据都存在于驱动程序中。

代码语言:javascript
复制
#Create RDD from parallelize    
data = [1,2,3,4,5,6,7,8,9,10,11,12]
Rdd = spark.sparkContext.parallelize(data)

②引用在外部存储系统中的数据集

Spark 将文本文件读入 RDD — 参考文献

sparkContext.textFile()

用于从 HDFS、S3 和任何 Hadoop 支持的文件系统读取文本文件,此方法将路径作为参数,并可选择将多个分区作为第二个参数;

sparkContext.wholeTextFiles()

将文本文件读入 RDD(String,String) 类型的 PairedRDD,键是文件路径,值是文件内容。此方法还将路径作为参数,并可选择将多个分区作为第二个参数。

当我们知道要读取的多个文件的名称时,如果想从文件夹中读取所有文件以创建 RDD,只需输入带逗号分隔符的所有文件名和一个文件夹,并且上述两种方法都支持这一点。同时也接受模式匹配和通配符。

③创建空RDD

代码语言:javascript
复制
rdd = spark.sparkContext.emptyRDD
rdd2 = spark.sparkContext.parallelize( [ ],10)  #This creates 10 partitions

5、RDD并行化

参考文献

启动 RDD 时,它会根据资源的可用性自动将数据拆分为分区。

getNumPartitions() - 这是一个 RDD 函数,它返回我们的数据集分成的多个分区。

我们也可以手动设置多个分区,我们只需要将多个分区作为第二个参数传递给这些函数,

例如

代码语言:javascript
复制
sparkContext.parallelize([1,2,3,4,56,7,8,9,12,3], 10)

有时我们可能需要对RDD进行**重新分区**, PySpark 提供了两种重新分区的方式;

第一:使用repartition(numPartitions)从所有节点混洗数据的方法,也称为完全混洗,

repartition()方法是一项非常昂贵的操作,因为它会从集群中的所有节点打乱数据。

第二:使用coalesce(n)方法**从最小节点混洗数据,仅用于减少分区数**。

这是repartition()使用合并降低跨分区数据移动的优化或改进版本。

例如,如果现在有 4 个分区,那么coalesce(2)只从 2 个节点移动数据。

代码语言:javascript
复制
#执行前:
Partition 1 : 0 1 2
Partition 2 : 3 4 5
Partition 3 : 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15
Partition 6 : 16 17 18 19
#执行后:
Partition 1 : 0 1 2
Partition 2 : 3 4 5 6 7 8 9
Partition 4 : 10 11 12 
Partition 5 : 13 14 15 16 17 18 19

第三:使用partitionBy(numPartitions, partiontionFunc=portable_hash)函数,

6、PySpark RDD 操作

详细介绍可以参考我的博文:

Pyspark学习笔记(五)RDD操作(一)_RDD转换操作

Pyspark学习笔记(五)RDD操作(二)_RDD行动操作

转化操作(Transformations ):操作RDD并返回一个 新RDD 的函数;

行动操作(Actions ) :操作RDD, 触发计算, 并返回 一个值 或者 进行输出 的函数。

二者最大的区别是,转化操作是惰性的 , 将一个 RDD 转换/更新为另一个,意味着直到我们调用一个 行动操作之前,是不会执行计算的。

更多细节和例子,请查看后续博文

7、RDD的类型

除了包含通用属性和函数的基本类型BaseRDD外,RDD还有以下常见的类型:

PairRDD: 由键值对组成的RDD,比如前面提到的用wholeTextFiles()方法读取的内容就是以键值对的形式存在

DoubleRDD: 由双精度浮点数组成的RDD。

DataFrame:以前的版本被称为SchemaRDD,按一组有固定名字和类型的列来组织的分布式数据集. DataFrame等价于sparkSQL中的关系型表 所以我们在使用sparkSQL的时候常常要创建这个DataFrame。

HadoopRDD:提供读取存储在HDFS上的数据的RDD。

8、混洗操作

Shuffle 是 PySpark 用来在不同执行器甚至跨机器重新分配数据的机制

可能导致shuffle的操作包括:

repartitioncoalesce重新分区操作,

groupByKeyreduceByKey聚合操作(计数除外),

以及cogroupjoin连接操作

PySpark Shuffle 是一项昂贵的操作,因为它涉及以下内容

·磁盘输入/输出

·涉及数据序列化和反序列化

·网络输入/输出

混洗分区大小和性能

根据数据集大小,较多的内核和内存混洗可能有益或有害我们的任务。

①当处理较少的数据量时,通常应该减少 shuffle 分区,

否则最终会得到许多分区文件,每个分区中的记录数较少,形成了文件碎片化。

②另一方面,当有太多数据且分区数量较少时,会导致运行时间较长的任务较少,有时也可能会出现内存不足错误。

获得正确大小的 shuffle 分区总是很棘手,需要多次运行不同的值才能达到优化的数量。当在 PySpark task上遇到性能问题时,这是要寻找的关键属性之一

系列文章目录:

Pyspark学习笔记(一)—序言及目录

①.Pyspark学习笔记(二)— spark部署及spark-submit命令简介

②.Pyspark学习笔记(三)— SparkContext 与 SparkSession

③.Pyspark学习笔记(四)弹性分布式数据集 RDD 综述(上)

Pyspark学习笔记(四)弹性分布式数据集 RDD 综述(下)

Pyspark学习笔记(五)RDD操作(一)_RDD转换操作

Pyspark学习笔记(五)RDD操作(二)_RDD行动操作

⑦Pyspark学习笔记(五)RDD操作(三)_键值对RDD转换操作

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-08-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 前言 本篇主要是对RDD做一个大致的介绍,建立起一个基本的概念
  • 1、什么是 RDD - Resilient Distributed Dataset?
  • 2、PySpark RDD 的基本特性和优势
  • 3、PySpark RDD 局限
  • 4、创建 RDD
    • ①使用 sparkContext.parallelize() 创建 RDD
      • ②引用在外部存储系统中的数据集
        • ③创建空RDD
        • 5、RDD并行化
        • 6、PySpark RDD 操作
        • 7、RDD的类型
        • 8、混洗操作
        • 系列文章目录:
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档