我希望在Apache Spark join中包含空值。默认情况下,Spark不包含null行。
这是默认的Spark行为。
val numbersDf = Seq(
("123"),
("456"),
(null),
("")
).toDF("numbers")
val lettersDf = Seq(
("123", "abc"),
("456", "def"),
(null, "zzz"),
("", "hhh")
).toDF("numbers", "letters")
val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))
下面是joinedDf.show()
的输出
+-------+-------+
|numbers|letters|
+-------+-------+
| 123| abc|
| 456| def|
| | hhh|
+-------+-------+
这是我想要的输出:
+-------+-------+
|numbers|letters|
+-------+-------+
| 123| abc|
| 456| def|
| | hhh|
| null| zzz|
+-------+-------+
发布于 2017-01-19 05:15:22
val numbers2 = numbersDf.withColumnRenamed("numbers","num1") //rename columns so that we can disambiguate them in the join
val letters2 = lettersDf.withColumnRenamed("numbers","num2")
val joinedDf = numbers2.join(letters2, $"num1" === $"num2" || ($"num1".isNull && $"num2".isNull) ,"outer")
joinedDf.select("num1","letters").withColumnRenamed("num1","numbers").show //rename the columns back to the original names
发布于 2019-09-09 05:24:20
基于K L的思想,您可以使用foldLeft生成连接列表达式:
def nullSafeJoin(rightDF: DataFrame, columns: Seq[String], joinType: String)(leftDF: DataFrame): DataFrame =
{
val colExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
val fullExpr = columns.tail.foldLeft(colExpr) {
(colExpr, p) => colExpr && leftDF(p) <=> rightDF(p)
}
leftDF.join(rightDF, fullExpr, joinType)
}
然后,您可以像这样调用此函数:
aDF.transform(nullSafejoin(bDF, columns, joinType))
发布于 2021-01-27 06:27:22
基于timothyzhang的想法,可以通过删除重复的列来进一步改进它:
def dropDuplicateColumns(df: DataFrame, rightDf: DataFrame, cols: Seq[String]): DataFrame
= cols.foldLeft(df)((df, c) => df.drop(rightDf(c)))
def joinTablesWithSafeNulls(rightDF: DataFrame, leftDF: DataFrame, columns: Seq[String], joinType: String): DataFrame =
{
val colExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
val fullExpr = columns.tail.foldLeft(colExpr) {
(colExpr, p) => colExpr && leftDF(p) <=> rightDF(p)
}
val finalDF = leftDF.join(rightDF, fullExpr, joinType)
val filteredDF = dropDuplicateColumns(finalDF, rightDF, columns)
filteredDF
}
https://stackoverflow.com/questions/41728762
复制相似问题