首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Apache Spark:使用RDD.aggregateByKey()实现RDD.groupByKey()的等价物是什么?

Apache Spark:使用RDD.aggregateByKey()实现RDD.groupByKey()的等价物是什么?
EN

Stack Overflow用户
提问于 2015-06-27 04:24:55
回答 2查看 9.1K关注 0票数 11

Apache文档提到了groupByKey()效率低下。相反,建议使用reduceByKey()aggregateByKey()combineByKey()foldByKey()。这将导致在混洗之前在工作进程中进行一些聚合,从而减少工作进程之间的数据混洗。

给定以下数据集和groupByKey()表达式,什么是不利用groupByKey()但提供相同结果的等效且有效的实现(减少的跨工作进程数据混洗)?

代码语言:javascript
运行
复制
dataset = [("a", 7), ("b", 3), ("a", 8)]
rdd = (sc.parallelize(dataset)
       .groupByKey())
print sorted(rdd.mapValues(list).collect())

输出:

代码语言:javascript
运行
复制
[('a', [7, 8]), ('b', [3])]
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-06-27 05:21:37

据我所知,在这种特殊情况下,使用aggregateByKey或类似的函数没有什么好处。由于您正在构建一个列表,因此没有“真正的”缩减,并且必须混洗的数据量大致相同。

要真正观察到一些性能增益,您需要进行转换,这些转换实际上减少了传输的数据量,例如计数、计算汇总统计信息、查找唯一元素。

关于使用reduceByKey()combineByKey()foldByKey()的区别和好处,有一个重要的概念上的区别,当您考虑Scala API特征时,这一点更容易看出。

reduceByKeyfoldByKey都从RDD[(K, V)]映射到RDD[(K, V)],而第二个提供了额外的零元素。

代码语言:javascript
运行
复制
reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

combineByKey (没有aggregateByKey,但它是同一类型的转换)从RDD[(K, V)]转换为RDD[(K, C)]

代码语言:javascript
运行
复制
combineByKey[C](
   createCombiner: (V) ⇒ C,
   mergeValue: (C, V) ⇒ C,
   mergeCombiners: (C, C) ⇒ C): RDD[(K, C)] 

返回到您的示例,只有combineByKey (在PySpark aggregateByKey中)才是真正适用的,因为您正在从RDD[(String, Int)]转换到RDD[(String, List[Int])]

虽然在Python这样的动态语言中,使用foldByKeyreduceByKey执行这样的操作实际上是可能的,但这会使代码的语义变得不清楚,并引用@tim-peters的话说:“应该有1个,最好只有一个--显而易见的方法”1。

aggregateByKeycombineByKey之间的区别与reduceByKeyfoldByKey之间的区别基本相同,所以对于一个列表来说,这主要是一个品味问题:

代码语言:javascript
运行
复制
def merge_value(acc, x):
    acc.append(x)
    return acc

def merge_combiners(acc1, acc2):
    acc1.extend(acc2)
    return acc1

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
   .combineByKey(
       lambda x: [x],
       lambda u, v: u + [v],
       lambda u1,u2: u1+u2))

但在实践中,您应该更喜欢groupByKey。与上面提供的简单实现相比,PySpark实现要优化得多。

1. 1.Peters,T. PEP 20 -- Python的禅宗。(2004)。在https://www.python.org/dev/peps/pep-0020/

*在实践中,这里实际上有相当多的松散,特别是在使用PySpark时。groupByKey的Python实现比naive combine by key要优化得多。你可以查看由我和@eliasah创建的Be Smart About groupByKey来进行更多的讨论。

票数 18
EN

Stack Overflow用户

发布于 2015-06-27 04:24:55

这里有一个使用aggregateByKey()的选项。我很想知道如何使用reduceByKey()combineByKey()foldByKey()来实现这一点,以及每种替代方案的成本/收益是什么。

代码语言:javascript
运行
复制
rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       .aggregateByKey(list(),
                       lambda u,v: u+[v],
                       lambda u1,u2: u1+u2))
print sorted(rdd.mapValues(list).collect())

输出:

代码语言:javascript
运行
复制
[('a', [7, 8]), ('b', [3])]

下面是一个内存效率稍高的实现,尽管python新手的可读性较差,但会产生相同的输出:

代码语言:javascript
运行
复制
rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
       .aggregateByKey(list(),
                       lambda u,v: itertools.chain(u,[v]),
                       lambda u1,u2: itertools.chain(u1,u2)))
print sorted(rdd.mapValues(list).collect())
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31081563

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档