🚀 作者 :“大数据小禅” 🚀文章简介:本篇文章属于Spark系列文章,专栏将会记录从spark基础到进阶的内容 🚀 内容涉及到Spark的入门集群搭建,核心组件,RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪
RDD 中的算子从功能上分为两大类
特点
转换算子是spark中的一种操作,用于从一个RDD转换成另一个RDD,它可以被用来创建新的RDD,也可以被用来转换已有的RDD。它们提供了一种通用的方法来完成RDD的转换,如map、filter、groupByKey等。
行动算子是spark中的另一种操作,它们用于从一个RDD中收集数据,或者从一个RDD中计算结果,如collect、reduce、count等。行动算子可以基于RDD的转换算子的结果来进行计算,也可以基于一组RDD来进行计算。
总之,转换算子和行动算子之间有着紧密的联系,转换算子用于创建RDD,行动算子用于从RDD中收集数据和计算结果。
def mapTest(): Unit ={
val value = sc.parallelize(List(1, 2, 3, 4)).map(
value=>value*2
).collect().foreach(println(_))
val works = sc.parallelize(List("yo", "pai", "xc")).map(
work => (work, 1)
).collect().take(2).foreach(println(_))
}
@Test
def flatmapTest(): Unit ={
val works = sc.parallelize(List("yo", "pai", "xc")).flatMap(
work=>(work)
).collect().foreach(println(_))
}
@Test
def filterTest(): Unit ={
val works = sc.parallelize(List("yo", "pai", "xc")).filter(
//删选出不包含yo字段的
work=>(!work.contains("yo"))
).collect().foreach(println(_))
}
@Test
def mapPartitionsTest(): Unit ={
val works = sc.parallelize(List(1,2,3,4,5)).mapPartitions(
//里面接收一个函数,函数里面接收一个集合,传递一个集合要求返回一个集合
//函数里面要求接收一个集合,并且把集合里的Iterator[T]每一条数据转换之后再返回一个集合回去
//Iterator[T] => Iterator[U]
item=>{
val ele=item.map(item=>item*10)
ele
}
)
println(works.collect().mkString(","))
}
@Test
def reduceByKeyTest(): Unit ={
val works = sc.parallelize(Seq(("勇哥", 100), ("勇哥", 98), ("小明", 97))).reduceByKey(
(a,b)=>a+b
)
println(works.collect().foreach(println(_)))
}
//从本地集合创建RDD
val rdd = sc.parallelize(Seq(("勇哥", 100), ("小红", 98), ("小明", 97), ("小明", 77))).groupByKey()
println(rdd.collect().foreach(println(_)))
}
@Test
def sampleTest(){
//从本地集合创建RDD
val rdd = sc.parallelize(List(1,2,3,4,5,6,7)).sample(
//随机抽3个数字
withReplacement = true,2
)
println(rdd.collect().foreach(println(_)))
}
@Test
def sortByTest(){
val rdd = sc.parallelize(Array(("Tom",90),("Bob",70),("John",80))).sortBy(_._2)
println(rdd.collect().mkString(" "))
}
@Test
def distinctTest(){
val rdd = sc.parallelize(List(1,2,3,4,4,5,6,6))
val distinctRDD = rdd.distinct()
println(distinctRDD.collect().mkString(" "))
}
@Test
def unionTest(){
val rdd1 = sc.parallelize(List(1,2,3))
val rdd2 = sc.parallelize(List(1,2,3))
val rdd = rdd1.union(rdd2)
println(rdd.collect().mkString(" "))
}
@Test
def foldByKeyTest(){
//从本地集合创建RDD
val rdd = sc.parallelize(Seq(("勇哥", 1), ("小明", 1), ("小明", 1))).foldByKey(zeroValue = 20)((a,b)=>(a+b))
println(rdd.collect().foreach(println(_)))
}
@Test
def subtractTest(){
val rdd1 = sc.parallelize(Seq(1,2,3,4,5))
val rdd2 = sc.parallelize(Seq(3,4,5,6,7))
val rdd3 = rdd1.subtract(rdd2)
println(rdd3.collect().foreach(println(_)))
}
@Test
def joinTest(){
val rdd1 = sc.makeRDD(Array((1, "yo"), (2, "xc"), (3, "yong")))
val rdd2= sc.makeRDD(Array((1, "yo1"), (2, "xc1"), (3, "yong1")))
val rdd3=rdd1.join(rdd2)
println(rdd3.collect().mkString(" "))
}
@Test
def reduceTest(){
//从本地集合创建RDD
val rdd = sc.parallelize(List(1,2,3,4)).reduce(_+_)
println(rdd)
}
@Test
def collcetTest(){
//从本地集合创建RDD
val rdd = sc.parallelize(List(1,2,3,4))
println(rdd.collect().mkString(" "))
}
@Test
def countTest(){
//从本地集合创建RDD
val rdd = sc.parallelize(List(1,2,3,4))
println(rdd.count())
}
@Test
def takeTest(){
//从本地集合创建RDD
val rdd: RDD[Int] = sc.parallelize(List(1,2,3,4))
println(rdd.take(2).mkString(" "))
}
@Test
def foreachTest(){
//从本地集合创建RDD
val rdd: RDD[Int] = sc.parallelize(List(1,2,3,4))
println(rdd.take(2).foreach(println(_)))
}
到这里spark的常用算子就总结完了,其实在Spark还有很多不同的算子本篇列举了一些日常开发中会比较常用的一些操作。