前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkSql之DataFrame

SparkSql之DataFrame

作者头像
用户1483438
发布2022-07-26 21:37:55
5740
发布2022-07-26 21:37:55
举报
文章被收录于专栏:大数据共享大数据共享

DataFrame

DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。 DataFrame只知道每一列的类型是什么,每一行的类型是不知道的,不管每一行

创建SparkSession

代码语言:javascript
复制
  val sparkSession =SparkSession.builder().master("local[4]").appName("test").getOrCreate()

创建样例类

代码语言:javascript
复制
case class Student(id:Int,name:String,age:Int,sex:String)

使用toDF必须进行隐式转换

代码语言:javascript
复制
import sparkSession.implicits._

为了方便测试,单独把sparkSession 提出去,使用它 Junit的方式进行测试运行。

代码语言:javascript
复制
  @Test
  def demo01: Unit ={
    // 数据准备
    val list=List(
      Student(1,"张三",18,"男"),
      Student(2,"绣花",16,"女"),
      Student(3,"李四",18,"男"),
      Student(4,"王五",18,"男"),
      Student(5,"翠花",19,"女"),
      Student(6,"张鹏",17,"男")
    )
    // 使用`toDF`必须进行隐式转换
    import sparkSession.implicits._

    val df: DataFrame = list.toDF()
    //  执行,类似于 select * from  table;
    df.show()

  }

API参考:https://blog.csdn.net/dabokele/article/details/52802150

DataFrame对象上Action操作

show:展示数据

  1. show() 展示所有数据
代码语言:javascript
复制
val df: DataFrame = list.toDF()
df.show()
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1|张三| 18| 男|
|  2|绣花| 16| 女|
|  3|李四| 18| 男|
|  4|王五| 18| 男|
|  5|翠花| 19| 女|
|  6|张鹏| 17| 男|
+---+----+---+---+
  1. show(numRows: Int) 展示指定条数数据
代码语言:javascript
复制
val df: DataFrame = list.toDF()
df.show(2)
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1|张三| 18| 男|
|  2|绣花| 16| 女|
+---+----+---+---+
  1. show(truncate: Boolean):是否截断长字符串。如果为 true,超过 20 个字符的字符串将被截断,所有单元格将右对齐
  2. show(numRows: Int, truncate: Boolean):展示指定条数数据并指定是否截断长字符串。
  3. show(numRows: Int, truncate: Int):numRows展示条数,truncat<=0 左对齐,truncate>0 右对齐
  4. show(numRows: Int, truncate: Int, vertical: Boolean):vertical 如果设置为 true,则垂直打印输出行(每列值一行)。

collect:获取所有数据到数组 不同于前面的show方法,这里的collect方法会将jdbcDF中的所有数据都获取到,并返回一个Array对象。

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val rows: Array[Row] = df.collect()
    rows.foreach(println(_))
代码语言:javascript
复制
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
[4,王五,18,男]
[5,翠花,19,女]
[6,张鹏,17,男]

collectAsList:获取所有数据到List 功能和collect类似,只不过将返回结构变成了List对象,使用方法如下

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val rows: util.List[Row] = df.collectAsList()
    rows.forEach(println(_))
代码语言:javascript
复制
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
[4,王五,18,男]
[5,翠花,19,女]
[6,张鹏,17,男]

describe(cols: String*):获取指定字段的统计信息 这个方法可以动态的传入一个或多个String类型的字段名,结果仍然为DataFrame对象,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max等。

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val frame: DataFrame = df.describe("name", "age")
    frame.show()
代码语言:javascript
复制
+-------+----+------------------+
|summary|name|               age|
+-------+----+------------------+
|  count|   6|                 6|
|   mean|null|17.666666666666668|
| stddev|null|1.0327955589886446|
|    min|张三|                16|
|    max|翠花|                19|
+-------+----+------------------+

first, head, take, takeAsList:获取若干行记录 这里列出的四个方法比较类似,其中

  1. first获取第一行记录
代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val row: Row = df.first()
    println(row)
代码语言:javascript
复制
[1,张三,18,男]
  1. head获取第一行记录,head(n: Int)获取前n行记录
代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val row: Row = df.head()
    println(row)
代码语言:javascript
复制
[1,张三,18,男]

取前n行记录

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val rows: Array[Row] = df.head(3)
    rows.foreach(println(_))
代码语言:javascript
复制
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
  1. take(n: Int)获取前n行数据
代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val rows: Array[Row] = df.take(3)
    rows.foreach(println(_))
代码语言:javascript
复制
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]
  1. takeAsList(n: Int)获取前n行数据,并以List的形式展现
代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val rows: util.List[Row] = df.takeAsList(3)
    rows.forEach(println(_))
代码语言:javascript
复制
[1,张三,18,男]
[2,绣花,16,女]
[3,李四,18,男]

以Row或者Array[Row]的形式返回一行或多行数据。first和head功能相同。

take和takeAsList方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError

DataFrame对象上的条件查询和join等操作

新增一些数据

代码语言:javascript
复制
val list=List(
      Student(1,"张三",18,"男"),
      Student(2,"绣花",16,"女"),
      Student(3,"李四",18,"男"),
      Student(4,"王五",18,"男"),
      Student(5,"翠花",19,"女"),
      Student(7,"张鹏",14,"男"),
      Student(8,"刘秀",13,"男"),
      Student(9,"王菲菲",20,"女"),
      Student(10,"乐乐",21,"男"),
      Student(11,"小惠",23,"女"),
      Student(12,"梦雅",25,"女"),
)

where条件相关

where(conditionExpr: String):SQL语言中where关键字后的条件 传入筛选条件表达式,可以用andor。得到DataFrame类型的返回结果,

查询性别为的学生信息

代码语言:javascript
复制
    import sparkSession.implicits._

    val df: DataFrame = list.toDF()
    df.where("sex='男'").show()
代码语言:javascript
复制
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1|张三| 18| 男|
|  3|李四| 18| 男|
|  4|王五| 18| 男|
|  7|张鹏| 14| 男|
|  8|刘秀| 13| 男|
| 10|乐乐| 21| 男|
+---+----+---+---+

and 查询年龄小于18岁,并且性别为的学生信息

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.where("age<18 and sex='女'").show()
代码语言:javascript
复制
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  2|绣花| 16| 女|
+---+----+---+---+

or 查询年龄>18 或者性别为的学生信息

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.where("age>18 or sex='女'").show()
代码语言:javascript
复制
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
|  2|  绣花| 16| 女|
|  5|  翠花| 19| 女|
|  9|王菲菲| 20| 女|
| 10|  乐乐| 21| 男|
| 11|  小惠| 23| 女|
| 12|  梦雅| 25| 女|
+---+------+---+---+

filter:根据字段进行筛选 传入筛选条件表达式,得到DataFrame类型的返回结果。和where使用条件相同

查询性别不为的学生信息

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.filter("sex!='男'").show()
代码语言:javascript
复制
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
|  2|  绣花| 16| 女|
|  5|  翠花| 19| 女|
|  9|王菲菲| 20| 女|
| 11|  小惠| 23| 女|
| 12|  梦雅| 25| 女|
+---+------+---+---+

filter中也可以使用orand

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.filter("sex!='男' and age >20").show()
代码语言:javascript
复制
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
| 11|小惠| 23| 女|
| 12|梦雅| 25| 女|
+---+----+---+---+

查询指定字段

select:获取指定字段值

  1. select(cols: Column*) :根据传入的String类型字段名,获取指定字段的值,以DataFrame类型返回
代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.select("name","age").show()
代码语言:javascript
复制
+------+---+
|  name|age|
+------+---+
|  张三| 18|
|  绣花| 16|
|  李四| 18|
|  王五| 18|
|  翠花| 19|
|  张鹏| 14|
|  刘秀| 13|
|王菲菲| 20|
|  乐乐| 21|
|  小惠| 23|
|  梦雅| 25|
+------+---+

还有一个重载的select方法,不是传入String类型参数,而是传入Column类型参数。可以实现select id, id+1 from test这种逻辑。

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.select(df("id"),df("id")+1).show()
代码语言:javascript
复制
+---+--------+
| id|(id + 1)|
+---+--------+
|  1|       2|
|  2|       3|
|  3|       4|
|  4|       5|
|  5|       6|
|  7|       8|
|  8|       9|
|  9|      10|
| 10|      11|
| 11|      12|
| 12|      13|
+---+--------+

selectExpr:可以对指定字段进行特殊处理 可以直接对指定字段调用UDF函数,或者指定别名等。传入String类型参数,得到DataFrame对象。

获取年龄最大的学生信息

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.selectExpr(
      """
        |max(age) max_age
        |""".stripMargin).show()
代码语言:javascript
复制
+-------+
|max_age|
+-------+
|     25|
+-------+

col:获取指定字段   只能获取一个字段,返回对象为Column类型。

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val ageColumn: Column = df.col("age")
    val nameColumn: Column = df.col("name")

    println(ageColumn)
    println(nameColumn)
代码语言:javascript
复制
age
name

apply:获取指定字段   只能获取一个字段,返回对象为Column类型

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val ageColumn: Column = df.apply("age")
    val nameColumn: Column = df.apply("name")

    println(ageColumn)
    println(nameColumn)
代码语言:javascript
复制
age
name

drop:去除指定字段,保留其他字段   返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDf: DataFrame = df.drop("sex")
    newDf.show()
代码语言:javascript
复制
+---+------+---+
| id|  name|age|
+---+------+---+
|  1|  张三| 18|
|  2|  绣花| 16|
|  3|  李四| 18|
|  4|  王五| 18|
|  5|  翠花| 19|
|  7|  张鹏| 14|
|  8|  刘秀| 13|
|  9|王菲菲| 20|
| 10|  乐乐| 21|
| 11|  小惠| 23|
| 12|  梦雅| 25|
+---+------+---+

也可以去除多个字段

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDf: DataFrame = df.drop("sex","id")
    newDf.show()
代码语言:javascript
复制
+------+---+
|  name|age|
+------+---+
|  张三| 18|
|  绣花| 16|
|  李四| 18|
|  王五| 18|
|  翠花| 19|
|  张鹏| 14|
|  刘秀| 13|
|王菲菲| 20|
|  乐乐| 21|
|  小惠| 23|
|  梦雅| 25|
+------+---+

limit

limit 方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take与head不同的是,limit方法不是Action操作

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDf: DataFrame = df.limit(3)
    newDf.show()
代码语言:javascript
复制
+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1|张三| 18| 男|
|  2|绣花| 16| 女|
|  3|李四| 18| 男|
+---+----+---+---+

order by

orderBysort:按指定字段排序,默认为升序

orderBy按照年龄排序(asc)

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.orderBy("age").show()
代码语言:javascript
复制
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
|  8|  刘秀| 13| 男|
|  7|  张鹏| 14| 男|
|  2|  绣花| 16| 女|
|  4|  王五| 18| 男|
|  3|  李四| 18| 男|
|  1|  张三| 18| 男|
|  5|  翠花| 19| 女|
|  9|王菲菲| 20| 女|
| 10|  乐乐| 21| 男|
| 11|  小惠| 23| 女|
| 12|  梦雅| 25| 女|
+---+------+---+---+

orderBy按照年龄排序(desc)

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.orderBy(df("age").desc).show()
代码语言:javascript
复制
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
| 12|  梦雅| 25| 女|
| 11|  小惠| 23| 女|
| 10|  乐乐| 21| 男|
|  9|王菲菲| 20| 女|
|  5|  翠花| 19| 女|
|  4|  王五| 18| 男|
|  1|  张三| 18| 男|
|  3|  李四| 18| 男|
|  2|  绣花| 16| 女|
|  7|  张鹏| 14| 男|
|  8|  刘秀| 13| 男|
+---+------+---+---+

sort按照年龄排序(asc)

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.sort(df("age").asc).show()

sort按照年龄排序(desc)

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.sort(df("age").desc).show()

sortWithinPartitions   和上面的sort方法功能类似,区别在于sortWithinPartitions方法返回的是按Partition排好序的DataFrame对象。

sortWithinPartitions按照年龄排序(asc)

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    df.sortWithinPartitions(df("age").asc).show()
代码语言:javascript
复制
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
|  2|  绣花| 16| 女|
|  1|  张三| 18| 男|
|  3|  李四| 18| 男|
|  4|  王五| 18| 男|
|  5|  翠花| 19| 女|
------------------------------ 分为两个区,
|  8|  刘秀| 13| 男|
|  7|  张鹏| 14| 男|
|  9|王菲菲| 20| 女|
| 10|  乐乐| 21| 男|
| 11|  小惠| 23| 女|
| 12|  梦雅| 25| 女|
+---+------+---+---+

group by

groupBy:根据字段进行group by操作 groupBy方法有两种调用方式,可以传入String类型的字段名,也可传入Column类型的对象。

案例:按照性别分组,统计各个性别的总人数

cuberollupgroup by的扩展   功能类似于SQL中的group by cube/rollup

GroupedData对象   该方法得到的是GroupedData类型对象,在GroupedData的API中提供了group by之后的操作,比如,

  • max(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段
代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.groupBy("sex").max("age")
    newDF.show()
代码语言:javascript
复制
+---+--------+
|sex|max(age)|
+---+--------+
| 男|      21|
| 女|      25|
+---+--------+
  • min(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段
代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.groupBy("sex").min("age")
    newDF.show()
代码语言:javascript
复制
+---+--------+
|sex|min(age)|
+---+--------+
| 男|      13|
| 女|      16|
+---+--------+
  • mean(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段
代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.groupBy("sex").mean("age")
    newDF.show()
代码语言:javascript
复制
+---+--------+
|sex|avg(age)|
+---+--------+
| 男|    17.0|
| 女|    20.6|
+---+--------+
  • sum(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段
代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.groupBy("sex").sum("age")
    newDF.show()
代码语言:javascript
复制
+---+--------+
|sex|sum(age)|
+---+--------+
| 男|     102|
| 女|     103|
+---+--------+
  • count()方法,获取分组中的元素个数
代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.groupBy("sex").count()
    newDF.show()
代码语言:javascript
复制
+---+-----+
|sex|count|
+---+-----+
| 男|    6|
| 女|    5|
+---+-----+

distinct

distinct:返回一个不包含重复记录的DataFrame   返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果相同。

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.distinct()
    newDF.show()

因为没有列是重复的数据所以就不展示了

dropDuplicates:根据指定字段去重   根据指定字段去重。类似于select distinct a, b操作

按照年龄剔重

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.dropDuplicates("age")
    newDF.show()
代码语言:javascript
复制
+---+------+---+---+
| id|  name|age|sex|
+---+------+---+---+
|  8|  刘秀| 13| 男|
|  2|  绣花| 16| 女|
|  9|王菲菲| 20| 女|
|  5|  翠花| 19| 女|
| 11|  小惠| 23| 女|
| 12|  梦雅| 25| 女|
| 10|  乐乐| 21| 男|
|  7|  张鹏| 14| 男|
|  1|  张三| 18| 男|
+---+------+---+---+

聚合

聚合操作调用的是agg方法,该方法有多种调用方式。一般与groupBy方法配合使用。

代码语言:javascript
复制
    val df: DataFrame = list.toDF()
    val newDF: DataFrame = df.agg(
      "age" -> "max",
      "age" -> "avg",
      "age" -> "min",
      "age" -> "sum",
      "age" -> "count"
    )
   newDF.show()
代码语言:javascript
复制
+--------+------------------+--------+--------+----------+
|max(age)|          avg(age)|min(age)|sum(age)|count(age)|
+--------+------------------+--------+--------+----------+
|      25|18.636363636363637|      13|     205|        11|
+--------+------------------+--------+--------+----------+

union

重新整理一下数据

代码语言:javascript
复制
    val female=List(
      Student(2,"绣花",16,"女",1),
      Student(5,"翠花",19,"女",2),
      Student(9,"王菲菲",20,"女",1),
      Student(11,"小惠",23,"女",1),
      Student(12,"梦雅",25,"女",3)
    )

    val boys=List(
      Student(1,"张三",18,"男",3),
      Student(3,"李四",18,"男",2),
      Student(4,"王五",18,"男",2),
      Student(7,"张鹏",14,"男",1),
      Student(8,"刘秀",13,"男",2),
      Student(10,"乐乐",21,"男",1)
    )

新增一个classid

代码语言:javascript
复制
case class Student(id:Int,name:String,age:Int,sex:String,classId:Int)

示例

代码语言:javascript
复制
    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.union(boysDF)

    value.foreach(println(_))
代码语言:javascript
复制
[2,绣花,16,女,1]
[5,翠花,19,女,2]
[11,小惠,23,女,1]
[9,王菲菲,20,女,1]
[12,梦雅,25,女,3]
[1,张三,18,男,3]
[3,李四,18,男,2]
[4,王五,18,男,2]
[8,刘秀,13,男,2]
[7,张鹏,14,男,1]
[10,乐乐,21,男,1]

unionAll方法:对两个DataFrame进行组合   类似于SQL中的UNION ALL操作。

代码语言:javascript
复制
    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.unionAll(boysDF)

    value.foreach(println(_))
代码语言:javascript
复制
[5,翠花,19,女,2]
[2,绣花,16,女,1]
[11,小惠,23,女,1]
[9,王菲菲,20,女,1]
[12,梦雅,25,女,3]
[1,张三,18,男,3]
[3,李四,18,男,2]
[4,王五,18,男,2]
[7,张鹏,14,男,1]
[8,刘秀,13,男,2]
[10,乐乐,21,男,1]

join

重点来了。在SQL语言中用得很多的就是join操作,DataFrame中同样也提供了join的功能。   接下来隆重介绍join方法。在DataFrame中提供了六个重载的join方法。 笛卡尔积

代码语言:javascript
复制
    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.join(boysDF)

    value.foreach(println(_))
代码语言:javascript
复制
[2,绣花,16,女,1,3,李四,18,男,2]
[2,绣花,16,女,1,7,张鹏,14,男,1]
[2,绣花,16,女,1,8,刘秀,13,男,2]
[2,绣花,16,女,1,1,张三,18,男,3]
[5,翠花,19,女,2,3,李四,18,男,2]
[5,翠花,19,女,2,7,张鹏,14,男,1]
[5,翠花,19,女,2,8,刘秀,13,男,2]
[5,翠花,19,女,2,1,张三,18,男,3]
[9,王菲菲,20,女,1,3,李四,18,男,2]
[9,王菲菲,20,女,1,7,张鹏,14,男,1]
[9,王菲菲,20,女,1,8,刘秀,13,男,2]
[11,小惠,23,女,1,3,李四,18,男,2]
[9,王菲菲,20,女,1,1,张三,18,男,3]
[11,小惠,23,女,1,7,张鹏,14,男,1]
[11,小惠,23,女,1,8,刘秀,13,男,2]
[12,梦雅,25,女,3,3,李四,18,男,2]
[12,梦雅,25,女,3,7,张鹏,14,男,1]
[11,小惠,23,女,1,1,张三,18,男,3]
[12,梦雅,25,女,3,8,刘秀,13,男,2]
[2,绣花,16,女,1,4,王五,18,男,2]
[12,梦雅,25,女,3,1,张三,18,男,3]
[2,绣花,16,女,1,10,乐乐,21,男,1]
[5,翠花,19,女,2,4,王五,18,男,2]
[5,翠花,19,女,2,10,乐乐,21,男,1]
[9,王菲菲,20,女,1,4,王五,18,男,2]
[9,王菲菲,20,女,1,10,乐乐,21,男,1]
[11,小惠,23,女,1,4,王五,18,男,2]
[11,小惠,23,女,1,10,乐乐,21,男,1]
[12,梦雅,25,女,3,4,王五,18,男,2]
[12,梦雅,25,女,3,10,乐乐,21,男,1]

using一个字段形式   下面这种join类似于a join b using column1的形式,需要两个DataFrame中有相同的一个列名,

代码语言:javascript
复制
    import sparkSession.implicits._

    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.join(boysDF,"classId")

    value.foreach(println(_))
代码语言:javascript
复制
[2,5,翠花,19,女,8,刘秀,13,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[2,5,翠花,19,女,3,李四,18,男]
[3,12,梦雅,25,女,1,张三,18,男]
[1,11,小惠,23,女,10,乐乐,21,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,7,张鹏,14,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[1,2,绣花,16,女,10,乐乐,21,男]

using多个字段形式   除了上面这种using一个字段的情况外,还可以using多个字段,如下

代码语言:javascript
复制
    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId","id"))

    value.foreach(println(_))

指定join类型   两个DataFrame的join操作有inner, outer, left_outer, right_outer, leftsemi类型。在上面的using多个字段的join情况下,可以写第三个String类型参数,指定join的类型,如下所示 left_outer

代码语言:javascript
复制
    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId"),"left_outer")

    value.foreach(println(_))
代码语言:javascript
复制
[1,11,小惠,23,女,10,乐乐,21,男]
[2,5,翠花,19,女,8,刘秀,13,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[1,2,绣花,16,女,10,乐乐,21,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,7,张鹏,14,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[2,5,翠花,19,女,3,李四,18,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[3,12,梦雅,25,女,1,张三,18,男]

right_outer

代码语言:javascript
复制
    val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()
    val value: Dataset[Row] = femaleDF.join(boysDF,Seq("classId"),"right_outer")

    value.foreach(println(_))
代码语言:javascript
复制
[2,5,翠花,19,女,8,刘秀,13,男]
[2,5,翠花,19,女,3,李四,18,男]
[3,12,梦雅,25,女,1,张三,18,男]
[1,11,小惠,23,女,7,张鹏,14,男]
[1,11,小惠,23,女,10,乐乐,21,男]
[1,9,王菲菲,20,女,7,张鹏,14,男]
[1,9,王菲菲,20,女,10,乐乐,21,男]
[2,5,翠花,19,女,4,王五,18,男]
[1,2,绣花,16,女,10,乐乐,21,男]
[1,2,绣花,16,女,7,张鹏,14,男]

其他的就演示了 以上案例整理参考:https://blog.csdn.net/dabokele/article/details/52802150 更多API请参考Spark官网

上面使用的是样例类,会自动将字段名称字段类型与表中的字段进行对应

代码语言:javascript
复制
case class Student(id:Int,name:String,age:Int,sex:String,classId:Int)
代码语言:javascript
复制
@Test
  def demo02: Unit ={

    val female=List(
      Student(2,"绣花",16,"女",1),
      Student(5,"翠花",19,"女",2),
      Student(9,"王菲菲",20,"女",1),
      Student(11,"小惠",23,"女",1),
      Student(12,"梦雅",25,"女",3)
    )

    val boys=List(
      Student(1,"张三",18,"男",3),
      Student(3,"李四",18,"男",2),
      Student(4,"王五",18,"男",2),
      Student(7,"张鹏",14,"男",1),
      Student(8,"刘秀",13,"男",2),
      Student(10,"乐乐",21,"男",1)
    )


    import sparkSession.implicits._

    //val femaleDF: DataFrame = female.toDF()
    val boysDF: DataFrame = boys.toDF()

    boysDF.show()
  }
代码语言:javascript
复制
+---+----+---+---+-------+
| id|name|age|sex|classId|
+---+----+---+---+-------+
|  1|张三| 18| 男|      3|
|  3|李四| 18| 男|      2|
|  4|王五| 18| 男|      2|
|  7|张鹏| 14| 男|      1|
|  8|刘秀| 13| 男|      2|
| 10|乐乐| 21| 男|      1|
+---+----+---+---+-------+

使用printSchema 查看字段类型

代码语言:javascript
复制
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- sex: string (nullable = true)
 |-- classId: integer (nullable = false)

处理使用样例类,也可以使用元组的形式

代码语言:javascript
复制
  @Test
  def demo03(): Unit ={
    val list=List(
      (1,"张三",18,"男",3),
      (3,"李四",18,"男",2),
      (4,"王五",18,"男",2),
      (7,"张鹏",14,"男",1),
      (8,"刘秀",13,"男",2),
      (10,"乐乐",21,"男",1)
    )

    import sparkSession.implicits._
    val boysDF: DataFrame = list.toDF()

    boysDF.show()
  }

表字段将使用元组索引命名

代码语言:javascript
复制
+---+----+---+---+---+
| _1|  _2| _3| _4| _5|
+---+----+---+---+---+
|  1|张三| 18| 男|  3|
|  3|李四| 18| 男|  2|
|  4|王五| 18| 男|  2|
|  7|张鹏| 14| 男|  1|
|  8|刘秀| 13| 男|  2|
| 10|乐乐| 21| 男|  1|
+---+----+---+---+---+

使用printSchema 查看字段类型

代码语言:javascript
复制
root
 |-- _1: integer (nullable = false)
 |-- _2: string (nullable = true)
 |-- _3: integer (nullable = false)
 |-- _4: string (nullable = true)
 |-- _5: integer (nullable = false)

toDF(colNames: String*) 重新队列进行命名 字段名为_N的形式,不是很友好,可以自行指定

代码语言:javascript
复制
  @Test
  def demo03(): Unit ={
    val list=List(
      (1,"张三",18,"男",3),
      (3,"李四",18,"男",2),
      (4,"王五",18,"男",2),
      (7,"张鹏",14,"男",1),
      (8,"刘秀",13,"男",2),
      (10,"乐乐",21,"男",1)
    )

    import sparkSession.implicits._
    val boysDF: DataFrame = list.toDF("id","name","age","sex","classId")

    boysDF.show()
  }
代码语言:javascript
复制
+---+----+---+---+-------+
| id|name|age|sex|classId|
+---+----+---+---+-------+
|  1|张三| 18| 男|      3|
|  3|李四| 18| 男|      2|
|  4|王五| 18| 男|      2|
|  7|张鹏| 14| 男|      1|
|  8|刘秀| 13| 男|      2|
| 10|乐乐| 21| 男|      1|
+---+----+---+---+-------+

总结一下:

  1. 若数据为元组时,字段名为元组的索引名
  2. 列表中的字段类型必须一致。 如:第一列为id列,第二行的类型却为字符类型
代码语言:javascript
复制
      (1,"张三",18,"男",3),
      ("3","李四",18,"男",2),
  1. 列表中的参数个数必须一致。 如: 第一列5个参数,第二行3个参数,这样是不行的。
代码语言:javascript
复制
      (1,"张三",18,"男",3),
      (3,"李四",18),
  1. 可以使用 toDF(colNames: String*)重载方法,设置命名,必须元参数个数保持一致。

RDD 转 DataFrame

除了使用集合.toDF,也可以使用rdd.toDF 将 RDD转为DataFrame

代码语言:javascript
复制
  @Test
  def demo04(): Unit ={
    val list=List(
      (1,"张三",18,"男",3),
      (3,"李四",18,"男",2),
      (4,"王五",18,"男",2),
      (7,"张鹏",14,"男",1),
      (8,"刘秀",13,"男",2),
      (10,"乐乐",21,"男",1)
    )

    // 获取 SparkContext
    val sc: SparkContext = sparkSession.sparkContext
    val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)

    // 使用 toDF 必须定义隐式转换
    import sparkSession.implicits._
   
    // RDD 转换成 DataFrame 
    val df: DataFrame = rdd.toDF
    df.show()

  }

使用toDF必须定义隐式转换

DataFrame的创建方式[了解]

上面的所有案例都是采用 toDF 的方式创建,关于DataFrame的创建方式一共有四种创建方式。

  1. 可以通过toDF方法创建 使用toDF必须进行隐式转换
代码语言:javascript
复制
import sparkSession.implicits._
  1. 通过createDataFrame创建

createDataFrame[A <: Product : TypeTag](rdd: RDD[A])

代码语言:javascript
复制
    val list=List(
      (1,"张三",18,"男",3),
      (3,"李四",18,"男",2),
      (4,"王五",18,"男",2),
      (7,"张鹏",14,"男",1),
      (8,"刘秀",13,"男",2),
      (10,"乐乐",21,"男",1)
    )
    val sc: SparkContext = sparkSession.sparkContext
    val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)
    val df: DataFrame = sparkSession.createDataFrame(rdd)
    df.show()
代码语言:javascript
复制
+---+----+---+---+---+
| _1|  _2| _3| _4| _5|
+---+----+---+---+---+
|  1|张三| 18| 男|  3|
|  3|李四| 18| 男|  2|
|  4|王五| 18| 男|  2|
|  7|张鹏| 14| 男|  1|
|  8|刘秀| 13| 男|  2|
| 10|乐乐| 21| 男|  1|
+---+----+---+---+---+

createDataFrame(rowRDD: RDD[Row], schema: StructType)

代码语言:javascript
复制
@Test
  def demo06(): Unit ={
    val list=List(
      Row(1,"张三",18,"男",3),
      Row(3,"李四",18,"男",2),
      Row(4,"王五",18,"男",2),
      Row(7,"张鹏",14,"男",1),
      Row(8,"刘秀",13,"男",2),
      Row(10,"乐乐",21,"男",1)
    )


    val sc: SparkContext = sparkSession.sparkContext
    val rdd: RDD[Row] = sc.parallelize(list)
    // 指定StructType

    val fields=Array(
      StructField("id",IntegerType),
      StructField("name",StringType),
      StructField("age",IntegerType),
      StructField("sex",StringType),
      StructField("classId",IntegerType)
    )

    val schema =StructType(fields)

    val df = sparkSession.createDataFrame(rdd, schema)
    df.show()

  }

相关依赖

代码语言:javascript
复制
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row}

剩下的暂时用不到... createDataFrame[A <: Product : TypeTag](data: Seq[A]) createDataFrame(rowRDD: JavaRDD[Row], schema: StructType) createDataFrame(rows: java.util.List[Row], schema: StructType) createDataFrame(rdd: RDD[_], beanClass: Class[_]) createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]) createDataFrame(data: java.util.List[_], beanClass: Class[_])

示例一:

代码语言:javascript
复制
    val list=List(
      (1,"张三",18,"男",3),
      (3,"李四",18,"男",2),
      (4,"王五",18,"男",2),
      (7,"张鹏",14,"男",1),
      (8,"刘秀",13,"男",2),
      (10,"乐乐",21,"男",1)
    )
    val sc: SparkContext = sparkSession.sparkContext
    val rdd: RDD[(Int, String, Int, String, Int)] = sc.parallelize(list, 2)
    val df: DataFrame = sparkSession.createDataFrame(rdd)
    df.show()
  1. 通过读取文件创建
  2. 通过其他的dataFrame衍生 上面的很多案例也有演示,就是通过上次结果的DataFrame返回一个新的DataFrame
代码语言:javascript
复制
@Test
  def demo08(): Unit ={

    val female=List(
      Student(2,"绣花",16,"女",1),
      Student(5,"翠花",19,"女",2),
      Student(9,"王菲菲",20,"女",1),
      Student(11,"小惠",23,"女",1),
      Student(12,"梦雅",25,"女",3)
    )

    val boys=List(
      Student(1,"张三",18,"男",3),
      Student(3,"李四",18,"男",2),
      Student(4,"王五",18,"男",2),
      Student(7,"张鹏",14,"男",1),
      Student(8,"刘秀",13,"男",2),
      Student(10,"乐乐",21,"男",1)
    )


    // 导入隐式转换
    import sparkSession.implicits._

    val femaleDf: DataFrame = female.toDF()
    val boysDf: DataFrame = boys.toDF()

    val unionAllDf: DataFrame = femaleDf.unionAll(boysDf)

    val group: RelationalGroupedDataset = unionAllDf.groupBy("sex")

    val resultDf: DataFrame = group.max("age")

    resultDf.show()

  }
代码语言:javascript
复制
+---+--------+
|sex|max(age)|
+---+--------+
| 男|      21|
| 女|      25|
+---+--------+

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • DataFrame
  • DataFrame对象上Action操作
  • DataFrame对象上的条件查询和join等操作
    • where条件相关
      • 查询指定字段
        • limit
          • order by
            • group by
              • distinct
              • 聚合
              • union
              • join
              • RDD 转 DataFrame
              • DataFrame的创建方式[了解]
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档