前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Pyspark学习笔记(五)RDD操作(一)_RDD转换操作

Pyspark学习笔记(五)RDD操作(一)_RDD转换操作

作者头像
TeeyoHuang
发布2022-04-14 07:56:37
1.9K0
发布2022-04-14 07:56:37
举报

Pyspark学习笔记专栏系列文章目录

Pyspark学习笔记(一)—序言及目录

Pyspark学习笔记(二)— spark-submit命令

Pyspark学习笔记(三)— SparkContext 与 SparkSession

Pyspark学习笔记(四)弹性分布式数据集 RDD(上)

Pyspark学习笔记(四)弹性分布式数据集 RDD(下)

Pyspark学习笔记(五)RDD操作(一)_RDD转换操作

文章目录

前言

提示:本篇博客讲的是RDD的操作中的转换操作,即 RDD Transformations

主要参考链接:

1.PySpark RDD Transformations with examples

2.Apache spark python api

一、PySpark RDD 转换操作简介

    PySpark RDD 转换操作(Transformation) 是惰性求值,用于将一个 RDD 转换/更新为另一个。

由于RDD本质上是不可变的,转换操作总是创建一个或多个新的RDD而不更新现有的RDD,因此,一系列RDD转换创建了一个RDD谱系。

1.窄操作

    这些计算数据存在于单个分区上,这意味着分区之间不会有任何数据移动。

常见的执行窄操作的一般有:map()mapPartition()flatMap()filter()union()

2.宽操作

    这些计算数据存在于许多分区上,这意味着分区之间将有数据移动以执行更广泛的转换。由于这些对数据进行混洗,因此它们也称为混洗转换,所以与窄操作相比,是更加昂贵的操作。

常见的执行宽操作的一些方法是:groupBy(), groupByKey(), join(), repartition()

二.常见的转换操作表 & 使用例子

0.创建一个示例rdd, 后续的例子基本以此例展开

data_list = [ ((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)) ]
# 注意该列表中包含有两层tuple嵌套,相当于列表中的元素是一个 (5,4) 二维的tuple

rdd_test = spark.sparkContext.parallelize(data_list)
print("rdd_test:\n", rdd_test.collect())

则输出为:

[ ((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)) ]

1.map(<func>)

是所有转换操作中最基本的。

它应用一个具名函数或者匿名函数,对数据集内的所有元素执行同一操作

pyspark.RDD.map

# the example of map
rdd_map_test = rdd_test.map(lambda x: (x[0], x[3]))
print("rdd_map_test\n", rdd_map_test.collect())

相当于只从第一层 tuple 中取出了第0和第3个 子tuple, 输出为:

[((10,1,2,3), (20,2,2,2))]

2.flatMap(<func>)

与map的操作类似,但会进一步拍平数据,表示会去掉一层嵌套.

pyspark.RDD.flatmap

# the example of flatMap
flat_rdd_test = rdd_test.flatMap(lambda x: x)
print("flat_rdd_test\n", flat_rdd_test)

会发现比原始数据少了一层tuple的嵌套,输出为:

[(10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)]

3.filter(<func>)

一般是依据括号中的一个布尔型表达式,来筛选出满足为真的元素

pyspark.RDD.filter

# the example of filter
key1_rdd = flat_rdd_test.filter(lambda x: x[0]==10)
key2_rdd = flat_rdd_test.filter(lambda x: x[0]==20)
print("filter_1\n",key1_rdd.collect())
print("filter_2\n",key2_rdd.collect())

输出为:

[(10,1,2,3), (10,1,2,4), (10,1,2,4)]
[(20,2,2,2), (20,1,2,3)]

4.union(<rdds>)

类似于sql中的union函数,就是将两个RDD执行合并操作;

pyspark.RDD.union

但是pyspark中的union操作似乎不会自动去重,如果需要去重就使用后面讲的distinct

# the example of union
flat_rdd_test_new = key1_rdd.union(key2_rdd)
print("flat_rdd_test_new\n",flat_rdd_test_new.collect())

刚刚被拆开的两部分又合起来了, 输出为

[(10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)]

5.distinct(numPartitions=None)

去除RDD中的重复值;带有参数numPartitions,默认值为None,可以对去重后的数据重新分区;

pyspark.RDD.distinct

# the example of distinct
distinct_key1_rdd = key1_rdd.distinct()
print("distinct\n",distinct.collect())

原来的 Key1_rdd 后两个元素是重复出现的,使用distinct之后就会消掉一个:

[(10,1,2,3), (10,1,2,4)]

6.groupBy(<func>)

对元素进行分组,可以是具名函数,也可以是匿名,用来确定对所有元素进行分组的,或者指定用于对元素进行求值以确定其分组方式的表达式.

pyspark.RDD.groupBy

# the example of groupBy
# 我们可以先定义一个具名函数
def return_group_key(x):
    seq = x[1:]
    if sum(seq) > 6:
        return "big"
    else
        return "small"

# 下面这两种写法结果都是一样的
groupby_rdd_1 = flat_rdd_test.groupBy(lambda x: return_group_key(x))
groupby_rdd_1 = flat_rdd_test.groupBy(lambda x: "big" if sum(x[1:])>6 else "small")
print("groupby_1\n", groupby_rdd_1.collect())

直接输出的话,可能输出的是一个寄存器地址:

[('small', <pyspark.resultiterable.ResultIterable object at 0x7f004b4ef850>), ('big', <pyspark.resultiterable.ResultIterable object at 0x7f004ac053d0>)]

这时候我们只需要加一个 mapValues 操作即可,即将后面寄存器地址上的值用列表显示出来

print("groupby_1_明文\n", groupby_rdd_1.mapValues(list).collect())

明文输出为:

[('small', [(10,1,2,3), (20,2,2,2), (20,1,2,3)]), ('big', [(10,1,2,4), (10,1,2,4)])]

下面再感受一下,这个groupBy() 中的是确定分组的【键】,这个意思是什么

groupby_rdd_2 = flat_rdd_test.groupBy(lambda x: x[0]==10)
print("groupby_2_明文\n", groupby_rdd_2.mapValues(list).collect())

这时候就是以匿名函数返回的布尔值作为分组的 key【键】了

[('True', [(10,1,2,3), [(10,1,2,4), (10,1,2,4)), ('False', (20,2,2,2), (20,1,2,3)]])]
groupby_rdd_3 = flat_rdd_test.groupBy(lambda x: x[0])
print("groupby_3_明文\n", groupby_rdd_3.mapValues(list).collect())

这时候就是以匿名函数返回的 x0的具体值 作为分组的 key【键】了

[(10, [(10,1,2,3), [(10,1,2,4), (10,1,2,4)), (20, (20,2,2,2), (20,1,2,3)]])]

最后再回味一下 这个 最关键的是要产生一个key,作为分组的条件,(要么就重新产生,要么就拿现有的值)

7.sortBy(<keyfunc>,ascending=True, numPartitions=None)

将RDD按照参数选出的指定数据集的键进行排序

pyspark.RDD.sortBy

# the example of sortBy
sort_by_ascending_rdd = flat_rdd_test.sortBy(keyfunc=lambda x:x[3])
sort_by_descending_rdd = flat_rdd_test.sortBy(keyfunc=lambda x:x[3], ascending=False)

输出为:

[(20,2,2,2), (10,1,2,3), (20,1,2,3), (10,1,2,4), (10,1,2,4)]
[(10,1,2,4), (10,1,2,4), (10,1,2,3), (20,1,2,3), (20,2,2,2)]

8.repartition( )

重新分区,之前的博客的【并行化】 一节已经描述过

9.coalesce( )

重新分区,之前的博客的【并行化】一节已经描述过:

10.cache( )

缓存,之前博文RDD【持久化】一节已经描述过;

11.persist( )

持久化,之前博文RDD【持久化】一节已经描述过

至此,Pyspark基本的转换操作【Transformation】就介绍完了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-03-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Pyspark学习笔记专栏系列文章目录
  • Pyspark学习笔记(五)RDD操作(一)_RDD转换操作
    • 文章目录
    • 前言
    • 主要参考链接:
    • 一、PySpark RDD 转换操作简介
      • 1.窄操作
        • 2.宽操作
        • 二.常见的转换操作表 & 使用例子
          • 0.创建一个示例rdd, 后续的例子基本以此例展开
            • 1.map(<func>)
              • 2.flatMap(<func>)
                • 3.filter(<func>)
                  • 4.union(<rdds>)
                    • 5.distinct(numPartitions=None)
                      • 6.groupBy(<func>)
                        • 7.sortBy(<keyfunc>,ascending=True, numPartitions=None)
                          • 8.repartition( )
                            • 9.coalesce( )
                              • 10.cache( )
                                • 11.persist( )
                                领券
                                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档