我想将这个特殊的Apache Spark与Python解决方案分享,因为它的文档相当糟糕。
我想按键计算K/V对(存储在成对RDD中)的平均值。示例数据如下所示:
>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]
现在,下面的代码序列是一种不是最优的方式,但它确实可以工作。这就是我在找到更好的解决方案之前所做的事情。这并不可怕,但是--正如您将在答案部分看到的--有一种更简洁、更有效的方法。
>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT)
>>> print(rdd1.collect())
[(u'2013-10-09', 11.235365503035176),
(u'2013-10-07', 23.39500642456595),
... snip ...
]
发布于 2015-04-29 05:21:23
现在,一种更好的方法是使用rdd.aggregateByKey()
方法。因为这种方法在Apache Spark with Python文档中的记录非常少--这也是我写这篇问答的原因--直到最近我一直在使用上面的代码序列。但同样,它的效率较低,所以除非必要,否则应避免这样做。
下面是如何使用(recommended):方法rdd.aggregateByKey()
来做同样的事情
通过KEY,同时计算SUM (我们要计算的平均值的分子)和COUNT (我们要计算的平均值的分母):
>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1),
lambda a,b: (a[0] + b[0], a[1] + b[1]))
其中,关于上述每个a
和b
对的含义,以下内容是正确的(这样您就可以直观地看到发生了什么):
First lambda expression for Within-Partition Reduction Step::
a: is a TUPLE that holds: (runningSum, runningCount).
b: is a SCALAR that holds the next Value
Second lambda expression for Cross-Partition Reduction Step::
a: is a TUPLE that holds: (runningSum, runningCount).
b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).
最后,计算每个键的平均值,并收集结果。
>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
[(u'2013-09-09', 11.235365503035176),
(u'2013-09-01', 23.39500642456595),
(u'2013-09-03', 13.53240060820617),
(u'2013-09-05', 13.141148418977687),
... snip ...
]
我希望这个关于aggregateByKey()
的问题和答案会有所帮助。
发布于 2016-08-12 13:20:12
在我看来,具有两个lambda的aggregateByKey的可读性更好的等价物是:
rdd1 = rdd1 \
.mapValues(lambda v: (v, 1)) \
.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
这样,整个平均计算将是:
avg_by_key = rdd1 \
.mapValues(lambda v: (v, 1)) \
.reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
.mapValues(lambda v: v[0]/v[1]) \
.collectAsMap()
发布于 2017-02-03 08:49:24
只是添加一个关于这个问题的直观和简短(但糟糕的)解决方案的注释。Sam's Teach Yourself Apache Spark in 24 Hours一书在上一章中已经很好地解释了这个问题。
使用groupByKey
one可以轻松地解决这个问题,如下所示:
rdd = sc.parallelize([
(u'2013-10-09', 10),
(u'2013-10-09', 10),
(u'2013-10-09', 13),
(u'2013-10-10', 40),
(u'2013-10-10', 45),
(u'2013-10-10', 50)
])
rdd \
.groupByKey() \
.mapValues(lambda x: sum(x) / len(x)) \
.collect()
输出:
[('2013-10-10', 45.0), ('2013-10-09', 11.0)]
这很直观,很吸引人,但是不使用它!groupByKey
不在映射器上进行任何组合,并将所有单独的键值对带到缩减程序。
尽可能避免使用groupByKey
。使用@pat这样的reduceByKey
解决方案。
https://stackoverflow.com/questions/29930110
复制相似问题