首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使用Pyspark计算RDD上的平均值

如何使用Pyspark计算RDD上的平均值
EN

Stack Overflow用户
提问于 2019-07-15 03:47:41
回答 2查看 4.3K关注 0票数 3

给定以下代码,我将尝试按月计算浮点列的平均值。

代码语言:javascript
运行
复制
rdd = sc.parallelize(
 [['JAN', 'NY', 3.0],
 ['JAN', 'PA', 1.0],
 ['JAN', 'NJ', 2.0],
 ['JAN', 'CT', 4.0],
 ['FEB', 'PA', 1.0],
 ['FEB', 'NJ', 1.0],
 ['FEB', 'NY', 2.0],
 ['FEB', 'VT', 1.0],
 ['MAR', 'NJ', 2.0],
 ['MAR', 'NY', 1.0],
 ['MAR', 'VT', 2.0],
 ['MAR', 'PA', 3.0]])

def avg_map(row):
    return (row[0], (row[2], 1))

def avg_reduce_func(value1, value2):
    return (value1[0], (value1[1][0] + value2[1][0], value1[1][1] + value2[1][1]))

dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).collect()

从高层次的角度来看,我首先尝试使用map来创建以下形式的RDD:

代码语言:javascript
运行
复制
[('JAN', (3.0, 1)),
 ('JAN', (1.0, 1)),
 ('JAN', (2.0, 1)),
 ('JAN', (4.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (2.0, 1)),
 ('FEB', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (3.0, 1))]

然后,我想使用reduceByKey函数将1和浮点数相加,方法是按键创建一个新的RDD,它包含一个每月一行,一个表示浮点数总数的元组和一个表示行数的整数。例如,Jan行将如下所示:

('Jan',(10.0,4))

但是,我似乎不能正确地索引元组,并在reduceByKey函数中出现运行时错误。

问题1:为什么我不能索引avg_reduce_func中的元组?问题2:如何重写此代码以按月计算浮点列的平均值?

EN

回答 2

Stack Overflow用户

发布于 2019-07-15 04:22:58

我弄明白了,当只传入值时,我试图访问avg_reduce_func中的键。我最终得到了以下结论:

代码语言:javascript
运行
复制
def avg_map_func(row):
    return (row[0], (row[2], 1))

def avg_reduce_func(value1, value2):
    return ((value1[0] + value2[0], value1[1] + value2[1])) 

dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).mapValues(lambda x: x[0]/x[1]).collect()
票数 3
EN

Stack Overflow用户

发布于 2019-07-15 09:41:24

你使用RDDs有什么特别的原因吗?

这对于数据帧来说很简单,而且效率会更高:

代码语言:javascript
运行
复制
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count

d =  [['JAN', 'NY', 3.0],
 ['JAN', 'PA', 1.0],
 ['JAN', 'NJ', 2.0],
 ['JAN', 'CT', 4.0],
 ['FEB', 'PA', 1.0],
 ['FEB', 'NJ', 1.0],
 ['FEB', 'NY', 2.0],
 ['FEB', 'VT', 1.0],
 ['MAR', 'NJ', 2.0],
 ['MAR', 'NY', 1.0],
 ['MAR', 'VT', 2.0],
 ['MAR', 'PA', 3.0]] 

 spark = SparkSession.builder.getOrCreate()

 df = spark.createDataFrame(d).selectExpr(
     "_1 as month", "_2 as state", "_3 as float_col")
 df.show()

 '''
 +-----+-----+---------+
 |month|state|float_col|
 +-----+-----+---------+
 |  JAN|   NY|      3.0|
 |  JAN|   PA|      1.0|
 |  JAN|   NJ|      2.0|
 |  JAN|   CT|      4.0|
 |  FEB|   PA|      1.0|
 |  FEB|   NJ|      1.0|
 |  FEB|   NY|      2.0|
 |  FEB|   VT|      1.0|
 |  MAR|   NJ|      2.0|
 |  MAR|   NY|      1.0|
 |  MAR|   VT|      2.0|
 |  MAR|   PA|      3.0|
 +-----+-----+---------+
 '''

 agg_df = df.groupBy("month").agg(
     sum('float_col').alias('float_sum'),
     count('month').alias('month_count')
 )
 agg_df.show()

 '''
 +-----+---------+-----------+
 |month|float_sum|month_count|
 +-----+---------+-----------+
 |  FEB|      5.0|          4|
 |  JAN|     10.0|          4|
 |  MAR|      8.0|          4|
 +-----+---------+-----------+
 '''
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57030626

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档