我最初有一个DataFrame,如下所示:
Key Emails PassportNum Age
0001 [Alan@gmail,Alan@hotmail] passport1 23
0002 [Ben@gmail,Ben@hotmail] passport2 28我需要在每封电子邮件上应用一个函数,比如在末尾添加"_2“之类的虚拟功能--例如,操作是不相关的。所以我会把这个专栏炸成这样:
val dfExplode = df.withColumn("Email",explode($"Emails")).drop("Emails")现在我将有一个这样的数据文件:
Key Email PassportNum Age
0001 Alan@gmail passport1 23
0001 Alan@hotmail passport1 23
0002 Ben@gmail passport2 28
0002 Ben@hotmail passport2 28我申请护照上的任何更改,然后我想要的是:
Key Emails PassportNum Age
0001 [Alan_2@gmail,Alan_2@hotmail] passport1 23
0002 [Ben_2@gmail,Ben_2@hotmail] passport2 28我正在考虑的选择是:
dfOriginal = dfExploded.groupBy("Key","PassportNum","Age").agg(collect_set("Email").alias("Emails"))在这种情况下,这可能不是一种糟糕的方法。但在我的实际情况下,我执行爆炸在一个单一的列,我有另外20列,如PassportNum,年龄.它们将被复制。
这意味着我需要在groupBy中添加大约20列,当我真的可以通过一个单独的列执行组时,例如键是唯一的。
我正在考虑在agg中添加以下列:
dfOriginal = dfExploded.groupBy("Key").agg(collect_set("Email").alias("Emails"),collect_set("PassportNum"),collect_set("Age"))但我不希望它们在一个元素数组中。
在没有任何collect_*的情况下,有任何方法可以生成聚合吗?是否有更简单的方法来撤销explode?
发布于 2018-04-02 14:03:54
假设您想留在DataFrame世界中,那么定义一个操纵输入数组的UDF可能是值得的。以Seq作为输入并返回修改后的输入的内容。例如:
def myUdf = udf[Seq[String], Seq[String]] {
inputSeq => inputSeq.map(elem => elem + "_2")
}
df.withColumn("Emails", myUdf($"Emails"))更好的是,您可以将确切的逻辑作为参数传递:
def myUdf(myFunc: String => String) = udf[Seq[String], Seq[String]] {
inputSeq => inputSeq.map(myFunc)
}
df.withColumn("Emails", myUdf((email: String) => email + "_XYZ")($"Emails"))发布于 2018-04-02 14:07:01
除了所有常见字段的groupby之外,另一个选项是在单独的临时数据re上执行爆炸操作,然后从原始字段中删除已爆炸的列,然后加入由
然而,编写一个可以直接操作数组的UDF可能更简单,而不会进入爆炸和聚集。
def handleEmail(emails: mutable.WrappedArray[String]) = {
emails.map(dosomething)
}
context.udf.register("handleEmailsm"m (em:mutabe.WrappedArray[String]) => handleEmail(em))发布于 2018-04-02 14:13:18
这意味着我需要在groupBy中添加大约20列,当我真的可以通过一个单独的列执行组时,例如键是唯一的。
您可以跳过编写每个列名,只需执行一个简单的技巧,如下所示,您可以使用所有列名(或选定的列名),但爆炸的列名除外。
import org.apache.spark.sql.functions._
val dfExploded = df.withColumn("Emails", explode($"Emails"))
val groupColumns = dfExploded.columns.filterNot(_.equalsIgnoreCase("Emails"))
val dfOriginal = dfExploded.groupBy(groupColumns.map(col): _*).agg(collect_set("Emails").alias("Emails"))创建一个结构列
您可以使用struct内置函数创建单个列,并在groupBy中将该单列用作
val groupColumns = df.columns.filterNot(_.equalsIgnoreCase("Emails"))
import org.apache.spark.sql.functions._
val dfExploded = df.select(struct(groupColumns.map(col): _*).as("groupedKey"), col("Emails"))
.withColumn("Emails", explode($"Emails"))这会让你
+-------------------+------------+
|groupedKey |Emails |
+-------------------+------------+
|[0001,passport1,23]|Alan@gmail |
|[0001,passport1,23]|Alan@hotmail|
|[0002,passport2,28]|Ben@gmail |
|[0002,passport2,28]|Ben@hotmail |
+-------------------+------------+然后在groupedKey中使用groupBy,然后在select中再次将它们分开
val dfOriginal = dfExploded.groupBy("groupedKey").agg(collect_set("Emails").alias("Emails"))
.select($"groupedKey.*", $"Emails")https://stackoverflow.com/questions/49612641
复制相似问题