Spark SQL DataFrame join with filter is not working

Stack Overflow user
Asked on 2016-11-29 09:08:31
2 answers · 9.4K views · 0 following · 0 votes

I am trying to join df1 with df2 on certain columns and then filter out some rows from df1 based on a filter condition.

df1:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|            STS|     green|
|Rapid Cash Plus|     green|
|        DOTOPAL|     green|
|     RAPID CASH|     green|
+---------------+----------+

df2:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|            STS|      blue|
|Rapid Cash Plus|      blue|
|        DOTOPAL|      blue|
+---------------+----------+

The sample code is:

df1.join(df2, df1.col("channel") === df2.col("channel"), "leftouter")
      .filter(not(df1.col("rag_status") === "green"))
      .select(df1.col("channel"), df1.col("rag_status")).show

It is not returning any records.

I expect the output shown below, after the records are filtered on the channel and green-status condition. If the same channel exists in df2 and df1's rag_status is green, that record should be removed from df1, and only the remaining records from df1 returned.

The expected output is:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|     RAPID CASH|     green|
+---------------+----------+

2 Answers

Stack Overflow user

Accepted answer

Posted on 2016-11-29 09:46:34

You can do it like this:

// Keep only the green rows of df1, and rename df2's status column to avoid ambiguity.
val df1 = sc.parallelize(Seq(("STS", "green"), ("Rapid Cash Plus", "green"), ("RAPID CASH", "green")))
  .toDF("channel", "rag_status")
  .where($"rag_status" === "green")
val df2 = sc.parallelize(Seq(("STS", "blue"), ("Rapid Cash Plus", "blue"), ("DOTOPAL", "blue")))
  .toDF("channel", "rag_status")
  .withColumnRenamed("rag_status", "rag_status2")
// Left join keeps all df1 rows; inner join keeps only the channels present in both.
val leftJoinResult = df1.join(df2, Array("channel"), "left")
val innerJoinResult = df1.join(df2, "channel")
// Their set difference is exactly the df1 rows with no match in df2.
val resultDF = leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF.show

spark-shell output:

scala> val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [channel: string, rag_status: string]

scala> val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
df2: org.apache.spark.sql.DataFrame = [channel: string, rag_status2: string]

scala> val leftJoinResult=df1.join(df2,Array("channel"),"left")
leftJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]

scala> val innerJoinResult=df1.join(df2,"channel")
innerJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]

scala> val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string]

scala> resultDF.show
+----------+----------+                                                         
|   channel|rag_status|
+----------+----------+
|RAPID CASH|     green|
+----------+----------+
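
Since Spark 2.0, an anti-join expresses the same set difference in a single step; a minimal sketch, assuming the same df1 and df2 definitions as above:

```scala
// "left_anti" keeps only the rows of df1 whose channel has no match in df2,
// so the separate inner join and except steps become unnecessary.
val resultDF = df1.join(df2, Seq("channel"), "left_anti")
resultDF.show
```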
4 votes

Stack Overflow user

Posted on 2016-11-29 10:33:52

You can get the expected output with the code below:

// After joining on "channel" the row has three columns: channel (0), df1's
// rag_status (1), and df2's rag_status (2); unmatched rows carry null at index 2.
df1.join(df2, Seq("channel"), "leftouter").filter(row => row(2) != "blue")
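
A column-based variant of the same idea, which avoids hard-coding a positional index (a sketch, assuming the df1 and df2 from the question, where df2's status column is not renamed):

```scala
// After a left outer join on "channel", df2's rag_status is null for the
// channels that exist only in df1, so filtering on isNull keeps exactly those.
df1.join(df2, Seq("channel"), "leftouter")
  .filter(df2("rag_status").isNull)
  .select(df1("channel"), df1("rag_status"))
  .show
```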
1 vote
Original page content provided by Stack Overflow; Chinese translation supported by Tencent Cloud's translation engine.
Original link: https://stackoverflow.com/questions/40861729
