前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Shell笔记

Spark Shell笔记

作者头像
CBeann
发布2023-12-25 17:13:42
1600
发布2023-12-25 17:13:42
举报
文章被收录于专栏:CBeann的博客CBeann的博客

学习感悟

(1)学习一定要敲,感觉很简单,但是也要敲一敲,不要眼高手低

(2)一定要懂函数式编程,一定,一定

(3)shell中的方法在scala写的项目中也会有对应的方法

(4)sc和spark是程序的入口,直接用

SparkShell

启动SparkShell

代码语言:javascript
复制
 ./bin/spark-shell

WordCount案例

代码语言:javascript
复制
sc.textFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/README.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/out")

RDD创建(Shell)

从集合中创建RDD

parallelize和makeRDD

代码语言:javascript
复制
val rdd1246 = sc.parallelize(List("a","b","c"))
代码语言:javascript
复制
rdd1246.collect
代码语言:javascript
复制
val rdd1617=sc.makeRDD(List(1,List(("a","b","c")),(2,List("d","e","f"))))
代码语言:javascript
复制
 rdd1617.collect
从外部存储创建RDD

由外部存储系统的数据集创建,包括本地文件系统,还有Hadoop支持的数据集,如HDFS,HBase

代码语言:javascript
复制
sc.textFile("hdfs://iZm5ea99qngm2v98asii1aZ:9000/README.txt")
从其他RDD转换

常用的Transformation和Action(Shell)

map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

代码语言:javascript
复制
scala> var rdd1638  = sc.parallelize(1 to 10)

scala> rdd1638.collect

scala> rdd1638.map(_*2).collect

filter(func):返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

代码语言:javascript
复制
scala> var rdd1643 =sc.parallelize(1 to 10)

scala> rdd1643.filter(_>5).collect

flatMap(func):类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

注意:func 必须是将一个数据映射为0或多个输出元素

通俗点说:一个数据通过func函数产生的集合压平

代码语言:javascript
复制
val rdd3=sc.makeRDD(List("hello1_hello2_hello3","hello4_hello5"))

scala> rdd3.flatMap(_.split("_")).collect

sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为 fraction 的数据,withReplacement 表示是抽 出的数据是否放回,true 为有放回的抽样, false 为无放回的抽样,seed 用于指定随机 数生成器种子。例子从 RDD 中随机且有放 回的抽出 50%的数据,随机种子值为 3(即 可能以 1 2 3 的其中一个起始值)

代码语言:javascript
复制
scala> val rdd5 = sc.makeRDD(List(1,2,3,4,5,6,7))

scala> rdd5.sample(false,0.2,3).collect

takeSample:和 Sample 的区别是:takeSample 返回的是最终的结果集合。

union(otherDataset):对源 RDD 和参数 RDD 求并集后返回一个 新的 RDD

intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个 新的 RDD

intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个 新的 RDD

distinct([numTasks])):对源 RDD 进行去重后返回一个新的 RDD. 默认情况下,只有 8 个并行任务来操作, 但是可以传入一个可选的 numTasks 参数 改变它。

代码语言:javascript
复制
rdd3 = sc.makeRDD(List(1,1,2,3,4,4,5))

rdd3.distinct(2).collect

reduceByKey(func, [numTasks]):在一个(K,V)的 RDD 上调用,返回一个 (K,V)的 RDD,使用指定的 reduce 函数, 将相同 key 的值聚合到一起,reduce 任务 的个数可以通过第二个可选的参数来设置

groupByKey:groupByKey 也是对每个 key 进行操作,但只生成 一个 sequence。

sortByKey([ascending], [numTasks]):在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序 的(K,V)的 RDD

sortBy(func,[ascending], [numTasks]):与 sortByKey 类似,但是更灵活,可以用 func 先对数据进行处理,按照处理后的数 据比较结果排序。

join(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返 回一个相同 key 对应的所有元素对在一起 的(K,(V,W))的 RDD

cogroup(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返 回一个(K,(Iterable,Iterable))类型 的 RDD

cartesian(otherDataset):笛卡尔积

coalesce(numPartitions):缩减分区数,用于大数据集过滤后,提高 小数据集的执行效率。

repartition(numPartitions):根据分区数,从新通过网络随机洗牌所有 数据。

glom:将每一个分区形成一个数组,形成新的 RDD 类型时 RDD[Array[T]]

subtract:计算差的一种函数去除两个 RDD 中相同的 元素,不同的 RDD 将保留下来

mapValues:针对于(K,V)形式的类型只对 V 进行操作

reduce(func):通过 func 函数聚集 RDD 中的所有元素, 这个功能必须是可交换且可并联的

collect():在驱动程序中,以数组的形式返回数据 集的所有元素

count():返回 RDD 的元素个数

first():返回 RDD 的第一个元素(类似于 take(1))

take(n);返回一个由数据集的前 n 个元素组成的 数组

takeOrdered(n):返回前几个的排序

saveAsTextFile(path):将数据集的元素以 textfile 的形式保存 到 HDFS 文件系统或者其他支持的文件 系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文 本

saveAsSequenceFile(path):将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录 下,可以使 HDFS 或者其他 Hadoop 支 持的文件系统。

saveAsObjectFile(path):用于将 RDD 中的元素序列化成对象, 存储到文件中。

countByKey();针对(K,V)类型的 RDD,返回一个 (K,Int)的 map,表示每一个 key 对应的 元素个数。

数据读取与保存主要方式(Shell)

文本文件输入输出
代码语言:javascript
复制
val rdd1 =sc.textFile("hdfs://Master:9000/cbeann/README.txt")
代码语言:javascript
复制
 rdd.saveAsTextFile("hdfs://Master:9000/cbeann/README2.txt")
JSON 、CSV文件输入输出(Shell)

先通过文本文件读入,然后通过fastjson等第三方库解析字符串为自定义的类型

先将自定义的类型通过第三方库转换为字符串,在同文本文件的形式保存到RDD中

SequenceFile 文件输入输出(Shell)

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的 一种平面文件(Flat File)。

代码语言:javascript
复制
 val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
代码语言:javascript
复制
data.saveAsSequenceFile("hdfs://Master:9000/cbeann/seq")
代码语言:javascript
复制
val sdata = sc.sequenceFile[Int,String]("hdfs://Master:9000/cbeann/seq/p*")
对象文件输入输出(Shell)
代码语言:javascript
复制
val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
代码语言:javascript
复制
data.saveAsObjectFile("hdfs://master01:9000/objfile")
代码语言:javascript
复制
val objrdd:RDD[(Int,String)] = sc.objectFile[(Int,String)]("hdfs://master01:9000/objfile/p*")

Spark SQL(Shell)

启动SparkShell

代码语言:javascript
复制
./bin/spark-shell

读取数据,创建DataFrame

我的hdfs上/cbeann/person.json

代码语言:javascript
复制
{  "name": "王小二",   "age": 15}
{  "name": "王小三",   "age": 25}
{  "name": "王小四",   "age": 35}
代码语言:javascript
复制
 val df = spark.read.json("hdfs://Master:9000/cbeann/person.json")
代码语言:javascript
复制
df.show

将数据注册一张表,表名为 people

代码语言:javascript
复制
df.createOrReplaceTempView("people")

发送SQL

代码语言:javascript
复制
spark.sql("select * from people where age > 16").show

或者

RDD、DataFrame、DataSet之间的转化(Shell)

RDD-》DataFrame
代码语言:javascript
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lisi",13)))
rdd.toDF("name","age").show

或者 

代码语言:javascript
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
case class Person(name:String, age:Int)
 val df =  rdd.map(x=>Person(x._1,x._2.toInt)).toDF
DataFrame-》RDD
代码语言:javascript
复制
val rdd1 = df.rdd
RDD-》DataSet
代码语言:javascript
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
代码语言:javascript
复制
val ds = rdd.toDS

或者

代码语言:javascript
复制
val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))
代码语言:javascript
复制
case class Person(name:String, age:Int)
代码语言:javascript
复制
 rdd.map(x=>Person(x._1,x._2.toInt)).toDS
DataSet-》RDD
代码语言:javascript
复制
ds.rdd
DataFrame》DataSet
代码语言:javascript
复制
scala> val rdd = sc.makeRDD(List(("zhangsan",11),("lsi",12),("wanwu",16)))

scala> val df = rdd.toDF("name","age")

scala> case class Person(name:String, age:Int)

scala> val ds = df.as[Person]

scala> ds.collect
DataSet-》DataFrame
代码语言:javascript
复制
ds.toDF

SparkSQl输入输出(Shell)

代码语言:javascript
复制
val personDF= spark.read.format("json").load("hdfs://Master:9000/cbeann/person.json")

等价于 

代码语言:javascript
复制
 val personDF1= spark.read.json("hdfs://Master:9000/cbeann/person.json")

相同的用法还有parquet,csv,text,jdbc

代码语言:javascript
复制
personDF1.write.format("json").save("hdfs://Master:9000/cbeann/person")

等价于与

代码语言:javascript
复制
personDF1.write.json("hdfs://Master:9000/cbeann/person1")

 相同的用法还有parquet,csv,text,jdbc

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-03-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 学习感悟
  • SparkShell
  • RDD创建(Shell)
    • 从集合中创建RDD
      • 从外部存储创建RDD
        • 从其他RDD转换
        • 常用的Transformation和Action(Shell)
        • 数据读取与保存主要方式(Shell)
          • 文本文件输入输出
            • JSON 、CSV文件输入输出(Shell)
              • SequenceFile 文件输入输出(Shell)
                • 对象文件输入输出(Shell)
                • Spark SQL(Shell)
                • RDD、DataFrame、DataSet之间的转化(Shell)
                  • RDD-》DataFrame
                    • DataFrame-》RDD
                      • RDD-》DataSet
                        • DataSet-》RDD
                          • DataFrame》DataSet
                            • DataSet-》DataFrame
                            • SparkSQl输入输出(Shell)
                            相关产品与服务
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档