首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >星星之火(Scala) -在DataFrame中恢复爆炸

星星之火(Scala) -在DataFrame中恢复爆炸
EN

Stack Overflow用户
提问于 2018-04-02 13:54:43
回答 3查看 5.6K关注 0票数 2

我最初有一个DataFrame,如下所示:

代码语言:javascript
复制
Key     Emails                      PassportNum     Age
0001    [Alan@gmail,Alan@hotmail]   passport1       23
0002    [Ben@gmail,Ben@hotmail]     passport2       28

我需要在每封电子邮件上应用一个函数,比如在末尾添加"_2“之类的虚拟功能--例如,操作是不相关的。所以我会把这个专栏炸成这样:

代码语言:javascript
复制
val dfExplode = df.withColumn("Email",explode($"Emails")).drop("Emails")

现在我将有一个这样的数据文件:

代码语言:javascript
复制
Key     Email           PassportNum     Age
0001    Alan@gmail      passport1       23
0001    Alan@hotmail    passport1       23
0002    Ben@gmail       passport2       28
0002    Ben@hotmail     passport2       28

我申请护照上的任何更改,然后我想要的是:

代码语言:javascript
复制
Key     Emails                          PassportNum     Age
0001    [Alan_2@gmail,Alan_2@hotmail]   passport1       23
0002    [Ben_2@gmail,Ben_2@hotmail]     passport2       28

我正在考虑的选择是:

代码语言:javascript
复制
dfOriginal = dfExploded.groupBy("Key","PassportNum","Age").agg(collect_set("Email").alias("Emails"))

在这种情况下,这可能不是一种糟糕的方法。但在我的实际情况下,我执行爆炸在一个单一的列,我有另外20列,如PassportNum,年龄.它们将被复制。

这意味着我需要在groupBy中添加大约20列,当我真的可以通过一个单独的列执行组时,例如键是唯一的。

我正在考虑在agg中添加以下列:

代码语言:javascript
复制
dfOriginal = dfExploded.groupBy("Key").agg(collect_set("Email").alias("Emails"),collect_set("PassportNum"),collect_set("Age"))

但我不希望它们在一个元素数组中。

在没有任何collect_*的情况下,有任何方法可以生成聚合吗?是否有更简单的方法来撤销explode

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-04-02 14:03:54

假设您想留在DataFrame世界中,那么定义一个操纵输入数组的UDF可能是值得的。以Seq作为输入并返回修改后的输入的内容。例如:

代码语言:javascript
复制
def myUdf = udf[Seq[String], Seq[String]] { 
    inputSeq => inputSeq.map(elem => elem + "_2")
}

df.withColumn("Emails", myUdf($"Emails"))

更好的是,您可以将确切的逻辑作为参数传递:

代码语言:javascript
复制
def myUdf(myFunc: String => String) = udf[Seq[String], Seq[String]] {
    inputSeq => inputSeq.map(myFunc)
}

df.withColumn("Emails", myUdf((email: String) => email + "_XYZ")($"Emails"))
票数 4
EN

Stack Overflow用户

发布于 2018-04-02 14:07:01

除了所有常见字段的groupby之外,另一个选项是在单独的临时数据re上执行爆炸操作,然后从原始字段中删除已爆炸的列,然后加入由

然而,编写一个可以直接操作数组的UDF可能更简单,而不会进入爆炸和聚集。

代码语言:javascript
复制
def handleEmail(emails: mutable.WrappedArray[String]) = {
     emails.map(dosomething)
  }

context.udf.register("handleEmailsm"m (em:mutabe.WrappedArray[String]) => handleEmail(em))
票数 1
EN

Stack Overflow用户

发布于 2018-04-02 14:13:18

这意味着我需要在groupBy中添加大约20列,当我真的可以通过一个单独的列执行组时,例如键是唯一的。

您可以跳过编写每个列名,只需执行一个简单的技巧,如下所示,您可以使用所有列名(或选定的列名),但爆炸的列名除外。

代码语言:javascript
复制
import org.apache.spark.sql.functions._
val dfExploded = df.withColumn("Emails", explode($"Emails"))

val groupColumns = dfExploded.columns.filterNot(_.equalsIgnoreCase("Emails"))

val dfOriginal = dfExploded.groupBy(groupColumns.map(col): _*).agg(collect_set("Emails").alias("Emails"))

创建一个结构列

您可以使用struct内置函数创建单个列,并在groupBy中将该单列用作

代码语言:javascript
复制
val groupColumns = df.columns.filterNot(_.equalsIgnoreCase("Emails"))

import org.apache.spark.sql.functions._
val dfExploded = df.select(struct(groupColumns.map(col): _*).as("groupedKey"), col("Emails"))
  .withColumn("Emails", explode($"Emails"))

这会让你

代码语言:javascript
复制
+-------------------+------------+
|groupedKey         |Emails      |
+-------------------+------------+
|[0001,passport1,23]|Alan@gmail  |
|[0001,passport1,23]|Alan@hotmail|
|[0002,passport2,28]|Ben@gmail   |
|[0002,passport2,28]|Ben@hotmail |
+-------------------+------------+

然后在groupedKey中使用groupBy,然后在select中再次将它们分开

代码语言:javascript
复制
val dfOriginal = dfExploded.groupBy("groupedKey").agg(collect_set("Emails").alias("Emails"))
  .select($"groupedKey.*", $"Emails")
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49612641

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档