首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构
清单首页spark文章详情

2021年大数据Spark(二十九):SparkSQL案例四开窗函数


​​​​​​​案例四:开窗函数

概述

https://www.cnblogs.com/qiuting/p/7880500.html

介绍

开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。

开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

聚合函数和开窗函数

聚合函数是将多行变成一行,count,avg....

开窗函数是将一行变成多行;

聚合函数如果要显示其他的列必须将列加入到group by中

开窗函数可以不使用group by,直接将所有信息显示出来

开窗函数分类

1.聚合开窗函数

聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。

2.排序开窗函数

排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。

​​​​​​​聚合开窗函数

示例1

OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。

SQL标准允许将所有聚合函数用做聚合开窗函数。

spark.sql("select  count(name)  from scores").show

spark.sql("select name, class, score, count(name) over() name_count from scores").show

查询结果如下所示:

+----+-----+-----+----------+                                                   

|name|class|score|name_count|

+----+-----+-----+----------+

|  a1|    1|   80|        11|

|  a2|    1|   78|        11|

|  a3|    1|   95|        11|

|  a4|    2|   74|        11|

|  a5|    2|   92|        11|

|  a6|    3|   99|        11|

|  a7|    3|   99|        11|

|  a8|    3|   45|        11|

|  a9|    3|   55|        11|

| a10|    3|   78|        11|

| a11|    3|  100|        11|

+----+-----+-----+----------+

 示例2

OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。

如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。

开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。

与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。

下面的 SQL 语句用于显示按照班级分组后每组的人数:

OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。

spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show

查询结果如下所示:

+----+-----+-----+----------+                                                   

|name|class|score|name_count|

+----+-----+-----+----------+

|  a1|    1|   80|         3|

|  a2|    1|   78|         3|

|  a3|    1|   95|         3|

|  a6|    3|   99|         6|

|  a7|    3|   99|         6|

|  a8|    3|   45|         6|

|  a9|    3|   55|         6|

| a10|    3|   78|         6|

| a11|    3|  100|         6|

|  a4|    2|   74|         2|

|  a5|    2|   92|         2|

+----+-----+-----+----------+

排序开窗函数

ROW_NUMBER顺序排序

row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号

注意:

在排序开窗函数中使用 PARTITION  BY 子句需要放置在ORDER  BY 子句之前。

 ●示例1

spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()

+----+-----+-----+----+

|name|class|score|rank|

+----+-----+-----+----+

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

|  a4|    2|   74|   3|

|  a2|    1|   78|   4|

| a10|    3|   78|   5|

|  a1|    1|   80|   6|

|  a5|    2|   92|   7|

|  a3|    1|   95|   8|

|  a6|    3|   99|   9|

|  a7|    3|   99|  10|

| a11|    3|  100|  11|

+----+-----+-----+----+

spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()

+----+-----+-----+----+                                                         

|name|class|score|rank|

+----+-----+-----+----+

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   5|

| a11|    3|  100|   6|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

+----+-----+-----+----+

​​​​​​​RANK跳跃排序

rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。

这个函数求出来的排名结果可以并列,并列排名之后的排名将是并列的排名加上并列数

简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名

●示例2

spark.sql("select name, class, score, rank() over(order by score) rank from scores").show()                                                     

+----+-----+-----+----+

|name|class|score|rank|

+----+-----+-----+----+

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

|  a4|    2|   74|   3|

| a10|    3|   78|   4|

|  a2|    1|   78|   4|

|  a1|    1|   80|   6|

|  a5|    2|   92|   7|

|  a3|    1|   95|   8|

|  a6|    3|   99|   9|

|  a7|    3|   99|   9|

| a11|    3|  100|  11|

+----+-----+-----+----+

spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()

+----+-----+-----+----+                                                         

|name|class|score|rank|

+----+-----+-----+----+

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   4|

| a11|    3|  100|   6|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

+----+-----+-----+----+

​​​​​​​ DENSE_RANK连续排序

dense_rank() over(order by  score) as  dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。

这个函数并列排名之后的排名只是并列排名加1

简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名

●示例3

spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()

+----+-----+-----+----+

|name|class|score|rank|

+----+-----+-----+----+

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

|  a4|    2|   74|   3|

|  a2|    1|   78|   4|

| a10|    3|   78|   4|

|  a1|    1|   80|   5|

|  a5|    2|   92|   6|

|  a3|    1|   95|   7|

|  a6|    3|   99|   8|

|  a7|    3|   99|   8|

| a11|    3|  100|   9|

+----+-----+-----+----+

spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()

+----+-----+-----+----+                                                         

|name|class|score|rank|

+----+-----+-----+----+

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   4|

| a11|    3|  100|   5|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

+----+-----+-----+----+

​​​​​​​NTILE分组排名[了解]

ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。

 示例4

spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show()

+----+-----+-----+----+

|name|class|score|rank|

+----+-----+-----+----+

|  a8|    3|   45|   1|

|  a9|    3|   55|   1|

|  a4|    2|   74|   2|

|  a2|    1|   78|   2|

| a10|    3|   78|   3|

|  a1|    1|   80|   3|

|  a5|    2|   92|   4|

|  a3|    1|   95|   4|

|  a6|    3|   99|   5|

|  a7|    3|   99|   5|

| a11|    3|  100|   6|

+----+-----+-----+----+

spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()

+----+-----+-----+----+                                                         

|name|class|score|rank|

+----+-----+-----+----+

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   5|

| a11|    3|  100|   6|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

+----+-----+-----+----+

​​​​​​​代码演示

代码语言:javascript
复制
package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * Author itcast
 * Date 2020/9/21 9:33
 * Desc 使用SparkSQL支持的开窗函数/窗口函数完成对各个班级的学生成绩的排名
 */
object RowNumberDemo {
  case class Score(name: String, clazz: Int, score: Int)
  def main(args: Array[String]): Unit = {
    //1.准备环境
    val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //2.加载数据
    val scoreDF: DataFrame = sc.makeRDD(Array(
      Score("a1", 1, 80),
      Score("a2", 1, 78),
      Score("a3", 1, 95),
      Score("a4", 2, 74),
      Score("a5", 2, 92),
      Score("a6", 3, 99),
      Score("a7", 3, 99),
      Score("a8", 3, 45),
      Score("a9", 3, 55),
      Score("a10", 3, 78),
      Score("a11", 3, 100))
    ).toDF("name", "class", "score")
    scoreDF.createOrReplaceTempView("t_scores")
    scoreDF.show()
    /*
    +----+-----+-----+
    |name|class|score|num
    +----+-----+-----+
    |  a1|    1|   80|
    |  a2|    1|   78|
    |  a3|    1|   95|
    |  a4|    2|   74|
    |  a5|    2|   92|
    |  a6|    3|   99|
    |  a7|    3|   99|
    |  a8|    3|   45|
    |  a9|    3|   55|
    | a10|    3|   78|
    | a11|    3|  100|
    +----+-----+-----+
     */


    //使用ROW_NUMBER顺序排序
    spark.sql("select name, class, score, row_number() over(partition by class order by score) num from t_scores").show()
    //使用RANK跳跃排序
    spark.sql("select name, class, score, rank() over(partition by class order by score) num from t_scores").show()
    //使用DENSE_RANK连续排序
    spark.sql("select name, class, score, dense_rank() over(partition by class order by score) num from t_scores").show()

    /*
ROW_NUMBER顺序排序--1234
+----+-----+-----+---+
|name|class|score|num|
+----+-----+-----+---+
|  a2|    1|   78|  1|
|  a1|    1|   80|  2|
|  a3|    1|   95|  3|
|  a8|    3|   45|  1|
|  a9|    3|   55|  2|

| a10|    3|   78|  3|
|  a6|    3|   99|  4|
|  a7|    3|   99|  5|
| a11|    3|  100|  6|

|  a4|    2|   74|  1|
|  a5|    2|   92|  2|
+----+-----+-----+---+

使用RANK跳跃排序--1224
+----+-----+-----+---+
|name|class|score|num|
+----+-----+-----+---+
|  a2|    1|   78|  1|
|  a1|    1|   80|  2|
|  a3|    1|   95|  3|
|  a8|    3|   45|  1|
|  a9|    3|   55|  2|

| a10|    3|   78|  3|
|  a6|    3|   99|  4|
|  a7|    3|   99|  4|
| a11|    3|  100|  6|

|  a4|    2|   74|  1|
|  a5|    2|   92|  2|
+----+-----+-----+---+

DENSE_RANK连续排序--1223
+----+-----+-----+---+
|name|class|score|num|
+----+-----+-----+---+
|  a2|    1|   78|  1|
|  a1|    1|   80|  2|
|  a3|    1|   95|  3|
|  a8|    3|   45|  1|
|  a9|    3|   55|  2|

| a10|    3|   78|  3|
|  a6|    3|   99|  4|
|  a7|    3|   99|  4|
| a11|    3|  100|  5|

|  a4|    2|   74|  1|
|  a5|    2|   92|  2|
+----+-----+-----+---+
     */

    /*
    
    val sql =
      """
        |select 字段1,字段2,字段n,
        |row_number() over(partition by 字段1 order by 字段2 desc) num
        |from 表名
        |having num <= 3
        |""".stripMargin

    import org.apache.spark.sql.functions._
    df.withColumn(
      "num",
      row_number().over(Window.partitionBy('字段1).orderBy('字段2.desc))
    ).filter('num <= 3).show(false)
    
     */
  }
}
下一篇
举报
领券