前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Pyspark学习笔记(五)RDD操作(二)_RDD行动操作

Pyspark学习笔记(五)RDD操作(二)_RDD行动操作

作者头像
TeeyoHuang
发布2022-04-14 07:57:36
1.5K0
发布2022-04-14 07:57:36
举报

Pyspark学习笔记专栏系列文章目录

Pyspark学习笔记(一)—序言及目录

Pyspark学习笔记(二)— spark-submit命令

Pyspark学习笔记(三)— SparkContext 与 SparkSession

Pyspark学习笔记(四)弹性分布式数据集 RDD(上)

Pyspark学习笔记(四)弹性分布式数据集 RDD(下)

Pyspark学习笔记(五)RDD操作(一)_RDD转换操作

Pyspark学习笔记(五)RDD操作(二)_RDD行动操作

文章目录

前言

提示:本篇博客讲的是RDD的操作中的行动操作,即 RDD Action

主要参考链接:

1.PySpark RDD Actions with examples

2.Apache spark python api

一、PySpark RDD 行动操作简介

    PySpark RDD行动操作(Actions) 是将值返回给驱动程序的 PySpark 操作.

行动操作会触发之前的转换操作进行执行。

即只有当程序遇到行动操作的时候,前面的RDD谱系中的一系列的转换操作才会运算,并将由行动操作得到最后的结果。

二.常见的转换操作表 & 使用例子

0.初始的示例rdd,

我们这里仍然以上一篇博文中的rdd_test作为示例,这样能更好的与之前讲的内容联系起来

代码语言:javascript
复制
[ ((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)) ]

1.count()

该操作不接受参数,返回一个long类型值,代表rdd的元素个数

pyspark.RDD.count

正好测试一下 rdd_test 经过 map 和 flatMap 之后的不同之处

代码语言:javascript
复制
# the example of count
rdd_map_test = rdd_test.map(lambda x: x)
print("count_test1\n", rdd_map_test.count())
# out
1
代码语言:javascript
复制
# the example of count
rdd_flatmap_test = rdd_test.flatMap(lambda x: x)
print("count_test2\n", rdd_flatmap_test.count())
# out
5

分析如下:

map并不去掉嵌套,所以相当于列表中的元素是一个 (5,4) 二维的tuple;

而flatMap会去掉一层嵌套,则相当于5个(4,)一维的tuple

2.collect(<num>)

返回一个由RDD中所有元素组成的列表(没有限制输出数量,所以要注意RDD的大小) ;该行动操作就不用举例了,上一篇博文的转换操作的作用其实都是最后通过collect这个行动操作才显示出来的。

pyspark.RDD.collect

3.take(<num>)

返回RDD的前n个元素(无特定顺序)

(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)

pyspark.RDD.take

代码语言:javascript
复制
# the example of take
print("take_test\n", rdd_flatmap_test.take(3))
代码语言:javascript
复制
[((10,1,2,3), (20,2,2,2), (20,1,2,3))]

4.takeOrdered(num, key=None)

从一个按照升序排列的RDD,或者按照key中提供的方法升序排列的RDD, 返回前n个元素

(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)

pyspark.RDD.takeOrdered

代码语言:javascript
复制
# the example of takeOrdered
print("takeOrdered_test_1\n",flat_rdd_test.takeOrdered(3))
print("takeOrdered_test_1\n",flat_rdd_test.takeOrdered(3, key=lambda x:x[3]))
# out
[(10,1,2,3), (10,1,2,4), (10,1,2,4)] # 默认以子tuple元素的大小排序
[(20,2,2,2), (10,1,2,3), (20,1,2,3)] # 这时候就是以 子tuple元素的第[3]个位置的数字为顺序

5.takeSample(withReplacement, num, seed=None)

返回此 RDD 的固定大小的采样子集

(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)

pyspark.RDD.takeSample

代码语言:javascript
复制
print("takeOrdered_test_1\n",flat_rdd_test.takeSample(False, 1, 1))
[(10,1,2,4)]

print("takeOrdered_test_1\n",flat_rdd_test.takeSample(False, 3, 1))
[(10,1,2,4), (20,1,2,3), (10,1,2,4)]

print("takeOrdered_test_1\n",flat_rdd_test.takeSample(False, 10, 1))
[(10,1,2,4), (20,1,2,3), (10,1,2,4), (20,2,2,2), (10,1,2,3)]

6.top(num, key=None)

返回RDD的前n个元素(按照降序输出, 排序方式由元素类型决定)

(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)

pyspark.RDD.top

代码语言:javascript
复制
print("top_test\n",flat_rdd_test.top(3))
[(20,2,2,2), (20,1,2,3), (10,1,2,4)]

7.first()

返回RDD的第一个元素,也是不考虑元素顺序

pyspark.RDD.first

代码语言:javascript
复制
print("first_test\n",flat_rdd_test.first(3))
[(10,1,2,3)]

8.reduce(<func>)

使用指定的满足交换律/结合律的运算符来归约RDD中的所有元素;

处一般可以指定接收两个输入的 匿名函数<lambda x, y: …>;

pyspark.RDD.reduce

代码语言:javascript
复制
print("reduce_test\n",flat_rdd_test.reduce(lambda x, y: x+y))
[(10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)]

解释一下过程:

代码语言:javascript
复制
step_0: [(10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)]
step_1: (10,1,2,3) => x;   (10,1,2,4) => y
x + y => (10,1,2,3) + (10,1,2,4) => (10,1,2,3,10,1,2,4)

step_2: (10,1,2,3,10,1,2,4) => x;   (10,1,2,4) => y
x + y => (10,1,2,3,10,1,2,4) + (10,1,2,4) => (10,1,2,3,10,1,2,4,10,1,2,4)

step_3: (10,1,2,3,10,1,2,4,10,1,2,4) => x;  (20,2,2,2) => y
x + y => (10,1,2,3,10,1,2,4,10,1,2,4) + (20,2,2,2) => (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2)

step_4: (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2) => x;  (20,1,2,3) => y
x + y => (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2) + (20,1,2,3) =>
(10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)

9.foreach(<func>)

把具名或者匿名函数 ,应用到RDD的所有元素上.

和map类似,但是由于foreach是行动操作,所以可以执行一些输出类的函数,比如print操作

pyspark.RDD.foreach

10.countByValue()

将此 RDD 中每个唯一值的计数作为 (unique_value, count) 对的字典返回.

pyspark.RDD.countByValue

代码语言:javascript
复制
print("top_test\n",flat_rdd_test.countByValue().items() )
[((10,1,2,3),1), ((20,1,2,3),1), ((20,2,2,2),1), ((10,1,2,4),2)]

11.fold(zeroValue, func)

使用给定的func和 初始值zeroV把RDD中的每个分区的元素聚合,然后把每个分区聚合结果再聚合;

聚合的过程其实和reduce类似,但是不满足交换律

这里有个细节要注意,fold是对每个分区(each partition)都会应用 zeroValue 进行聚合,而不是只使用一次

代码语言:javascript
复制
'''
① 在每个节点应用fold:初始值zeroValue + 分区内RDD元素
② 获得各个partition的聚合值之后,对这些值再进行一次聚合,同样也应用zeroValue;
③ 则结果应为:zeroValue * (partition_num + 1) + RDD元素聚合值
'''

示例如下:

代码语言:javascript
复制
rdd_2 = spark.sparkContext.parallelize(['A_a#', 'B_b#', 'C_c#', 'D_d#'], 1)
rdd_3 = spark.sparkContext.parallelize(['A_a#', 'B_b#', 'C_c#', 'D_d#'], 4)
print('fold_test_2', rdd_2.fold('zeroV$_', lambda x,y: x+y))
print('fold_test_3', rdd_3.fold('zeroV$_', lambda x,y: x+y))

rdd2的分区是1,则初始值只会出现2次:

代码语言:javascript
复制
'ZeroV$_ZeroV$_A_a#B_b#C_c#D_d#'

rdd3的分区是4,则初始值会出现5次:

代码语言:javascript
复制
'ZeroV$_ZeroV$_A_a#zeroV$_B_b#zeroV$_C_c#zeroV$_D_d#'

再对flat_rdd进行一次实验:

代码语言:javascript
复制
print("fold_test", flat_rdd_test.repartition(1).fold(('Hello','World'), lambda x,y: x+y))

('Hello','World','Hello','World',10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)

12.aggregate(zeroValue, seqOp, combOp)

使用给定的函数和初始值,对每个分区的聚合进行聚合

(这里同样是对每个分区,初始值的使用规则和fold是一样的,对每个分区都采用)

seqOp方法是先对每个分区操作,然后combOp对每个分区的聚合结果进行最终聚合

代码语言:javascript
复制
rdd_agg_test = spark.sparkContext.parallelize([1,2,3,10,20,30,7,8,9],3)
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

result_rdd = rdd_agg_test.aggregate((100,1000),seqOp,combOp)

(490, 4009)
代码语言:javascript
复制
seqOp :
partition_1: 100 + 1 + 2 + 3, 1000 + 1 + 1 + 1  => 106, 1003
partition_2: 100 + 10 + 20 + 30, 1000 + 1 + 1 + 1  => 160, 1003
partition_3: 100 + 7 + 8 + 9, 1000 + 1 + 1+ 1 => 124, 1003
combOp :
100+106+160+124, 1000+1003+1003+1003 => (490, 4009)

至此,行动操作的常用方法都基本介绍了

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-03-24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Pyspark学习笔记专栏系列文章目录
  • Pyspark学习笔记(五)RDD操作(二)_RDD行动操作
    • 文章目录
    • 前言
    • 主要参考链接:
    • 一、PySpark RDD 行动操作简介
    • 二.常见的转换操作表 & 使用例子
      • 0.初始的示例rdd,
        • 1.count()
          • 2.collect(<num>)
            • 3.take(<num>)
              • 4.takeOrdered(num, key=None)
                • 5.takeSample(withReplacement, num, seed=None)
                  • 6.top(num, key=None)
                    • 7.first()
                      • 8.reduce(<func>)
                        • 9.foreach(<func>)
                          • 10.countByValue()
                            • 11.fold(zeroValue, func)
                              • 12.aggregate(zeroValue, seqOp, combOp)
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档