我正在使用spark数据集API来删除几乎重复的数据。我要做的是对重复的行进行分组,以便每个组只留下一行,但是有一个列指定已折叠到该行中的行数。
考虑下面的例子。我有以下数据,其中最后一个字段指定折叠到该行中的行:
此时,我想将数据按第一个字段分组,保留行的其余字段,并将大部分行折叠到其中,并将折叠到第二个字段的行数添加到第一个字段中。因此,结果将是:
我已经实现了它,问题是结果数据的格式。
这是我的代码:
val sameTitleArticlesCollapsed = articlesCollapsed.groupByKey(_.TITLE).reduceGroups((a,b) => if(a.TIMES_COLLAPSED > b.TIMES_COLLAPSED) a.copy(TIMES_COLLAPSED = a.TIMES_COLLAPSED + b.TIMES_COLLAPSED) else b.copy(TIMES_COLLAPSED = a.TIMES_COLLAPSED + b.TIMES_COLLAPSED)).toDF("key", "data")
如果我在printSchema
上执行sameTitleArticlesCollapsed
,输出是:
root
|-- key: string (nullable = true)
|-- data: struct (nullable = true)
| |-- CODE: string (nullable = true)
| |-- TITLE: string (nullable = true)
| |-- NAUTHORS: string (nullable = true)
| |-- AUTHORS: string (nullable = true)
| |-- TIMES_COLLAPSED: decimal(38,0) (nullable = true)
我不关心key
列,我想要的是提取data
列中的数据,以保持与应用groupByKey - reduceGroups
之前相同的格式。
root
|-- CODE: string (nullable = true)
|-- TITLE: string (nullable = true)
|-- NAUTHORS: string (nullable = true)
|-- AUTHORS: string (nullable = true)
|-- TIMES_COLLAPSED: long (nullable = false)
我怎么能这么做?有什么更好的方法来做这个过程吗?
谢谢!
发布于 2017-02-21 18:50:29
您可以在后面添加一个映射,如下所示,以保留原始模式
val sameTitleArticlesCollapsed = articlesCollapsed.groupByKey(_.title).reduceGroups((a,b) => if(a.times_collapsed > b.times_collapsed) a.copy(times_collapsed = a.times_collapsed + b.times_collapsed) else b.copy(times_collapsed = a.times_collapsed + b.times_collapsed))
val result = sameTitleArticlesCollapsed.map({case (_,value) => value}).toDF
result.printSchema
root
|-- code: string (nullable = true)
|-- title: string (nullable = true)
|-- nauthors: string (nullable = true)
|-- authors: string (nullable = true)
|-- times_collapsed: long (nullable = true)
https://stackoverflow.com/questions/42375069
复制相似问题