我尝试按键对urldata进行分组,其中的值将是字符串
示例数据:
url_3 url_2
url_3 url_2
url_3 url_1
url_4 url_3
url_4 url_1预期结果:
(url_3,(url_2,url_1))
(url_4,(url_3,url_1))1)加载urldata:
Dataset<String> lines = spark.read()
.textFile("C:/Users/91984/workspace/myApp/src/test/resources/in/urldata.txt");2)使用空格拆分数据集
Encoder<Tuple2<String, String>> encoder2 =
Encoders.tuple(Encoders.STRING(), Encoders.STRING());
Dataset<Tuple2<String, String>> tupleRDD = lines.map(f->{
Tuple2<String, String> m =
new Tuple2<String, String>(f.split(" ")[0], f.split(" ")[1]);
return m;
},encoder2);3)使用groupbyKey对tupleRDD数据库按键进行分组
KeyValueGroupedDataset<String, Tuple2<String, String>> keygrpDS =
tupleRDD.groupByKey(f->f._1, Encoders.STRING());有人能解释一下为什么步骤3中的groupByKey返回的是KeyValueGroupedDataset<String, Tuple2<String, String>>而不是KeyValueGroupedDataset<String, Iterable<String>>吗?为了获得预期的结果,需要做哪些更改?
发布于 2018-11-25 02:25:42
这就是它在spark中处理数据集的方式。当您有一个类型为Dataset<T>的数据集时,您可以通过某种映射函数对其进行分组,该映射函数接受类型T的对象并返回类型K(键)的对象。您得到的是一个可以在其上调用聚合函数的KeyValueGroupedDataset<K,T> (请参阅the javadoc)。在您的例子中,您可以使用mapGroups,您可以向它提供一个函数,将一个键K和一个可迭代的Iterable<T>映射到您选择的新对象R。如果有用的话,在你的代码中,T是一个Tuple2,K是一个URL。
发布于 2018-11-25 02:29:20
Spark要求您使用aggregation方法遵循您的groupBY。我会将tupleRDD作为DataFrame,如下所示:
column1 column2
url_3 url_2
url_3 url_2
url_3 url_1
url_4 url_3
url_4 url_1并像这样传递一个collect_list(column2)
df.groupBy('column1').agg('column2', collect_list('column2'))。
这个例子是用Python编写的。不过,Scala/Java API应该是类似的。
https://stackoverflow.com/questions/53460706
复制相似问题