给定以下代码,我将尝试按月计算浮点列的平均值。
rdd = sc.parallelize(
[['JAN', 'NY', 3.0],
['JAN', 'PA', 1.0],
['JAN', 'NJ', 2.0],
['JAN', 'CT', 4.0],
['FEB', 'PA', 1.0],
['FEB', 'NJ', 1.0],
['FEB', 'NY', 2.0],
['FEB', 'VT', 1.0],
['MAR', 'NJ', 2.0],
['MAR', 'NY', 1.0],
['MAR', 'VT', 2.0],
['MAR', 'PA', 3.0]])
def avg_map(row):
return (row[0], (row[2], 1))
def avg_reduce_func(value1, value2):
return (value1[0], (value1[1][0] + value2[1][0], value1[1][1] + value2[1][1]))
dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).collect()
从高层次的角度来看,我首先尝试使用map来创建以下形式的RDD:
[('JAN', (3.0, 1)),
('JAN', (1.0, 1)),
('JAN', (2.0, 1)),
('JAN', (4.0, 1)),
('FEB', (1.0, 1)),
('FEB', (1.0, 1)),
('FEB', (2.0, 1)),
('FEB', (1.0, 1)),
('MAR', (2.0, 1)),
('MAR', (1.0, 1)),
('MAR', (2.0, 1)),
('MAR', (3.0, 1))]
然后,我想使用reduceByKey函数将1和浮点数相加,方法是按键创建一个新的RDD,它包含一个每月一行,一个表示浮点数总数的元组和一个表示行数的整数。例如,Jan行将如下所示:
('Jan',(10.0,4))
但是,我似乎不能正确地索引元组,并在reduceByKey函数中出现运行时错误。
问题1:为什么我不能索引avg_reduce_func中的元组?问题2:如何重写此代码以按月计算浮点列的平均值?
发布于 2019-07-15 04:22:58
我弄明白了,当只传入值时,我试图访问avg_reduce_func中的键。我最终得到了以下结论:
def avg_map_func(row):
return (row[0], (row[2], 1))
def avg_reduce_func(value1, value2):
return ((value1[0] + value2[0], value1[1] + value2[1]))
dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).mapValues(lambda x: x[0]/x[1]).collect()
发布于 2019-07-15 09:41:24
你使用RDDs有什么特别的原因吗?
这对于数据帧来说很简单,而且效率会更高:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count
d = [['JAN', 'NY', 3.0],
['JAN', 'PA', 1.0],
['JAN', 'NJ', 2.0],
['JAN', 'CT', 4.0],
['FEB', 'PA', 1.0],
['FEB', 'NJ', 1.0],
['FEB', 'NY', 2.0],
['FEB', 'VT', 1.0],
['MAR', 'NJ', 2.0],
['MAR', 'NY', 1.0],
['MAR', 'VT', 2.0],
['MAR', 'PA', 3.0]]
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(d).selectExpr(
"_1 as month", "_2 as state", "_3 as float_col")
df.show()
'''
+-----+-----+---------+
|month|state|float_col|
+-----+-----+---------+
| JAN| NY| 3.0|
| JAN| PA| 1.0|
| JAN| NJ| 2.0|
| JAN| CT| 4.0|
| FEB| PA| 1.0|
| FEB| NJ| 1.0|
| FEB| NY| 2.0|
| FEB| VT| 1.0|
| MAR| NJ| 2.0|
| MAR| NY| 1.0|
| MAR| VT| 2.0|
| MAR| PA| 3.0|
+-----+-----+---------+
'''
agg_df = df.groupBy("month").agg(
sum('float_col').alias('float_sum'),
count('month').alias('month_count')
)
agg_df.show()
'''
+-----+---------+-----------+
|month|float_sum|month_count|
+-----+---------+-----------+
| FEB| 5.0| 4|
| JAN| 10.0| 4|
| MAR| 8.0| 4|
+-----+---------+-----------+
'''
https://stackoverflow.com/questions/57030626
复制相似问题