I am trying to join df1 with df2 on certain columns, and then filter some rows out of df1 based on a condition.
df1:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| STS| green|
|Rapid Cash Plus| green|
| DOTOPAL| green|
|     RAPID CASH|     green|
+---------------+----------+
df2:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| STS| blue|
|Rapid Cash Plus| blue|
| DOTOPAL| blue|
+---------------+----------+
The sample code is:
df1.join(df2, df1.col("channel") === df2.col("channel"), "leftouter")
.filter(not(df1.col("rag_status") === "green"))
.select(df1.col("channel"), df1.col("rag_status")).show
It does not return any records. (Every row of df1 has rag_status green, so the filter not(df1.col("rag_status") === "green") rejects all of them.)
What I expect is the output below, after filtering the records on the channel and green status condition: if the same channel exists in df2 and the df1 rag_status is green, that record should be removed from df1, and only the remaining records from df1 returned.
The expected output is:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
|     RAPID CASH|     green|
+---------------+----------+
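For reference (this sketch is mine, not part of the original question), the requirement is exactly an anti join: keep the df1 rows whose channel has no match in df2. Since Spark 2.0 this can be written directly, assuming df1 and df2 as shown above:
// "left_anti" keeps only the df1 rows with no matching channel in df2.
// Every df1 row here is already green, so no extra status filter is needed;
// in general, the green condition could be applied to df1 first.
val remaining = df1.join(df2, Seq("channel"), "left_anti")
remaining.show()  // leaves only: RAPID CASH | green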
Answered on 2016-11-29 09:46:34
You can do it like this: take the left join of df1 and df2, then subtract the inner join with except; what survives is exactly the set of df1 rows whose channel has no match in df2. (This answer's sample df1 omits the question's DOTOPAL row, but the result is the same either way, since DOTOPAL also appears in df2 and is subtracted.)
// Build the sample data; rename df2's status column so the join result
// does not end up with two columns both called rag_status.
val df1 = sc.parallelize(Seq(("STS", "green"), ("Rapid Cash Plus", "green"), ("RAPID CASH", "green"))).toDF("channel", "rag_status").where($"rag_status" === "green")
val df2 = sc.parallelize(Seq(("STS", "blue"), ("Rapid Cash Plus", "blue"), ("DOTOPAL", "blue"))).toDF("channel", "rag_status").withColumnRenamed("rag_status", "rag_status2")
// The left join keeps every df1 row; the inner join keeps only matched ones.
val leftJoinResult = df1.join(df2, Array("channel"), "left")
val innerJoinResult = df1.join(df2, "channel")
// Subtracting the matched rows leaves the df1 rows with no channel in df2.
val resultDF = leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF.show
spark-shell output:
scala> val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [channel: string, rag_status: string]
scala> val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
df2: org.apache.spark.sql.DataFrame = [channel: string, rag_status2: string]
scala> val leftJoinResult=df1.join(df2,Array("channel"),"left")
leftJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]
scala> val innerJoinResult=df1.join(df2,"channel")
innerJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]
scala> val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string]
scala> resultDF.show
+----------+----------+
| channel|rag_status|
+----------+----------+
|RAPID CASH| green|
+----------+----------+
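One caveat worth noting (not mentioned in the original answer): Dataset.except behaves like SQL's EXCEPT DISTINCT, so duplicate rows in df1 would be collapsed to a single row in resultDF. The left anti join sketched under the question preserves duplicates.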
Answered on 2016-11-29 10:33:52
You can get the expected output with the code below. After joining on Seq("channel") the result has three columns (channel plus the two rag_status columns), so df2's status sits at positional index 2; it is null for the unmatched rows, which the != "blue" test therefore keeps:
df1.join(df2, Seq("channel"), "leftouter").filter(row => row(2) != "blue")
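Positional indexing is fragile, though. A rewrite with column expressions (my sketch, not from the original answer) avoids it: renaming df2's status column prevents two ambiguous rag_status columns, and isNull makes the "no match in df2" test explicit:
import org.apache.spark.sql.functions.col
df1.join(df2.withColumnRenamed("rag_status", "rag_status2"), Seq("channel"), "leftouter")
  .filter(col("rag_status2").isNull)
  .select("channel", "rag_status")
  .show()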
https://stackoverflow.com/questions/40861729