
Spark RDD


An RDD is an abstraction: it records some bookkeeping information rather than holding the data itself. It is not an actual collection, but it can be operated on like one, which lowers the development effort.

RDD operators fall into two categories: Transformations (lazy, so they do not run immediately and even mistakes are not discovered right away) and Actions (which trigger job execution).
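To make the lazy versus eager distinction concrete, here is a minimal sketch (it assumes a running SparkContext named sc, and the HDFS path is made up):

// Transformations only build the lineage; nothing runs and nothing is validated yet,
// so even a non-existent path does not fail at this point.
val lines = sc.textFile("hdfs://192.168.5.182:8020/no/such/file")
val lengths = lines.map(_.length)   // still lazy

// Only an Action submits a job; a missing path would be reported here.
// lengths.collect()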

There are three ways to create an RDD:

1. From an external storage system (such as Hadoop HDFS, HBase, or MongoDB).

2. By parallelizing a Scala collection in the Driver (handy for testing, not suitable for production).

3. By calling a Transformation on an existing RDD, which produces a new RDD.

Method 1 was covered earlier in the post on submitting the first Spark word-count program against Hadoop HDFS.

Method 2:

Spark context Web UI available at http://192.168.5.182:4040

Spark context available as 'sc' (master = spark://host2:7077,host1:7077, app id = app-20181112100219-0000).

Spark session available as 'spark'.

Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)

Type in expressions to have them evaluated.

Type :help for more information.

scala> val arr = Array(1,2,3,4,5,6,7,8,9,10)

arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> arr.map(_ * 10)

res0: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

scala> val rdd = sc.parallelize(arr) // turn the collection into an RDD

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val rdds = rdd.map(_ * 10) // multiply every element by 10 to produce a new RDD

rdds: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:28

scala> rdds.collect // inspect the new RDD; since an RDD is not a real collection, the data has to be collected from the Workers before it can be viewed

res3: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

scala> val rdd3 = rdd.filter(_ % 2 == 0) // keep only the even numbers, producing a new RDD

rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:28

scala> rdd3.collect

res4: Array[Int] = Array(2, 4, 6, 8, 10)

Now let's open the web management UI and click into this Spark shell application. We can see that it performed two collect jobs, and by drilling all the way in we can see the details of which machines the tasks ran on.

RDD operators

scala> val rdd2 = sc.parallelize(List(5,10,6,7,4,3,8,2,9,1)).map(_ * 2).sortBy(x => x,true) // multiply every element of the List by 2, then sort in ascending order

rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at sortBy at <console>:24

scala> rdd2.collect

res5: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

In the UI we can see that it ran a sort and a collect.

scala> val rdd2 = sc.parallelize(List(5,10,6,7,4,3,8,2,9,1)).map(_ * 2).sortBy(x => x + "",true) // sort by string rules; this does not change the element type, so it is still an RDD of Int

rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at sortBy at <console>:24

scala> rdd2.collect

res6: Array[Int] = Array(10, 12, 14, 16, 18, 2, 20, 4, 6, 8)

We can see that the sort compares the first character first, then the second, i.e. the elements were ordered as strings.

scala> val arr = Array("a b c","d e f","h i j")

arr: ArrayString = Array(a b c, d e f, h i j)

scala> arr.map(_.split(" "))

res10: Array[ArrayString] = Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))

scala> arr.map(_.split(" ")).flatten //扁平化处理

res11: ArrayString = Array(a, b, c, d, e, f, h, i, j)

以上是集合操作

scala> val rdd4 = sc.parallelize(Array("a b c","d e f","h i j"))

rdd4: org.apache.spark.rdd.RDDString = ParallelCollectionRDD17 at parallelize at <console>:24

scala> rdd4.map(_.split(" ")).collect

res12: Array[ArrayString] = Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))

由于RDD没有flatten方法,只能使用flatMap方法进行扁平化处理

scala> rdd4.flatMap(_.split(" ")).collect

res13: ArrayString = Array(a, b, c, d, e, f, h, i, j)


scala> val rdd5 = sc.parallelize(List(List("a b c","a b b"),List("e f g","a f g"),List("h i j","a a b")))

rdd5: org.apache.spark.rdd.RDD[ListString] = ParallelCollectionRDD21 at parallelize at <console>:24

scala> rdd5.flatMap(_.flatMap(_.split(" "))).collect //这两个flatMap不是一回事,一个是RDD的,他会把任务分发到各个计算服务器上进行计算;一个是List的,他只会在被分发到的计算服务器上进行计算

res14: ArrayString = Array(a, b, c, a, b, b, e, f, g, a, f, g, h, i, j, a, a, b)


scala> val rdd6 = sc.parallelize(List(5,6,4,7))

rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> val rdd7 = sc.parallelize(List(1,2,3,4))

rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> rdd6.union(rdd7).collect

res15: Array[Int] = Array(5, 6, 4, 7, 1, 2, 3, 4) // union

scala> rdd6.intersection(rdd7).collect

res16: Array[Int] = Array(4) // intersection


scala> val rdd8 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3))) // create an RDD from a List of key-value tuples

rdd8: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:24

scala> val rdd9 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7),("tom",2)))

rdd9: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> val rdd10 = rdd8.join(rdd9) // like SQL's inner join; only the key of each tuple is used as the join condition

rdd10: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[36] at join at <console>:28

scala> rdd10.saveAsTextFile("hdfs://192.168.5.182:8020/testjoin") // save the result into Hadoop HDFS


root@host2 bin# ./hdfs dfs -ls /testjoin

Found 17 items

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/_SUCCESS

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00000

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00001

-rw-r--r-- 3 root supergroup 24 2018-11-12 14:54 /testjoin/part-00002

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00003

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00004

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00005

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00006

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00007

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00008

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00009

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00010

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00011

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00012

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00013

-rw-r--r-- 3 root supergroup 14 2018-11-12 14:54 /testjoin/part-00014

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00015

root@host2 bin# ./hdfs dfs -cat /testjoin/part-00002

(tom,(1,2))

(tom,(1,8))

root@host2 bin# ./hdfs dfs -cat /testjoin/part-00014

(jerry,(2,9))

From the result, only tom and jerry were kept by the join condition.

scala> val rdd11 = rdd8.leftOuterJoin(rdd9) // left outer join

rdd11: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[40] at leftOuterJoin at <console>:28

scala> rdd11.saveAsTextFile("hdfs://192.168.5.182:8020/leftjointest")

root@host2 bin# ./hdfs dfs -ls /leftjointest

Found 17 items

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/_SUCCESS

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00000

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00001

-rw-r--r-- 3 root supergroup 36 2018-11-12 15:15 /leftjointest/part-00002

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00003

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00004

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00005

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00006

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00007

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00008

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00009

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00010

-rw-r--r-- 3 root supergroup 17 2018-11-12 15:15 /leftjointest/part-00011

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00012

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00013

-rw-r--r-- 3 root supergroup 20 2018-11-12 15:15 /leftjointest/part-00014

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00015

root@host2 bin# ./hdfs dfs -cat /leftjointest/part-00002

(tom,(1,Some(8)))

(tom,(1,Some(2)))

root@host2 bin# ./hdfs dfs -cat /leftjointest/part-00011

(kitty,(3,None))

root@host2 bin# ./hdfs dfs -cat /leftjointest/part-00014

(jerry,(2,Some(9)))

Every element of rdd8 is kept, and the matching elements from rdd9 are attached where they exist.

scala> rdd11.collect

res18: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (kitty,(3,None)), (jerry,(2,Some(9))))

Looking at the result directly in the Driver gives the same data that was saved to Hadoop HDFS.
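The other outer joins work the same way; a quick sketch using the same rdd8 and rdd9 as above:

// rightOuterJoin keeps every key of rdd9; fullOuterJoin keeps the keys of both sides.
val rightJoined = rdd8.rightOuterJoin(rdd9)  // RDD[(String, (Option[Int], Int))]
val fullJoined  = rdd8.fullOuterJoin(rdd9)   // RDD[(String, (Option[Int], Option[Int]))]
rightJoined.collect
fullJoined.collect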


scala> val rdd12 = rdd8.union(rdd9)

rdd12: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[42] at union at <console>:28

scala> rdd12.collect

res20: Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))

scala> rdd12.groupByKey.collect // group by key

res21: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(2, 1, 8)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(2, 9)))


Now, instead of using reduceByKey to compute the WordCount of /usr/file/a.txt in Hadoop HDFS, let's use groupByKey.

scala> val wordAndOne = sc.textFile("hdfs://192.168.5.182:8020/usr/file/a.txt")

wordAndOne: org.apache.spark.rdd.RDD[String] = hdfs://192.168.5.182:8020/usr/file/a.txt MapPartitionsRDD[45] at textFile at <console>:24

scala> wordAndOne.flatMap(_.split(" ")).map((_, 1)).groupByKey.collect

res23: Array[(String, Iterable[Int])] = Array((him,CompactBuffer(1)), (park,CompactBuffer(1)), (fool,CompactBuffer(1)), (dinsh,CompactBuffer(1)), (fish,CompactBuffer(1)), (dog,CompactBuffer(1)), (apple,CompactBuffer(1)), (cry,CompactBuffer(1)), (my,CompactBuffer(1)), (ice,CompactBuffer(1)), (cark,CompactBuffer(1)), (balana,CompactBuffer(1)), (fuck,CompactBuffer(1)))

scala> wordAndOne.flatMap(_.split(" ")).map((_, 1)).groupByKey().mapValues(_.sum).collect // mapValues operates on the value of each tuple; _.sum adds up each value collection, giving the same result as before

res24: Array[(String, Int)] = Array((him,1), (park,1), (fool,1), (dinsh,1), (fish,1), (dog,1), (apple,1), (cry,1), (my,1), (ice,1), (cark,1), (balana,1), (fuck,1))

Although the result is the same, prefer reduceByKey when the data volume is large: reduceByKey does partial aggregation on each compute server before shuffling, whereas groupByKey first ships all the values for each key across the network and only then computes, which is far more expensive.
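For comparison, here is the same word count written with reduceByKey (a sketch, reusing the wordAndOne RDD defined above); the _ + _ is applied inside each partition first and the partial sums are merged after the shuffle:

val wordCount = wordAndOne
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)   // map-side combine, then merge across partitions
wordCount.collect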


scala> val rdd1 = sc.parallelize(List(("tom",1),("tom",2),("jerry",3),("kitty",2)))

rdd1: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD0 at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))

rdd2: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD1 at parallelize at <console>:24

scala> val rdd3 = rdd1.cogroup(rdd2) //对对偶元组所在的集合的RDD进行操作,以Key为依据进行分组,获得一个新的对偶元组数组,对偶元组中,保留Key,而Value为每一个RDD中的Value集合组成的元组。

rdd3: org.apache.spark.rdd.RDD[(String, (IterableInt, IterableInt))] = MapPartitionsRDD3 at cogroup at <console>:28

scala> rdd3.collect

[Stage 0:> [Stage 0:> res0: Array[(String, (IterableInt, IterableInt))] = Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())), (jerry,(CompactBuffer(3),CompactBuffer(2))))


scala> val rdd1 = sc.parallelize(List("tom","jerry"))

rdd1: org.apache.spark.rdd.RDDString = ParallelCollectionRDD4 at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List("tom","kitty","shuke"))

rdd2: org.apache.spark.rdd.RDDString = ParallelCollectionRDD5 at parallelize at <console>:24

scala> val rdd3 = rdd1.cartesian(rdd2) //求笛卡尔积

rdd3: org.apache.spark.rdd.RDD(String, String) = CartesianRDD6 at cartesian at <console>:28

scala> rdd3.collect

res1: Array(String, String) = Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))


Action

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),3) // specify 3 partitions when parallelizing

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> rdd1.saveAsTextFile("hdfs://192.168.5.182:8020/testsave")

root@host2 bin# ./hdfs dfs -ls /

Found 4 items

drwxr-xr-x - root supergroup 0 2018-11-12 15:15 /leftjointest

drwxr-xr-x - root supergroup 0 2018-11-12 14:54 /testjoin

drwxr-xr-x - root supergroup 0 2018-11-15 11:07 /testsave

drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr

root@host2 bin# ./hdfs dfs -ls /testsave

Found 4 items

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:07 /testsave/_SUCCESS

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:07 /testsave/part-00000

-rw-r--r-- 3 root supergroup 4 2018-11-15 11:07 /testsave/part-00001

-rw-r--r-- 3 root supergroup 4 2018-11-15 11:07 /testsave/part-00002

root@host2 bin# ./hdfs dfs -cat /testsave/part-00000

1

root@host2 bin# ./hdfs dfs -cat /testsave/part-00001

2

3

root@host2 bin# ./hdfs dfs -cat /testsave/part-00002

4

5

In Hadoop HDFS we can see that the result was saved in 3 part files.

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5)) // no partition count specified

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> rdd1.saveAsTextFile("hdfs://192.168.5.182:8020/testsave1")

root@host2 bin# ./hdfs dfs -ls /

Found 5 items

drwxr-xr-x - root supergroup 0 2018-11-12 15:15 /leftjointest

drwxr-xr-x - root supergroup 0 2018-11-12 14:54 /testjoin

drwxr-xr-x - root supergroup 0 2018-11-15 11:07 /testsave

drwxr-xr-x - root supergroup 0 2018-11-15 11:09 /testsave1

drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr

root@host2 bin# ./hdfs dfs -ls /testsave1

Found 17 items

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/_SUCCESS

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00000

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00001

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00002

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00003

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00004

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00005

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00006

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00007

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00008

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00009

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00010

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00011

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00012

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00013

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00014

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00015

root@host2 bin# ./hdfs dfs -cat /testsave1/part-00003

1

root@host2 bin# ./hdfs dfs -cat /testsave1/part-00006

2

root@host2 bin# ./hdfs dfs -cat /testsave1/part-00009

3

root@host2 bin# ./hdfs dfs -cat /testsave1/part-00012

4

root@host2 bin# ./hdfs dfs -cat /testsave1/part-00015

5

With no partition count specified, there are 16 partitions. This is related to the number of cores used to start spark-shell:

root@host2 bin# ./spark-shell --master spark://host2:7077,host1:7077 --executor-memory 1g --total-executor-cores 16

Here I started the shell with 16 cores and 1 GB of executor memory for this computation. Note that more partitions is not automatically better: with 16 cores only 16 tasks run at the same time while the rest wait, and switching between tasks wastes time.
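If you want to control the number of output files regardless of how many cores the shell was started with, you can repartition before saving. A sketch reusing rdd1 from above (the output paths are made up):

// Full shuffle into exactly 3 partitions, so 3 part files are written.
rdd1.repartition(3).saveAsTextFile("hdfs://192.168.5.182:8020/testsave2")

// coalesce reduces the partition count without a full shuffle.
rdd1.coalesce(2).saveAsTextFile("hdfs://192.168.5.182:8020/testsave3")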


scala> val rdd = sc.textFile("hdfs://192.168.5.182:8020/usr/file/wcount")

rdd: org.apache.spark.rdd.RDD[String] = hdfs://192.168.5.182:8020/usr/file/wcount MapPartitionsRDD[12] at textFile at <console>:24

root@host2 bin# ./hdfs dfs -ls /usr/file/wcount

Found 3 items

-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS

-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000

-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001

scala> rdd.partitions.length // check the RDD's number of partitions

res4: Int = 3

There are 3 files under /usr/file/wcount in Hadoop HDFS, so the RDD has 3 partitions. If we upload a new file into that directory:

root@host2 bin# ./hdfs dfs -put /home/soft/schema.xml /usr/file/wcount

root@host2 bin# ./hdfs dfs -ls /usr/file/wcount

Found 4 items

-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS

-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000

-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001

-rw-r--r-- 3 root supergroup 3320 2018-11-15 14:34 /usr/file/wcount/schema.xml

scala> val rdd = sc.textFile("hdfs://192.168.5.182:8020/usr/file/wcount")

rdd: org.apache.spark.rdd.RDD[String] = hdfs://192.168.5.182:8020/usr/file/wcount MapPartitionsRDD[14] at textFile at <console>:24

scala> rdd.partitions.length

res5: Int = 4

then the RDD's partition count becomes 4.
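textFile also accepts a minimum partition count as a second argument, in case you want more parallelism than the number of input files suggests. A sketch with the same path (rddMin is just an illustrative name):

val rddMin = sc.textFile("hdfs://192.168.5.182:8020/usr/file/wcount", 8)
rddMin.partitions.length   // typically at least 8; the exact number depends on the input splits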


scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),2)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> rdd1.reduce(_+_)

res6: Int = 15

Notice that reduce does not return an RDD but a plain value, which tells us that reduce() is an Action operator.

scala> rdd1.count

res7: Long = 5

count returns the number of elements; it is also an Action.

scala> rdd1.top(2)

res8: Array[Int] = Array(5, 4)

top(n) sorts the elements and returns the largest n, in descending order.

scala> rdd1.take(2)

res9: Array[Int] = Array(1, 2)

take(n) returns the first n elements without sorting.

scala> rdd1.first

res10: Int = 1

first returns the first element.

scala> rdd1.takeOrdered(3)

res11: Array[Int] = Array(1, 2, 3)

takeOrdered(n) sorts in ascending order and returns the first n elements.


scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val func = (index: Int, it: Iterator[Int]) => {
     |   it.map(e => s"part: $index, ele: $e")
     | }

func: (Int, Iterator[Int]) => Iterator[String] = <function2>

This defines a function that tags each element e of a collection with the index of the partition it lives in.

scala> val rdd2 = rdd.mapPartitionsWithIndex(func) // process one whole partition at a time, knowing which partition the data belongs to

rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:28

scala> rdd2.collect

res0: Array[String] = Array(part: 0, ele: 1, part: 0, ele: 2, part: 0, ele: 3, part: 0, ele: 4, part: 1, ele: 5, part: 1, ele: 6, part: 1, ele: 7, part: 1, ele: 8, part: 1, ele: 9)

1, 2, 3, 4 are in partition 0; 5, 6, 7, 8, 9 are in partition 1.

scala> rdd.aggregate(0)(_ + _,_ + _) // the first _ + _ sums within each partition (2 partitions here); the second _ + _ sums the per-partition results (scatter first, then gather)

res6: Int = 45

scala> rdd.aggregate(0)(math.max(_,_),_ + _) // math.max(_,_) takes the maximum within each partition; _ + _ adds the per-partition maxima

res7: Int = 13

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3) // 3 partitions

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> rdd.aggregate(0)(_ + _,_ + _) // sum each of the 3 partitions, then combine

res8: Int = 45

scala> rdd.aggregate(0)(math.max(_,_),_ + _) // add the maxima of the 3 partitions: 3 + 6 + 9

res9: Int = 18

scala> rdd.aggregate(5)(math.max(_,_),_ + _)

res10: Int = 25

Here the initial value 5 takes part in every partition's comparison: 1, 2 and 3 in the first partition are all smaller than 5, so that partition's maximum is 5; the second partition's maximum is 6 and the third's is 9, giving 5 + 6 + 9 = 20. The initial value 5 is then applied once more in the final combine, so the result is 5 + 6 + 9 + 5 = 25.
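We can double-check the per-partition maxima with mapPartitions, which runs a function once over each partition's iterator; a small sketch on the same 3-partition rdd:

// One maximum per partition: Array(3, 6, 9), which is what aggregate then combines.
rdd.mapPartitions(it => Iterator(it.max)).collect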


scala> val rdd = sc.parallelize(List("a","b","c","d","e","f"),2)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD2 at parallelize at <console>:24

scala> def func2(index: Int, iter: IteratorString): IteratorString = {

代码语言:txt
复制
  |    iter.map(x => "[partID:" + index + ",val:" + x + "]")
代码语言:txt
复制
  | }

func2: (index: Int, iter: IteratorString)IteratorString

定义一个专门获取集合数据x所在分区index的函数

scala> val rdd1 = rdd.mapPartitionsWithIndex(func2) //一次性获取一个分区的集合数据,并且知道这个集合的数据在哪个分区

rdd1: org.apache.spark.rdd.RDDString = MapPartitionsRDD4 at mapPartitionsWithIndex at <console>:28

scala> rdd1.collect

res3: ArrayString = Array(partID:0,val:a, partID:0,val:b, partID:0,val:c, partID:1,val:d, partID:1,val:e, partID:1,val:f)

a,b,c在0分区;d,e,f在1分区

scala> rdd.aggregate("")(_ + _,_ + _)

res18: String = defabc

scala> rdd.aggregate("")(_ + _,_ + _)

res19: String = abcdef

这里出现了两个不同的结果,其原因就在于rdd有两个分区,而每个分区在worker里面的executor是并行计算的,他们返回到rdd的结果速度不一定,谁先返回,谁在前面。

scala> rdd.aggregate("|")(_ + _,_ + _)

res20: String = ||abc|def

scala> rdd.aggregate("|")(_ + _,_ + _)

res21: String = ||def|abc

这里也是出现了两个结果,原因同上,|被分配到每一个分区作为第一个字符被连接,同时|作为一个单独的分区被连接字符串。


scala> val rdd = sc.parallelize(List("12","23","345","4567"),2)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD8 at parallelize at <console>:24

scala> rdd.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x + y)

res24: String = 24

scala> rdd.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x + y)

res25: String = 42

(x,y) => math.max(x.length,y.length).toString每一个分区取最大的字符串长度转成字符串,(x,y) => x + y所有分区结果字符串的拼接。第一个分区"12","23"的最大字符串长度为2,第二个分区"345","4567"的最大字符串长度为4.所以有两个结果,谁先返回谁在前面,返回的结果为"24"或者"42".


scala> val rdd = sc.parallelize(List("12","23","345",""),2)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD9 at parallelize at <console>:24

scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x + y)

res28: String = 01

scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x + y)

res29: String = 10

这个比较难以理解,第一个分区""跟"12"比较得到长度为"0"的字符串,然后"0"的字符串跟"23"比较,得到长度为"1"的字符串;第二个分区,""跟"345"比较得到"0"的字符串,"0"的字符串跟""比较得到"0"的字符串,所以返回的是"01"或者是"10",我们可以用下面这个rdd来验证。

scala> val rdd = sc.parallelize(List("12","23","345","67"),2)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD10 at parallelize at <console>:24

scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x + y)

res30: String = 11

这里唯一的不同就在于"0"的字符串跟"67"比较得到"1"的字符串


scala> val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)

pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> pairRDD.aggregateByKey(0)(_ + _,_ + _).collect // sum per key within each partition, then combine across partitions

res33: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))

scala> pairRDD.aggregateByKey(100)(_ + _,_ + _).collect

res34: Array[(String, Int)] = Array((dog,112), (cat,219), (mouse,206))

The initial value 100 is added once per key in each partition. dog does not appear in the first partition, so only the second partition adds 100, giving 112; cat appears in both partitions, so 100 is added twice, giving 219; the same goes for mouse.

Of course, if all we want is the sum of the values per key, we can simply use reduceByKey; no initial value is involved, and the result matches aggregateByKey with an initial value of 0.

scala> pairRDD.reduceByKey(_ + _).collect

res31: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))

scala> pairRDD.aggregateByKey(100)(_ + _,_ + _).saveAsTextFile("hdfs://192.168.5.182:8020/aggbk")

root@host2 bin# ./hdfs dfs -ls /aggbk

Found 3 items

-rw-r--r-- 3 root supergroup 0 2018-11-16 17:22 /aggbk/_SUCCESS

-rw-r--r-- 3 root supergroup 20 2018-11-16 17:22 /aggbk/part-00000

-rw-r--r-- 3 root supergroup 12 2018-11-16 17:22 /aggbk/part-00001

root@host2 bin# ./hdfs dfs -cat /aggbk/part-00000

(dog,112)

(cat,219)

root@host2 bin# ./hdfs dfs -cat /aggbk/part-00001

(mouse,206)

root@host2 bin#

The result with initial value 100 is saved into Hadoop HDFS. Because the RDD was created with 2 partitions, there are only 2 part files, and their contents match what collect showed earlier.


scala> val rdd = sc.parallelize(List(("a",1),("b",2)))

rdd: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD18 at parallelize at <console>:24

scala> rdd.collectAsMap //将结果收集转换成Map

res36: scala.collection.MapString,Int = Map(b -> 2, a -> 1)

scala> rdd.mapValues(_ * 100).collectAsMap //将value乘以100,收集成Map

res37: scala.collection.MapString,Int = Map(b -> 200, a -> 100)


How an RDD run plays out: List(1,2,3,4,5) is split into 3 partitions, tasks are generated and pushed to the Executors on 3 Workers, the Executors compute their results, and the results are collected back into the Driver and returned as an array. The results may arrive at different speeds, but they are assembled into the Array by partition number, so the order never changes.

scala> val rdd = sc.parallelize(List(1,2,3,4,5),3)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd.map(_ * 10).collect

res19: Array[Int] = Array(10, 20, 30, 40, 50)

scala> rdd.map(_ * 10).collect

res20: Array[Int] = Array(10, 20, 30, 40, 50)

scala> rdd.map(_ * 10).collect

res21: Array[Int] = Array(10, 20, 30, 40, 50)

No matter how many times we run this, the order does not change.

If the results have to go into a database and the data volume is large, they should be written to the database directly from the Executors rather than collected to the Driver first.


scala> val rdd = sc.parallelize(List(("a",1),("b",2),("b",2),("c",2),("c",1)))

rdd: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD6 at parallelize at <console>:24

scala> rdd.countByKey() //跟对偶元组的Value无关,只看Key的出现次数

res22: scala.collection.MapString,Long = Map(a -> 1, b -> 2, c -> 2)


scala> val rdd = sc.parallelize(List(("a",1),("b",2),("b",2),("c",2),("c",1),("d",4),("d",2),("e",1)))

rdd: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD10 at parallelize at <console>:24

scala> val rdd1 = rdd.filterByRange("b","d") //以对偶数组的Key为过滤条件,只取"b"到"d"的范围的元组

rdd1: org.apache.spark.rdd.RDD(String, Int) = MapPartitionsRDD11 at filterByRange at <console>:26

scala> rdd1.collect

res24: Array(String, Int) = Array((b,2), (b,2), (c,2), (c,1), (d,4), (d,2))


scala> val a = sc.parallelize(List(("a","1 2"),("b","3 4")))

a: org.apache.spark.rdd.RDD(String, String) = ParallelCollectionRDD12 at parallelize at <console>:24

scala> a.flatMapValues(_.split(" ")).collect //对对偶元组的Value进行扁平化处理

res25: Array(String, String) = Array((a,1), (a,2), (b,3), (b,4))


scala> val rdd = sc.parallelize(List("dog","wolf","cat","bear"),2)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD14 at parallelize at <console>:24

scala> val rdd1 = rdd.map(x => (x.length,x)) //将rdd的元素转成带对偶元组的集合,形成一个新的RDD的rdd1

rdd1: org.apache.spark.rdd.RDD(Int, String) = MapPartitionsRDD15 at map at <console>:26

scala> rdd1.collect

res26: Array(Int, String) = Array((3,dog), (4,wolf), (3,cat), (4,bear))

现在我们要将rdd1以相同的Key,将Value拼接起来,有以下三种方法

scala> rdd1.aggregateByKey("")(_ + _,_ + _).collect

res27: Array(Int, String) = Array((4,bearwolf), (3,dogcat))

scala> rdd1.aggregateByKey("")(_ + _,_ + _).collect

res28: Array(Int, String) = Array((4,wolfbear), (3,catdog))

scala> rdd1.reduceByKey(_ + _).collect

res40: Array(Int, String) = Array((4,bearwolf), (3,dogcat))

scala> rdd1.foldByKey("")(_ + _).collect

res41: Array(Int, String) = Array((4,bearwolf), (3,dogcat))

其实这3种方法都可以实现分散聚合,是因为他们都调用了同一个底层方法combineByKeyWithClassTag
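As an illustration of that shared foundation, the same concatenation can be spelled out directly with combineByKey (a sketch; the three functions are the create-combiner, merge-value and merge-combiners steps):

rdd1.combineByKey(
  (v: String) => v,                       // create the initial combiner from the first value of a key
  (acc: String, v: String) => acc + v,    // merge another value into the per-partition combiner
  (a: String, b: String) => a + b         // merge combiners coming from different partitions
).collect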


scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24

scala> rdd.foreach(e => println(e * 100))

This foreach prints nothing in the shell, because it runs on the executors and nothing is returned to the Driver. In the Spark web console there is a foreach job (Job Id 42 here); drilling into it and opening the stdout of each of the Tasks(2), we can see the printed values for partition index 0 and for partition index 1. In other words, rdd.foreach(e => println(e * 100)) was executed only inside the executors.

scala> rdd.foreachPartition(it => it.foreach(x => println(x * 10000))) // take one whole partition at a time as an iterator, and let the iterator do the printing

Again there is no return value, and the Spark console shows that this statement, too, ran on the Executors and returned nothing to the Driver.

When writing Executor data to a database, use foreachPartition: it hands you a whole partition at a time, so a single database connection is enough to write it all. With foreach, every element would need its own connection, which is very inefficient and very costly.
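A sketch of that pattern with plain JDBC (the URL, credentials and table are placeholders, and a suitable JDBC driver would have to be on the executors' classpath):

import java.sql.DriverManager

rdd.foreachPartition { it =>
  // One connection per partition instead of one per element.
  val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/test", "user", "password")
  try {
    val stmt = conn.prepareStatement("INSERT INTO nums(value) VALUES (?)")
    it.foreach { x =>
      stmt.setInt(1, x)
      stmt.executeUpdate()   // batching with addBatch/executeBatch would be faster; kept simple here
    }
    stmt.close()
  } finally {
    conn.close()
  }
}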


scala> val pairRDD = sc.parallelize(List(("hello",2),("jerry",3),("hello",4),("jerry",1)),2)

pairRDD: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD1 at parallelize at <console>:24

scala> val rdd = pairRDD.combineByKey(x => x,(m: Int,n: Int) => m + n,(a: Int,b: Int) => a + b)

rdd: org.apache.spark.rdd.RDD(String, Int) = ShuffledRDD2 at combineByKey at <console>:26

combineByKey是一个底层的算子,必须要声明参数的类型,不能使用类似_ + _的写法;ShuffledRDD是把有相同的Key的对偶元组放到同一个Executor中,再进行运算。

scala> rdd.collect

res1: Array(String, Int) = Array((hello,6), (jerry,4))

我们来看一个把各种动物按照单双来进行分组的例子

scala> val rdd = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD0 at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(List(1,1,2,2,2,1,2,2,2),3)

rdd1: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD1 at parallelize at <console>:24

scala> val rdd2 = rdd1.zip(rdd) //将两个RDD的集合合并成一个对偶元组的集合

rdd2: org.apache.spark.rdd.RDD(Int, String) = ZippedPartitionsRDD22 at zip at <console>:28

scala> rdd2.collect

res0: Array(Int, String) = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))

scala> import scala.collection.mutable.ListBuffer

import scala.collection.mutable.ListBuffer

scala> val rdd3 = rdd2.combineByKey(x => ListBuffer(x),(m: ListBufferString,n: String) => m += n,(a: ListBufferString,b: ListBufferString) => a ++= b)

rdd3: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.ListBufferString)] = ShuffledRDD4 at combineByKey at <console>:31

第一个函数x => ListBuffer(x)是将分好组的各种Key(这里Key为数字)的第一个Value(Value为动物)放进一个单独的ListBuffer中,比如第一个分区中只有ListBuffer(dog)和ListBuffer(gnu),没有cat,因为cat不是1的第一个Value,其他分区以此类推;第二个函数(m: ListBufferString,n: String) => m += n将没有放进ListBuffer中的其他Value放进有相同Key的ListBuffer中,比如第一个分区中有ListBuffer(dog,cat),ListBuffer(gnu),此时只是在各个分区分别操作;第三个函数(a: ListBufferString,b: ListBufferString) => a ++= b进行所有分区整体聚合,将所有相同Key的ListBuffer合并,此时是一个Shuffled操作,会将有相同Key的ListBuffer放入到同一个机器中,计算完再合并。

scala> rdd3.collect

res2: Array[(Int, scala.collection.mutable.ListBufferString)] = Array((1,ListBuffer(dog, cat, turkey)), (2,ListBuffer(salmon, rabbit, gnu, wolf, bear, bee)))

整体概念图如下

将结果保存到hadoop hdfs中

scala> rdd3.saveAsTextFile("hdfs://192.168.5.182:8020/combine")

root@host2 bin# ./hdfs dfs -ls /combine

Found 4 items

-rw-r--r-- 3 root supergroup 0 2018-11-23 17:14 /combine/_SUCCESS

-rw-r--r-- 3 root supergroup 0 2018-11-23 17:14 /combine/part-00000

-rw-r--r-- 3 root supergroup 33 2018-11-23 17:14 /combine/part-00001

-rw-r--r-- 3 root supergroup 53 2018-11-23 17:14 /combine/part-00002

root@host2 bin# ./hdfs dfs -cat /combine/part-00001

(1,ListBuffer(turkey, dog, cat))

root@host2 bin# ./hdfs dfs -cat /combine/part-00002

(2,ListBuffer(gnu, wolf, bear, bee, salmon, rabbit))

Although there are 3 partitions, after the shuffle there are only 2 keys (1 and 2), so only two of the 3 part files contain data.

We can redefine the number of partitions of rdd3:

scala> import org.apache.spark.HashPartitioner

import org.apache.spark.HashPartitioner

scala> val rdd3 = rdd2.combineByKey(x => ListBuffer(x),(m: ListBuffer[String],n: String) => m += n,(a: ListBuffer[String],b: ListBuffer[String]) => a ++= b,new HashPartitioner(2),true,null)

rdd3: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.ListBuffer[String])] = ShuffledRDD[6] at combineByKey at <console>:32

Save it to Hadoop HDFS again:

scala> rdd3.saveAsTextFile("hdfs://192.168.5.182:8020/combine1")

root@host2 bin# ./hdfs dfs -ls /combine1

Found 3 items

-rw-r--r-- 3 root supergroup 0 2018-11-23 17:27 /combine1/_SUCCESS

-rw-r--r-- 3 root supergroup 53 2018-11-23 17:27 /combine1/part-00000

-rw-r--r-- 3 root supergroup 33 2018-11-23 17:27 /combine1/part-00001

This time the saved result has only 2 part files, and both contain data.
