//使用以下命令进入shell
//cd /usr/local/spark/bin
//./spark-shell
scala> val lines = sc.textFile("/home/hadoop/look.sh")
lines: org.apache.spark.rdd.RDD[String] = /home/hadoop/look.sh MapPartitionsRDD[1] at textFile at <console>:24
通常使用parallelize()函数可以创建一个简单的RDD,测试用(为了方便观察结果)。
scala> val rdd = sc.parallelize(Array(1,2,2,4),4) 最后一个4指的是分区数(并行度),不指定时默认由spark.default.parallelism决定
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd.count()
res0: Long = 4
scala> rdd.foreach(print)
1224
scala> rdd.foreach(print)
1422
注意:上述parallelize()方法就是为了观察数组的作用
还有如果RDD有多个分区,由于各分区并行计算,foreach的输出顺序不确定,两次遍历的结果顺序可能并不相同
scala> val lines = sc.parallelize(Array("home Tom","hadoop Jack","look Kim"))
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> lines.foreach(println)
home Tom
hadoop Jack
look Kim
//注意转换操作(如flatMap)不会修改RDD本身,因为RDD是不可变的,转换只会返回一个新的RDD
scala> lines.flatMap(t=>t.split(" "))
res20: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:27
scala> lines.foreach(println)
home Tom
hadoop Jack
look Kim
//必须使用新的常量来接收
scala> val newrdd = lines.flatMap(t=>t.split(" "))
newrdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at flatMap at <console>:26
scala> newrdd.foreach(println)
home
Tom
hadoop
Jack
look
Kim
scala>
scala> val rdd1 = sc.parallelize(Array("one","two","three"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(Array("two","three","three"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> rdd2.distinct().foreach(println)
two
three
scala> rdd2.union(rdd1).foreach(println)
two
three
three
one
two
three
scala> rdd2.intersection(rdd1).foreach(println)
two
three
scala> rdd2.subtract(rdd1).foreach(println)
scala> rdd1.subtract(rdd2).foreach(println)
one
级别 | 空间占用 | CPU消耗 | 是否在内存 | 是否在硬盘 |
---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 在 | 不在 |
MEMORY_ONLY_SER | 低 | 高 | 在 | 不在 |
DISK_ONLY | 低 | 高 | 不在 | 在 |
MEMORY_AND_DISK | 高 | 中 | Some | Some |
MEMORY_AND_DISK_SER | 低 | 高 | Some | Some |
MEMORY_AND_DISK 内存中放不下往硬盘放 MEMORY_AND_DISK_SER 内存中放不下往硬盘放(序列化的,故CPU消耗较大)
scala> val lines = sc.textFile("/home/hadoop/look.sh")//注意这是错的,这样默认是取hdfs文件
scala> val lines = sc.textFile("file:///home/hadoop/look.sh")//用file://来指明取的系统文件
lines: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/look.sh MapPartitionsRDD[5] at textFile at <console>:24
scala> lines.foreach(println)
#!/bin/bash
Jarinfo=$(ps -ef|grep java)
echo "$Jarinfo" | while read Line
do
#echo $Line;
#echo ${Line##*:}
Jarstr=${Line##*:}
Ishere=$(echo $Jarstr | grep $1 )
if [[ "$Ishere" != "" ]]
then
echo YES 1>
exit 1
fi
done
scala> val pairs = lines.map(line=>(line.split(" ")(0),line))
pairs: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[6] at map at <console>:26
scala> pairs.foreach(println)
(#!/bin/bash,#!/bin/bash)
(Jarinfo=$(ps,Jarinfo=$(ps -ef|grep java))
(echo,echo "$Jarinfo" | while read Line)
(do,do )
( #echo, #echo $Line;)
( #echo, #echo ${Line##*:})
( Jarstr=${Line##*:}, Jarstr=${Line##*:})
( Ishere=$(echo, Ishere=$(echo $Jarstr | grep $1 ))
( if, if [[ "$Ishere" != "" ]])
( then, then)
( echo, echo YES 1>)
( exit, exit 1 )
( fi, fi)
(done,done)
函数名 | 作用 |
---|---|
reduceByKey(func) | 把相同key的value进行结合,key不变,是计算 |
groupByKey(func) | 把相同key的value进行分组,key不变,仅分组 |
combineByKey(,,,) | |
mapValues(func) | 将map操作作用于Values,仅对Values进行操作 |
flatMapValues(func) | 将flatMap(扩展)操作作用于Values |
keys | 仅返回键的值(RDD.keys) |
values | 仅返回值的值(RDD.values) |
sortByKey() | 按照Key来排序 |
scala> var rdd = sc.parallelize(Array((1,2),(3,4),(3,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.foreach(println)
(1,2)
(3,4)
(3,6)
scala> rdd.reduceByKey((x,y)=>x+y).foreach(println)
(1,2)
(3,10)
scala> rdd.groupByKey().foreach(println)
(1,CompactBuffer(2))
(3,CompactBuffer(4, 6))
scala> rdd.sortByKey().foreach(println)
(1,2)
(3,4)
(3,6)
scala> rdd.keys.foreach(println)
1
3
3
scala> rdd.values.foreach(println)
2
4
6
特点:最常用的基于key的聚合函数,返回的类型可以与输入的类型不一样 参数:createCombiner,mergeValue,mergeCombiners,partitioner 应用:许多基于key的聚合函数都用到了,例如groupByKey底层就应用到了 注意: