前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据分析EPHS(6)-使用Spark计算数列统计值

数据分析EPHS(6)-使用Spark计算数列统计值

作者头像
石晓文
发布2019-07-30 15:11:28
1.3K0
发布2019-07-30 15:11:28
举报
文章被收录于专栏:小小挖掘机小小挖掘机

前两篇中咱们分别介绍了使用Excel、Python和Hive SQL计算统计值,这次咱们使用Spark SQL来计算统计值。

数据分析EPHS(4)-使用Excel和Python计算数列统计值

数据分析EPHS(5)-使用Hive SQL计算数列统计值

先来回顾一下数据和对应的统计结果:

本文使用的是iris分类数据集,数据下载地址为:

http://archive.ics.uci.edu/ml/datasets/Iris

下载后转换为xlsx格式的文件,数据如下:

对应的统计结果如下:

在介绍之前,我还是想先说明一点,这一篇只是想先带大家体验一把Spark SQL,相关更多关于原理相关的知识,咱们会在后面的文章中详细介绍。

1、数据导入

这里咱们通过读取Excel的方式读取出相应的数据,并得到一个DataFrame:

代码语言:javascript
复制
def createDFByCSV(spark:SparkSession) = {
    val df = spark.sqlContext.read.format("com.databricks.spark.csv")
      .option("header","true") //这里如果在csv第一行有属性的话,没有就是"false"
      .option("inferSchema",true.toString)//这是自动推断属性列的数据类型。
      .load("resources/iris.csv")

    df.show()
  }

结果如下:

2、使用Spark SQL计算统计值

2.1 最大值、最小值

使用Spark SQL统计最大值或者最小值,首先使用agg函数对数据进行聚合,这个函数一般配合group by使用,不使用group by的话就相当于对所有的数据进行聚合。

随后,直接使用max和min函数就可以,想要输出多个结果的话,中间用逗号分开,而使用as给聚合后的结果赋予一个列名,相当于sql中的as:

代码语言:javascript
复制
import spark.implicits._

    df.agg(max($"feature1") as "max_feature1",
        min($"feature2") as "min_feature2")
      .show()

结果输出如下:

上面的$代表一列的意思,相当于col函数:

代码语言:javascript
复制
df.agg(max(col("feature1")) as "max_feature1",
        min(col("feature2")) as "min_feature2")
      .show()

2.2 平均值

平均值的计算使用mean函数:

代码语言:javascript
复制
df.agg(mean($"feature1") as "mean_feature1",
      mean($"feature2") as "mean_feature2").show()

输出为:

2.3 样本标准差&总体标准差

样本标准差的计算有两个函数可以使用,分别是stddev函数和stddev_samp函数,而总体标准差使用stddev_pop方法。需要注意的一点是,这里和hive sql是有区别的,在hive sql中,stddev函数代表的是总体标准差,而在spark sql中,stddev函数代表的是样本标准差,可以查看一下源代码:

通过代码验证一下:

代码语言:javascript
复制
df.agg(stddev($"feature1") as "stddev_feature1",
      stddev_pop($"feature1") as "stddev_pop_feature1",
      stddev_samp($"feature1") as "stddev_samp_feature1").show()

输出结果为:

2.4 中位数

SparkSQL中也没有直接计算中位数的方法,所以我们还是借鉴上一篇中的思路,再来回顾一下:

计算中位数也好,计算四分位数也好,无非就是要取得两个位置嘛,假设我们的数据从小到大排,按照1、2、3、.. 、n进行编号,当数量n为奇数时,取编号(n + 1)/2位置的数即可,当n为偶数时,取(int)(n + 1)/2位置和(int)(n + 1)/2 + 1位置的数取平均即可。但二者其实可以统一到一个公式中:

1)假设n = 149 ,(n+1)/2 = 75 ,小数部分为0,那么中位数=75位置的数 * (1 - 0)+ 76位置的数 * (0 - 0) 2)假设n = 150,(n+1)/2 = 75,小数部分为0.5,那么中位数=75位置的数 * (1 - 0.5)+ 76位置的数 * (0.5 - 0)

所以,可以把这个过程分解为三个步骤,第一步是给数字进行一个编号,spark中同样使用row_number()函数(该函数的具体用法后续再展开,这里只提供一个简单的例子),第二步是计算(n+1)/2的整数部分和小数部分,第三步就是根据公式计算中位数。

首先使用row_number()给数据进行编号:

代码语言:javascript
复制
val windowFun = Window.orderBy(col("feature3").asc)
df.withColumn("rank",row_number().over(windowFun)).show(false)

输出如下:

接下来是确定中位数的位置,这里我们分别拿到(n + 1)/2的整数部分和小数部分:

代码语言:javascript
复制
val median_index = df.agg(
  ((count($"feature3") + 1) / 2).cast("int") as "rank",
  ((count($"feature3") + 1) / 2 %  1) as "float_part"
)

median_index.show()

输出如下:

这里小数部分不为0,意味着我们不仅要拿到rank=75的数,还要拿到rank=76的数,我们最好把其放到一行上,这里使用同样lead函数,lead函数的作用就是拿到分组排序后,下一个位置或下n个位置的数,咱们在后面的博客中还会细讲,这里也只是抛砖引玉:

代码语言:javascript
复制
val windowFun = Window.orderBy(col("feature3").asc)
df.withColumn("next_feature3",lead(col("feature3"),1).over(windowFun)).show(false)

输出如下:

接下来,join两个表,按公式计算中位数就可以啦,完整的代码如下:

代码语言:javascript
复制
val median_index = df.agg(
  ((count($"feature3") + 1) / 2).cast("int") as "rank",
  ((count($"feature3") + 1) / 2 %  1) as "float_part"
)


val windowFun = Window.orderBy(col("feature3").asc)


df.withColumn("rank",row_number().over(windowFun))
  .withColumn("next_feature3",lead(col("feature3"),1).over(windowFun))
  .join(median_index,Seq("rank"),"inner")
  .withColumn("median" ,($"float_part" - lit(0)) * $"next_feature3" + (lit(1) - $"float_part") * $"feature3")
  .show()

输出如下:

2.5 四分位数

先来复习下四分位数的两种解法,n+1方法和n-1方法:

对于n+1方法,如果数据量为n,则四分位数的位置为:

Q1的位置= (n+1) × 0.25 Q2的位置= (n+1) × 0.5 Q3的位置= (n+1) × 0.75

对于n-1方法,如果数据量为n,则四分位数的位置为:

Q1的位置=1+(n-1)x 0.25 Q2的位置=1+(n-1)x 0.5 Q3的位置=1+(n-1)x 0.75

这里的思路和求解中位数是一样的,我们分别实现一下两种方法,首先是n+1方法:

代码语言:javascript
复制
val q1_index = df.agg(
  ((count($"feature3") + 1) * 0.25).cast("int") as "rank",
  ((count($"feature3") + 1) * 0.25 %  1) as "float_part"
)


val windowFun = Window.orderBy(col("feature3").asc)


df.withColumn("rank",row_number().over(windowFun))
  .withColumn("next_feature3",lead(col("feature3"),1).over(windowFun))
  .join(q1_index,Seq("rank"),"inner")
  .withColumn("q1" ,($"float_part" - lit(0)) * $"next_feature3" + (lit(1) - $"float_part") * $"feature3")
  .show()

输出为:

接下来是n-1方法:

代码语言:javascript
复制
val q1_index = df.agg(
  ((count($"feature3") - 1) * 0.25).cast("int") + 1 as "rank",
  ((count($"feature3") - 1) * 0.25 %  1) as "float_part"
)


val windowFun = Window.orderBy(col("feature3").asc)


df.withColumn("rank",row_number().over(windowFun))
  .withColumn("next_feature3",lead(col("feature3"),1).over(windowFun))
  .join(q1_index,Seq("rank"),"inner")
  .withColumn("q1" ,($"float_part" - lit(0)) * $"next_feature3" + (lit(1) - $"float_part") * $"feature3")
  .show()

输出为:

3、踩坑总结

在计算中位数或者四分位数时,我一开始的写法如下:

很奇怪的一点是,$"float_part" - 0没有报错,1 - $"float_part"却报错了,报的错误是:

看这里大家应该明白了,$"float_part" - 0中,减号左右两边的数据都应该是列名,与$"float_part" 类型相同,但是1 - $"float_part"两边都应该是个数字,与1的类型相同,所以后面一个报错了。

因此修改的方法是:

使用lit方法创建了一个全为0或者全为1的列,使得减号左右两边类型匹配。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小小挖掘机 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、数据导入
  • 2、使用Spark SQL计算统计值
    • 2.1 最大值、最小值
      • 2.2 平均值
        • 2.3 样本标准差&总体标准差
          • 2.4 中位数
            • 2.5 四分位数
            • 3、踩坑总结
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档