首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >从Dataset同时聚合多个字段

从Dataset同时聚合多个字段
EN

Stack Overflow用户
提问于 2018-07-10 06:55:11
回答 4查看 116关注 0票数 0

我有以下计划的数据:

代码语言:javascript
运行
复制
sourceip
destinationip
packets sent

我想从这些数据中计算出几个聚合字段,并有以下模式:

代码语言:javascript
运行
复制
ip 
packets sent as sourceip
packets sent as destination

在RDDs的快乐日子里,我可以使用aggregate,定义一个{ip -> []}的映射,并在相应的数组位置中计数外观。

在Dataset/Dataframe聚合不再可用,相反,不幸的是,根据我在数据集中的经验,它们是不可变的,这意味着它们不能被使用(必须在每次地图更新中创建一个新实例) example + explanation here

一方面,从技术上讲,我可以将数据集转换为RDD、聚合等,然后返回dataset。我预计这会导致性能下降,因为数据集更加优化。由于复制,UDAF是不可能的。

还有其他方法来执行聚合吗?

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2018-07-10 10:11:17

听起来您需要一个标准的melt (How to melt Spark DataFrame?)和pivot组合:

代码语言:javascript
运行
复制
val df = Seq(
  ("192.168.1.102", "192.168.1.122", 10),
  ("192.168.1.122", "192.168.1.65", 10),
  ("192.168.1.102", "192.168.1.97", 10)
).toDF("sourceip", "destinationip", "packets sent")


df.melt(Seq("packets sent"), Seq("sourceip", "destinationip"), "type", "ip")
  .groupBy("ip")
  .pivot("type", Seq("sourceip", "destinationip"))
  .sum("packets sent").na.fill(0).show

// +-------------+--------+-------------+             
// |           ip|sourceip|destinationip|
// +-------------+--------+-------------+
// | 192.168.1.65|       0|           10|
// |192.168.1.102|      20|            0|
// |192.168.1.122|      10|           10|
// | 192.168.1.97|       0|           10|
// +-------------+--------+-------------+
票数 3
EN

Stack Overflow用户

发布于 2018-07-10 10:09:41

不需要任何自定义聚合的方法之一是使用flatMap (或explode )进行数据处理,如下所示:

代码语言:javascript
运行
复制
case class Info(ip : String, sent : Int, received : Int)
case class Message(from : String, to : String, p : Int)
val ds = Seq(Message("ip1", "ip2", 5), 
             Message("ip2", "ip3", 7), 
             Message("ip2", "ip1", 1), 
             Message("ip3", "ip2", 3)).toDS()

ds
    .flatMap(x => Seq(Info(x.from, x.p, 0), Info(x.to, 0, x.p)))
    .groupBy("ip")
    .agg(sum('sent) as "sent", sum('received) as "received")
    .show


// +---+----+--------+
// | ip|sent|received|
// +---+----+--------+
// |ip2|   8|       8|
// |ip3|   3|       7|
// |ip1|   5|       1|
// +---+----+--------+

就性能而言,我不确定flatMap与自定义聚合相比是一种改进。

票数 2
EN

Stack Overflow用户

发布于 2018-07-10 12:37:40

下面是一个使用explode的pyspark版本。它更冗长,但逻辑与flatMap版本完全相同,只使用纯数据格式代码。

代码语言:javascript
运行
复制
sc\
  .parallelize([("ip1", "ip2", 5), ("ip2", "ip3", 7), ("ip2", "ip1", 1), ("ip3", "ip2", 3)])\
  .toDF(("from", "to", "p"))\
  .select(F.explode(F.array(\
      F.struct(F.col("from").alias("ip"),\
               F.col("p").alias("received"),\
               F.lit(0).cast("long").alias("sent")),\
      F.struct(F.col("to").alias("ip"),\
               F.lit(0).cast("long").alias("received"),\
               F.col("p").alias("sent")))))\
  .groupBy("col.ip")\
  .agg(F.sum(F.col("col.received")).alias("received"), F.sum(F.col("col.sent")).alias("sent"))

// +---+----+--------+
// | ip|sent|received|
// +---+----+--------+
// |ip2|   8|       8|
// |ip3|   3|       7|
// |ip1|   5|       1|
// +---+----+--------+
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51258898

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档