专栏首页腾讯智能钛AI开发者【技术分享】Spark DataFrame入门手册
原创

【技术分享】Spark DataFrame入门手册

本文原作者:赖博先,经授权后发布。

一、简介

Spark SQL是spark主要组成模块之一,其主要作用与结构化数据,与hadoop生态中的hive是对标的。而DataFrame是spark SQL的一种编程抽象,提供更加便捷同时类同与SQL查询语句的API,让熟悉hive的数据分析工程师能够非常快速上手。

    DataFrame是一种以命名列的方式组织的分布式数据集,可以类比于hive中的表。但是比hive表更加灵活的是,你可以使用各种数据源来构建一个DataFrame,如:结构化数据文件(例如json数据)、hive表格、外部数据库,还可以直接从已有的RDD变换得来。后面会把相关方法、接口跟大家一一道来。

二、初步使用

大家学习一门语言可能都是从“hello word!”开始的,这主要目的是让学习者熟悉程序运行的环境,同时亲身感受程序运行过程。这里我们也会从环境到运行的步骤进行讲解。

导入spark运行环境相关的类

所有spark相关的操作都是以sparkContext类作为入口,而Spark SQL相关的所有功能都是以SQLContext类作为入口。下面的语句是新建入口类的对象。最下面的语句是引入隐式转换,隐式的将RDD转换为DataFrame。

下面就是从tdw表中读取对应的表格数据,然后就可以使用DataFrame的API来操作数据表格,其中TDWSQLProvider是数平提供的spark tookit,可以在KM上找到这些API的详细用法。

这段代码的意思是从tdw 表中读取对应分区的数据,select出表格中对应的字段(这里面的字段名字就是表格字段名字,需要用双引号)toDF将筛选出来的字段转换成DataFrame,在进行groupBy操作,这里的groupBy操作跟TDW hive操作是一样的意思,对指定字段进行分组操作,count函数用来计数计数,这里得到的DataFrame最后有一个”count”命名的字段保存每个分组的个数(这里特别需要注意函数的返回类型,groupby函数返回的并不是dataframe类型的数据,后面会提到)。接下来的printSchema函数是打印出edge的视图,可以理解成tdw idea里面的show DDL,Show函数是打印出这个DataFrame前20行数据(默认),当然可以指定行数打印。

从上面的例子中可以看出,DataFrame基本把SQL函数给实现了,在hive中用到的很多操作(如:select、groupBy、count、join等等)可以使用同样的编程习惯写出spark程序,这对于没有函数式编程经验的同学来说绝对福利。

三、函数说明及其用法

函数式编程是spark编程的最大特点,而函数则是函数式编程的最小操作单元,这边主要列举DataFrame常用函数以及主要用法:

Action 操作

特别注意每个函数的返回类型

1、 collect() ,返回值是一个数组,返回dataframe集合所有的行

2、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行

3、 count() 返回一个number类型的,返回dataframe集合的行数

4、 describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用逗号分隔,如果有字段为空,那么不参与运算,只这对数值类型的字段。例如df.describe("age", "height").show()

5、 first() 返回第一行 ,类型是row类型

6、 head() 返回第一行 ,类型是row类型

7、 head(n:Int)返回n行 ,类型是row 类型

8、 show()返回dataframe集合的值 默认是20行,返回类型是unit

9、 show(n:Int)返回n行,,返回值类型是unit

10、 table(n:Int) 返回n行 ,类型是row 类型

DataFrame的基本操作

1、 cache()同步数据的内存

2、 columns 返回一个string类型的数组,返回值是所有列的名字

3、 dtypes返回一个string类型的二维数组,返回值是所有列的名字以及类型

4、 explan()打印执行计划

5、 explain(n:Boolean) 输入值为 false 或者true ,返回值是unit  默认是false ,如果输入true 将会打印 逻辑的和物理的

6、 isLocal 返回值是Boolean类型,如果允许模式是local返回true 否则返回false

7、 persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型

8、 printSchema() 打印出字段名称和类型 按照树状结构来打印

9、 registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了

10、 schema 返回structType 类型,将字段名称和类型按照结构体类型返回

11、 toDF()返回一个新的dataframe类型的

12、 toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,

13、 unpersist() 返回dataframe.this.type 类型,去除模式中的数据

14、 unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD

聚合函数:

1、 agg(expers:column*) 返回dataframe类型 ,同数学计算求值

    df.agg(max("age"), avg("salary"))

    df.groupBy().agg(max("age"), avg("salary"))

2、 agg(exprs: Map[String, String])  返回dataframe类型 ,同数学计算求值 map类型的

    df.agg(Map("age" -> "max", "salary" -> "avg"))

    df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))

3、 agg(aggExpr: (String, String), aggExprs: (String, String)*)  返回dataframe类型 ,同数学计算求值

    df.agg(Map("age" -> "max", "salary" -> "avg"))

    df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))

4、 apply(colName: String) 返回column类型,捕获输入进去列的对象

5、 as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名

6、 col(colName: String)  返回column类型,捕获输入进去列的对象

7、 cube(col1: String, cols: String*) 返回一个GroupedData类型,根据某些字段来汇总

8、 distinct 去重 返回一个dataframe类型

9、 drop(col: Column) 删除某列 返回dataframe类型

10、 dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe

11、 except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的;这个操作非常有用呀

12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe类型,这个 将一个字段进行更多行的拆分

df.explode("name","names") {name :String=> name.split(" ")}.show();

将name字段根据空格来拆分,拆分的字段放在names里面

13、 filter(conditionExpr: String): 刷选部分数据,返回dataframe类型

      df.filter("age>10").show(); 

      df.filter(df("age")>10).show(); 

      df.where(df("age")>10).show(); 都可以

14、 groupBy(col1: String, cols: String*) 根据某写字段来汇总返回groupedate类型

      df.groupBy("age").agg(Map("age"->"count")).show();

      df.groupBy("age").avg().show();都可以

这里如果要把groupBy之后的结果转换成一个Dataframe需要另一个函数转换一下,比如 count

15、 intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素

16、 join(right: DataFrame, joinExprs: Column, joinType: String)

一个是关联的dataframe,第二个关联的条件,第三个关联的类型:inner, outer, left_outer, right_outer, leftsemi

df.join(ds,df("name")===ds("name")

and  df("age")===ds("age"),"outer").show();

17、 limit(n: Int) 返回dataframe类型 去n 条数据出来

18、 na: DataFrameNaFunctions ,可以调用dataframenafunctions的功能区做过滤df.na.drop().show(); 删除为空的行

19、 orderBy(sortExprs: Column*) 做alise排序,还可以指定进行降序排序desc

20、 select(cols:string*) dataframe 做字段的刷选

     df.select($"colA", $"colB" + 1)

这里面select有两种类型的参数,一种是上面的string类型,就是前面没有$符号,如果加了$标识这是一个column类型。使用这种类型需要加import sqlContext.implicits._ (这些是从身边spark大神xuehao同学那里学到的)这些细节真的从实践中来,所以大家赶紧收藏!

21、selectExpr(exprs: String*) 做字段的刷选

df.selectExpr("name","name as names","upper(name)","age+1").show();

22、 sort(sortExprs: Column*) 排序

    df.sort(df("age").desc).show(); 默认是asc;这里的写法可以有很多几种。

23、 unionAll(other:Dataframe) 合并 df.unionAll(ds).show();

24、 withColumnRenamed(existingName: String, newName: String)

修改列表 df.withColumnRenamed("name","names").show();

25、 withColumn(colName: String, col: Column) 增加一列

df.withColumn("aa",df("name")).show();

具体例子:

产看表格数据和表格视图

获取指定列并对齐进行操作

这里注意,这里的$”field”表示类型是column

根据条件进行过滤

首先是filter函数,这个跟RDD的是类同的,根据条件进行逐行过滤。现在的filter函数支持两种类型的参数,如下:一种是string类型,上图所示,运算符是在字符串里面的,还有一种是column类型也就是带$,注意运算符是在外面的。

另外一个where函数,类似,看图不赘述;

指定行或者多行进行排序排序

Sort和orderBY都可以达到排序的效果,可以指定根据一行或者多行进行排序,默认是升序,如果要使用降序进行排序,请使用column类型;

分组操作

分组聚合是在数据分析中最长用到的操作之一,比如上图所示,需要对某个字段进行分组求和、求平均、求最大最小等,可以直接使用groupBy函数,比SQL语句更类似于自然语言。这里还是那句话,得注意每个函数的返回类型。

Join操作

Join操作可以支持TDW sql涉及到的连接操作,格式也非常固定。

这里就先讲到这里,其实这里介绍的只是spark DataFrame最基础的一些函数,官方还提供了非常高级的API,比如bloomFilter、corr等等,同学们如果掌握了上面的内容,其他高级的可以查看官网提供的API介绍:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【产品新闻】2019可信云大会丨腾讯云AI平台专家产品经理余祖坤:AI建模平台的设计实践

    7月2日,2019可信云大会在北京国际会议中心隆重开幕。2019可信云大会以“智能云网边,可信创未来”为主题,由中国信息通信研究院主办。

    腾讯智能钛AI开发者
  • 【技术分享】机器学习之数据清洗与特征提取

    导语:本文详细的解释了机器学习中,经常会用到数据清洗与特征提取的方法PCA,从理论、数据、代码三个层次予以分析。

    腾讯智能钛AI开发者
  • 【技术沙龙】智能钛机器学习平台TI-ONE在工业和金融行业的落地

    凭借多年的技术积累和完善的产品矩阵,腾讯云AI在计算机视觉、自然语言处理和智能语音方面全面布局,持续推动人工智能技术在各产业领域的应用落地。

    腾讯智能钛AI开发者
  • 【技术分享】从Tensorflow源码中学习设计模式

    通常,我们在使用Tensorflow低级API编程时(非Eager模式), 一般有下面三个步骤:

    腾讯智能钛AI开发者
  • 【技术分享】机器学习之回归原理详述(一)

    导语:本文用了从数学层面和代码层面,再结合一些通俗易懂的例子,详细地描述了回归主要涉及的原理和知识,希望对于机器学习的初学者或者有兴趣研究模型具体实现的同学带来...

    腾讯智能钛AI开发者
  • 【技术分享】特征工程方法综述

    随着我们底层特征库中特征数目的不断增长,如何组合特征,如何针对不同场景选择适合的特征,如何评估特征优劣?这些问题已经日益凸显,所以这次想梳理现有的特征工程方法,...

    腾讯智能钛AI开发者
  • 【技术分享】 Youtube 短视频推荐系统变迁:从机器学习到深度学习

    Youtube是全球最大的视频分享平台,用户量高达10亿+,每天上传的UGC和PGC都是百万级别。那么问题就来了,他们是如何让用户在这么多的视频中快速的发现自己...

    腾讯智能钛AI开发者
  • 【产品动态】关于2019年8月19日智能钛机器学习平台系统升级通知

           您好!为了给您提供更好的服务,我们将于2019年8月19日升级智能钛机器学习平台系统,预计维护时间为19:00至22:00。期间平台暂时无法使用,...

    腾讯智能钛AI开发者
  • 【技术分享】机器学习知识体系

    导语:高中的时候,班主任让我们每学完一个章节,整理出这个章节的关键词和一份问题列表。现在回想起来,其实是很有用的,这让我们可以从另外一个视角来审视所学习的内容,...

    腾讯智能钛AI开发者

扫码关注云+社区

领取腾讯云代金券