专栏首页麒思妙想使用Spark轻松做数据透视(Pivot)

使用Spark轻松做数据透视(Pivot)

spark从1.6开始引入,到现在2.4版本,pivot算子有了进一步增强,这使得后续无论是交给pandas继续做处理,还是交给R继续分析,都简化了不少。大家无论在使用pandas、numpy或是R的时候,首先会做的就是处理数据,尤其是将列表,转成成合适的形状。

列表

在说透视表之前,我们先看看,什么是列表,在传统观念上,列表的每一行代表一条记录,而每一列代表一个属性。

+-------+-------+-----+

| date|project|value|

+-------+-------+-----+

|2018-01| p1| 100|

|2018-01| p2| 200|

|2018-01| p3| 300|

|2018-02| p1| 1000|

|2018-02| p2| 2000|

|2018-03| px| 999|

+-------+-------+-----+

举个简单的例子,如上表,一条记录可能代表某个项目,在某个年月创造的价值。而在这个表里面,某一列,就代表一个属性,比如date代表日期,project代表项目名称。而这里每一行,代表一条独立,完整的记录,一条与另外一条记录,没有直接的关系。

这种结构,也是一般关系型数据库的数据结构。

透视表

透视表没有一个明确的定义,一般是观念上是指,为了方便进行数据分析,而对数据进行一定的重排,方便后续分析,计算等操作。透视表每一个元素及其对应的“坐标”一起形成一条完整的记录。

+-------+------+------+-----+-----+

| date| p1| p2| p3| px|

+-------+------+------+-----+-----+

|2018-01| 100.0| 200.0|300.0| 0.0|

|2018-02|1000.0|2000.0| 0.0| 0.0|

|2018-03| 0.0| 0.0| 0.0|999.0|

+-------+------+------+-----+-----+

上面的表,是将列表进行重排后的透视表,其第一行和第一列可以理解成索引,而在表中根据索引可以确定一条唯一的值,他们一起组成一条相当于列表里的数据。

通过一般的定义,我们能看出,透视表主要用于分析,所以,一般的场景我们都会先对数据进行聚合,以后再对数据分析,这样也更有意义。就好像,将话费清单,做成透视表,尽管逻辑上没有任何问题,但是结果是可能比现在的清单列表更难查阅。

PS:一些可以借鉴的名词,目前维基百科并没有收录,也只能权且理解一下吧

建模拟数据

先来模拟个数据吧,按照前面的例子,建个csv,这里多加了一列s2,是为了做多透视列的,

date,project,value,s2
2018-01,p1,100,12
2018-01,p2,200,33
2018-01,p3,300,44
2018-02,p1,1000,22
2018-02,p2,2000,41
2018-03,px,999,22

spark API

我们先来看下DEMO程序

SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
SparkContext sc = SparkContext.getOrCreate(sparkConf);
SparkSession ss = new SparkSession(sc);
Dataset<Row> ds = ss.read()
         //csv分隔符 
        .option("sep", ",")
         //是否包含header
        .option("header", "true")
        //加载csv路径
        .csv("E:\\devlop\\workspace\\sparkdemo\\src\\main\\java\\com\\dafei1288\\spark\\data1.csv");
Dataset<Row>  r = 
        //设置分组
        ds.groupBy(col("date"))
        //设置pivot
        .pivot("project")
        //设置聚合
        .agg(sum("value"));
r.show();

在加载csv的时候,我们设置了分隔符,以及读取表头。

对加载后的dataset只需要进行3步设置

  1. groupBy 设置分组列
  2. pivot 设置pivot列
  3. agg 设置聚合方式,可以是求和、平均等聚合函数

我们得到的输出结果如下:

+-------+------+------+-----+-----+

| date| p1| p2| p3| px|

+-------+------+------+-----+-----+

|2018-03| null| null| null|999.0|

|2018-02|1000.0|2000.0| null| null|

|2018-01| 100.0| 200.0|300.0| null|

+-------+------+------+-----+-----+

请注意,这里和sql有些区别,就是groupBy的时候,不需要将project列写入了,如果写入成了

groupBy(col("date"),col("project"))

那么结果就是这样了

+-------+-------+------+------+-----+-----+

| date|project| p1| p2| p3| px|

+-------+-------+------+------+-----+-----+

|2018-01| p3| null| null|300.0| null|

|2018-01| p2| null| 200.0| null| null|

|2018-01| p1| 100.0| null| null| null|

|2018-03| px| null| null| null|999.0|

|2018-02| p1|1000.0| null| null| null|

|2018-02| p2| null|2000.0| null| null|

+-------+-------+------+------+-----+-----+

sparkSQL

SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
SparkContext sc = SparkContext.getOrCreate(sparkConf);
SparkSession ss = new SparkSession(sc);
Dataset<Row> ds = ss.read() .option("sep", ",")
        .option("header", "true").csv("E:\\devlop\\workspace\\sparkdemo\\src\\main\\java\\com\\dafei1288\\spark\\data1.csv");
ds.registerTempTable("f");
Dataset<Row>  r = ds.sqlContext().sql(
 "select * from (
    select date,project as p,sum(value) as ss from f group by date,project
   )
  pivot (  
      sum(ss) 
      for p in ( 'p1','p2','p3','px' )  
   ) 
   order by date");
r.na().fill(0).show();

可以看到,这里我们将读取的csv注册成了表f,使用spark sql语句,这里和oracle的透视语句类似

pivot语法: pivot( 聚合列 for 待转换列 in (列值) )

其语法还是比较简单的。

为了展示数据好看一点,我特意使用语句

r.na().fill(0)

将空值`null`替换成了0。

+-------+------+------+-----+-----+

| date| p1| p2| p3| px|

+-------+------+------+-----+-----+

|2018-01| 100.0| 200.0|300.0| 0.0|

|2018-02|1000.0|2000.0| 0.0| 0.0|

|2018-03| 0.0| 0.0| 0.0|999.0|

+-------+------+------+-----+-----+

多聚合列

上文提到了,多做了一列,就是为了这个DEMO准备的,使用如下SparkSQL语句,设置多聚合列透视表

select * from (
    select date,project as p,sum(value) as ss,sum(s2) as ss2 from f group by date,project
)
pivot (  
      sum(ss),sum(ss2)  
     for p in ( 'p1','p2','p3','px' ) 
) 
order by date

这里为例方便看,我就截图了

为了防止OOM的情况,spark对pivot的数据量进行了限制,其可以通过spark.sql.pivotMaxValues 来进行修改,默认值为10000,这里是指piovt后的列数。

好了,关于spark pivot就介绍到这了,其实这里与矩阵的行列转换类似,pivot对应的也有unpivot,下次我们再聊。

参考资料:

https://stackoverflow.com/questions/30244910/how-to-pivot-dataframe

https://databricks.com/session/pivoting-data-with-sparksql

本文分享自微信公众号 - 麒思妙想(qicai1612),作者:dafei1288

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-02-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 基于REST的数据处理流程_开发环境踩坑

    昨天捋了捋思路,今天着手开始准备构建基础工程,spring boot是我一直比较偏爱的,现在在国内开发领域也越发的火了起来。另外准备试试kotlin,之...

    麒思妙想
  • JDK10居然都发布了

    今天突然发现JDK10居然都发布了,真的感觉JDK9发布没几天啊,下面是介绍具体内容的链接,我就不复制了....

    麒思妙想
  • 试水Jib

    之前一直沉迷于vagrant之中,其对于环境部署的友好,谁用谁知道,但是在最近的开发过程当中,越发的发现vagrant对开发人员的友好,而对于后续的...

    麒思妙想
  • CSS判断不同分辨率显示不同宽度布局CSS3技术支持IE6到IE8

    CSS判断不同分辨率浏览器(显示屏幕)显示不同宽度布局CSS3技术支持IE6到IE8。将用到css3 @media样式进行判断,但IE9以下版本不支持CSS3技...

    庞小明
  • 使用jatoolsPrinter实现套打

    最近在工作中遇到了套打的需求,前前后后,花了不少时间,现在总结一下套打的实现方式。

    week
  • 并发编程5:Java 阻塞队列源码分析(下)

    上一篇 并发编程4:Java 阻塞队列源码分析(上) 我们了解了 ArrayBlockingQueue, LinkedBlockingQueue 和 Prio...

    张拭心 shixinzhang
  • CCLabelAtlas的宽度为奇数时的显示bug

    遇到一个很郁闷的bug,CCLabelAtlas设置文字内容在ipad上和android上正常,就只有iphone怎么显示都不正常。后来把它宽度 + 1,然后就...

    meteoric
  • ACM 计算几何 个人模板

    owent
  • Android多线程:手把手教你使用IntentService(含实例讲解)

    步骤1:定义 IntentService的子类,需复写onHandleIntent()方法 步骤2:在Manifest.xml中注册服务 步骤3:在Acti...

    Carson.Ho
  • ELK-logstash-6.3.2部署

      Logstash 是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,还有强大的插件功能,常用于日志处理。

    踏歌行

扫码关注云+社区

领取腾讯云代金券