前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何在spark里面使用窗口函数

如何在spark里面使用窗口函数

作者头像
我是攻城师
发布2019-06-25 11:38:35
4.1K0
发布2019-06-25 11:38:35
举报
文章被收录于专栏:我是攻城师我是攻城师

在大数据分析中,窗口函数最常见的应用场景就是对数据进行分组后,求组内数据topN的需求,如果没有窗口函数,实现这样一个需求还是比较复杂的,不过现在大多数标准SQL中都支持这样的功能,今天我们就来学习下如何在spark sql使用窗口函数来完成一个分组求TopN的需求。

思路分析:

在spark sql中有两种方式可以实现:

(1)使用纯spark sql的方式。

(2)spark的编程api来实现。

虽然有两种形式,但底层原理都一样,借助了spark里面的window算子,我们先来看下纯sql的实现方式,其代码如下:

代码语言:javascript
复制
   def main(args: Array[String]): Unit = {
    val spark = SparkSession      .builder().master("local[1]")      .appName("Spark SQL basic example")      .getOrCreate()
    import spark.implicits._    val df = spark.sparkContext.parallelize(Seq(      (1,"2019-03-01","河北","ios"),      (1,"2019-06-01","北京","Android"),      (1,"2019-07-01","北京","Android"),      (1,"2019-07-01","北京","Android"),      (2,"2019-04-01","河南","Android"),      (2,"2019-06-01","山西","ios"),      (2,"2019-08-01","湖南","ios")    )).toDF("id", "date", "address","device")//转化df的三列数据s
    df.createOrReplaceTempView("login")
    //先对组内数据,进行排序    val s2=spark.sql("select id, date,address,device, rank() over (partition by id order by date desc ) as rank from login")    s2.createOrReplaceTempView("login2")    //取top N    val s3=spark.sql("select * from login2 where rank=1")
    s3.show()
  }

我们来看下输出结果如下:

代码语言:javascript
复制
+---+----------+-------+-------+----+| id|      date|address| device|rank|+---+----------+-------+-------+----+|  1|2019-07-01|     北京|Android|   1||  1|2019-07-01|     北京|Android|   1||  2|2019-08-01|     湖南|    ios|   1|+---+----------+-------+-------+----+

注意这里,我为了保持整洁,没有使用嵌套的子查询,而是在s3处,又过滤了一下结果。 我们看到,在sql中我们借助使用了rank函数,因为id=1的,最新日期有两个一样的,所以rank相等, 故最终结果返回了三条数据,到这里有的朋友可能就有疑问了,我只想对每组数据取topN,比如每组只取一条应该怎么控制,现在某组可能会返回2条,虽然意义上没错,但总觉得不太好,那么能不能实现呢?

答案是可以的,这就涉及到关于排名函数的介绍,我们这里只介绍常用的三种,分别是:

(1)rank

(2)row_number

(3)dense_rank

这次,我们用代码实现上面的需求,并观察上面上个函数生成rank值的区别,代码如下:

代码语言:javascript
复制
  def main(args: Array[String]): Unit = {
    val spark = SparkSession      .builder().master("local[1]")      .appName("Spark SQL basic example")      .getOrCreate()
    import spark.implicits._    val df = spark.sparkContext.parallelize(Seq(      (1,"2019-03-01","河北","ios"),      (1,"2019-06-01","北京","Android"),      (1,"2019-07-01","北京","Android"),      (1,"2019-07-01","北京","Android"),      (2,"2019-04-01","河南","Android"),      (2,"2019-06-01","山西","ios"),      (2,"2019-08-01","湖南","ios")    )).toDF("id", "date", "address","device")//转化df的三列数据s
//    df.createOrReplaceTempView("login")
    val s2=Window.partitionBy("id").orderBy(col("date").desc)    df.withColumn("rank",rank().over(s2))//生成rank值可以重复但不一定连续      .withColumn("dense_rank",dense_rank().over(s2))//生成rank值可以重复但是连续      .withColumn("row_number",row_number().over(s2))//生成的rank值不重复但是连续      .show()

  }

ok,我们看下输出结果:

代码语言:javascript
复制
+---+----------+-------+-------+----+----------+----------+| id|      date|address| device|rank|dense_rank|row_number|+---+----------+-------+-------+----+----------+----------+|  1|2019-07-01|     北京|Android|   1|         1|         1||  1|2019-07-01|     北京|Android|   1|         1|         2||  1|2019-06-01|     北京|Android|   3|         2|         3||  1|2019-03-01|     河北|    ios|   4|         3|         4||  2|2019-08-01|     湖南|    ios|   1|         1|         1||  2|2019-06-01|     山西|    ios|   2|         2|         2||  2|2019-04-01|     河南|Android|   3|         3|         3|+---+----------+-------+-------+----+----------+----------+

注意看输出数据的前三行,观察后面的值,我们能够发现上面三个函数的区别是:

(1)rank (生成rank值可以重复但不一定连续)

(2)row_number (生成rank值可以重复但是连续)

(3)dense_rank (生成的rank值不重复但是连续)

了解上面的区别后,我们再回到刚才的那个问题,如何取Top1的时候,每组只返回一条数据?

答案就是使用row_number进行过滤,如下,对上面的代码稍加改造即可:

代码语言:javascript
复制
    val s2=Window.partitionBy("id").orderBy(col("date").desc)    df.withColumn("rank",rank().over(s2))//生成rank值可以重复但不一定连续      .withColumn("dense_rank",dense_rank().over(s2))//生成rank值可以重复但是连续      .withColumn("row_number",row_number().over(s2))//生成的rank值不重复但是连续      .where("row_number=1")//新增代码      .show()

结果如下:

代码语言:javascript
复制
+---+----------+-------+-------+----+----------+----------+| id|      date|address| device|rank|dense_rank|row_number|+---+----------+-------+-------+----+----------+----------+|  1|2019-07-01|     北京|Android|   1|         1|         1||  2|2019-08-01|     湖南|    ios|   1|         1|         1|+---+----------+-------+-------+----+----------+----------+

在sql里面也一样,只要把rank函数换成row_number函数即可,这里就不在演示了,感兴趣的同学可以自己尝试下。

在spark的窗口函数里面,上面的应用场景属于比较常见的case,当然spark窗口函数的功能要比上面介绍的要丰富的多,这里就不在介绍了,想学习的同学可以参考下面的这个链接:

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 我是攻城师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档