首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Apache Spark SQL:如何使用GroupBy和Max过滤数据

Apache Spark SQL:如何使用GroupBy和Max过滤数据
EN

Stack Overflow用户
提问于 2019-06-07 09:53:58
回答 2查看 156关注 0票数 0

我有一个给定的数据集,结构如下:

https://i.imgur.com/Kk7I1S1.png

我需要使用SparkSQL解决以下问题:

对于每个邮政编码,找出以前发生事故次数最多的客户。在平局的情况下,意味着多个客户具有相同的最高事故数量,只需返回其中任何一个即可。对于这些选定的客户中的每一个,输出以下列:邮政编码、客户id、以前的事故数量。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-06-07 14:44:33

我认为你错过了提供你在图像链接中提到的数据。我已经创建了自己的数据集,将您的问题作为参考。您可以使用下面的代码片段作为参考,也可以用您的数据集替换df数据框来添加所需的列,如id等。

代码语言:javascript
运行
复制
      scala> val df  = spark.read.format("csv").option("header","true").load("/user/nikhil/acc.csv")
        df: org.apache.spark.sql.DataFrame = [postcode: string, customer: string ... 1 more field]

        scala> df.show()
        +--------+--------+---------+
        |postcode|customer|accidents|
        +--------+--------+---------+
        |       1|  Nikhil|        5|
        |       2|     Ram|        4|
        |       1|   Shyam|        3|
        |       3|  pranav|        1|
        |       1|   Suman|        2|
        |       3|    alex|        2|
        |       2|     Raj|        5|
        |       4|   arpit|        3|
        |       1|   darsh|        2|
        |       1|   rahul|        3|
        |       2|   kiran|        4|
        |       3|    baba|        4|
        |       4|    alok|        3|
        |       1|   Nakul|        5|
        +--------+--------+---------+


        scala> df.createOrReplaceTempView("tmptable")

   scala> spark.sql(s"""SELECT postcode,customer, accidents FROM (SELECT postcode,customer, accidents, row_number() over (PARTITION BY postcode ORDER BY accidents desc) as rn  from tmptable) WHERE rn = 1""").show(false)
+--------+--------+---------+                                                   
|postcode|customer|accidents|
+--------+--------+---------+
|3       |baba    |4        |
|1       |Nikhil  |5        |
|4       |arpit   |3        |
|2       |Raj     |5        |
+--------+--------+---------+
票数 0
EN

Stack Overflow用户

发布于 2019-06-07 15:00:06

您可以在python中使用以下代码得到结果:

代码语言:javascript
运行
复制
from pyspark.sql import Row, Window
import pyspark.sql.functions as F
from pyspark.sql.window import *

l = [(1, '682308', 25), (1, '682308', 23), (2, '682309', 23), (1, '682309', 27), (2, '682309', 22)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(c_id=int(x[0]), postcode=x[1], accident=int(x[2])))
schemaPeople = sqlContext.createDataFrame(people)
result = schemaPeople.groupby("postcode", "c_id").agg(F.max("accident").alias("accident"))
new_result = result.withColumn("row_num", F.row_number().over(Window.partitionBy("postcode").orderBy(F.desc("accident")))).filter("row_num==1")
new_result.show()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56487053

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档