我有一个流式数据帧,它可以在某个点上看起来像这样:
+--------------------+--------------------+
| owner| fruits|
+--------------------+--------------------+
|Brian | apple|
Brian | pear |
Brian | date|
Brian | avocado|
Bob | avocado|
Bob | apple|
........
+--------------------+--------------------+
我执行了一个groupBy,agg collect_list来清理东西。
val myFarmDF = farmDF.withWatermark("timeStamp", "1 seconds").groupBy("fruits").agg(collect_list(col("fruits")) as "fruitsA")
输出是每个所有者的单行和每个水果的数组。现在,我想将这个清理过的数组加入到原始的流式数据帧中,去掉fruits列,只保留fruitsA列
val joinedDF = farmDF.join(myFarmDF, "owner").drop("fruits")
这似乎在我的脑海中起作用,但spark似乎不同意。
我得到了一个
Failure when resolving conflicting references in Join:
'Join Inner
...
+- AnalysisBarrier
+- Aggregate [name#17], [name#17, collect_list(fruits#61, 0, 0) AS fruitA#142]
当我把所有东西都转换成静态数据帧时,它工作得很好。这在流环境中是不可能的吗?
发布于 2018-06-05 06:24:24
您是否尝试过重命名该列名称?https://issues.apache.org/jira/browse/SPARK-19860也有类似的问题
https://stackoverflow.com/questions/50587672
复制相似问题