前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【技术分享】Spark DataFrame入门手册

【技术分享】Spark DataFrame入门手册

原创
作者头像
腾讯云TI平台
发布2019-08-06 10:41:23
4.7K0
发布2019-08-06 10:41:23
举报
文章被收录于专栏:腾讯云TI平台腾讯云TI平台

本文原作者:赖博先,经授权后发布。

一、简介

Spark SQL是spark主要组成模块之一,其主要作用与结构化数据,与hadoop生态中的hive是对标的。而DataFrame是spark SQL的一种编程抽象,提供更加便捷同时类同与SQL查询语句的API,让熟悉hive的数据分析工程师能够非常快速上手。

    DataFrame是一种以命名列的方式组织的分布式数据集,可以类比于hive中的表。但是比hive表更加灵活的是,你可以使用各种数据源来构建一个DataFrame,如:结构化数据文件(例如json数据)、hive表格、外部数据库,还可以直接从已有的RDD变换得来。后面会把相关方法、接口跟大家一一道来。

二、初步使用

大家学习一门语言可能都是从“hello word!”开始的,这主要目的是让学习者熟悉程序运行的环境,同时亲身感受程序运行过程。这里我们也会从环境到运行的步骤进行讲解。

导入spark运行环境相关的类

所有spark相关的操作都是以sparkContext类作为入口,而Spark SQL相关的所有功能都是以SQLContext类作为入口。下面的语句是新建入口类的对象。最下面的语句是引入隐式转换,隐式的将RDD转换为DataFrame。

下面就是从tdw表中读取对应的表格数据,然后就可以使用DataFrame的API来操作数据表格,其中TDWSQLProvider是数平提供的spark tookit,可以在KM上找到这些API的详细用法。

这段代码的意思是从tdw 表中读取对应分区的数据,select出表格中对应的字段(这里面的字段名字就是表格字段名字,需要用双引号)toDF将筛选出来的字段转换成DataFrame,在进行groupBy操作,这里的groupBy操作跟TDW hive操作是一样的意思,对指定字段进行分组操作,count函数用来计数计数,这里得到的DataFrame最后有一个”count”命名的字段保存每个分组的个数(这里特别需要注意函数的返回类型,groupby函数返回的并不是dataframe类型的数据,后面会提到)。接下来的printSchema函数是打印出edge的视图,可以理解成tdw idea里面的show DDL,Show函数是打印出这个DataFrame前20行数据(默认),当然可以指定行数打印。

从上面的例子中可以看出,DataFrame基本把SQL函数给实现了,在hive中用到的很多操作(如:select、groupBy、count、join等等)可以使用同样的编程习惯写出spark程序,这对于没有函数式编程经验的同学来说绝对福利。

三、函数说明及其用法

函数式编程是spark编程的最大特点,而函数则是函数式编程的最小操作单元,这边主要列举DataFrame常用函数以及主要用法:

Action 操作

特别注意每个函数的返回类型

1、 collect() ,返回值是一个数组,返回dataframe集合所有的行

2、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行

3、 count() 返回一个number类型的,返回dataframe集合的行数

4、 describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用逗号分隔,如果有字段为空,那么不参与运算,只这对数值类型的字段。例如df.describe("age", "height").show()

5、 first() 返回第一行 ,类型是row类型

6、 head() 返回第一行 ,类型是row类型

7、 head(n:Int)返回n行 ,类型是row 类型

8、 show()返回dataframe集合的值 默认是20行,返回类型是unit

9、 show(n:Int)返回n行,,返回值类型是unit

10、 table(n:Int) 返回n行 ,类型是row 类型

DataFrame的基本操作

1、 cache()同步数据的内存

2、 columns 返回一个string类型的数组,返回值是所有列的名字

3、 dtypes返回一个string类型的二维数组,返回值是所有列的名字以及类型

4、 explan()打印执行计划

5、 explain(n:Boolean) 输入值为 false 或者true ,返回值是unit  默认是false ,如果输入true 将会打印 逻辑的和物理的

6、 isLocal 返回值是Boolean类型,如果允许模式是local返回true 否则返回false

7、 persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型

8、 printSchema() 打印出字段名称和类型 按照树状结构来打印

9、 registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了

10、 schema 返回structType 类型,将字段名称和类型按照结构体类型返回

11、 toDF()返回一个新的dataframe类型的

12、 toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,

13、 unpersist() 返回dataframe.this.type 类型,去除模式中的数据

14、 unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD

聚合函数:

1、 agg(expers:column*) 返回dataframe类型 ,同数学计算求值

    df.agg(max("age"), avg("salary"))

    df.groupBy().agg(max("age"), avg("salary"))

2、 agg(exprs: Map[String, String])  返回dataframe类型 ,同数学计算求值 map类型的

    df.agg(Map("age" -> "max", "salary" -> "avg"))

    df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))

3、 agg(aggExpr: (String, String), aggExprs: (String, String)*)  返回dataframe类型 ,同数学计算求值

    df.agg(Map("age" -> "max", "salary" -> "avg"))

    df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))

4、 apply(colName: String) 返回column类型,捕获输入进去列的对象

5、 as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名

6、 col(colName: String)  返回column类型,捕获输入进去列的对象

7、 cube(col1: String, cols: String*) 返回一个GroupedData类型,根据某些字段来汇总

8、 distinct 去重 返回一个dataframe类型

9、 drop(col: Column) 删除某列 返回dataframe类型

10、 dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe

11、 except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的;这个操作非常有用呀

12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe类型,这个 将一个字段进行更多行的拆分

df.explode("name","names") {name :String=> name.split(" ")}.show();

将name字段根据空格来拆分,拆分的字段放在names里面

13、 filter(conditionExpr: String): 刷选部分数据,返回dataframe类型

      df.filter("age>10").show(); 

      df.filter(df("age")>10).show(); 

      df.where(df("age")>10).show(); 都可以

14、 groupBy(col1: String, cols: String*) 根据某写字段来汇总返回groupedate类型

      df.groupBy("age").agg(Map("age"->"count")).show();

      df.groupBy("age").avg().show();都可以

这里如果要把groupBy之后的结果转换成一个Dataframe需要另一个函数转换一下,比如 count

15、 intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素

16、 join(right: DataFrame, joinExprs: Column, joinType: String)

一个是关联的dataframe,第二个关联的条件,第三个关联的类型:inner, outer, left_outer, right_outer, leftsemi

df.join(ds,df("name")===ds("name")

and  df("age")===ds("age"),"outer").show();

17、 limit(n: Int) 返回dataframe类型 去n 条数据出来

18、 na: DataFrameNaFunctions ,可以调用dataframenafunctions的功能区做过滤df.na.drop().show(); 删除为空的行

19、 orderBy(sortExprs: Column*) 做alise排序,还可以指定进行降序排序desc

20、 select(cols:string*) dataframe 做字段的刷选

     df.select($"colA", $"colB" + 1)

这里面select有两种类型的参数,一种是上面的string类型,就是前面没有$符号,如果加了$标识这是一个column类型。使用这种类型需要加import sqlContext.implicits._ (这些是从身边spark大神xuehao同学那里学到的)这些细节真的从实践中来,所以大家赶紧收藏!

21、selectExpr(exprs: String*) 做字段的刷选

df.selectExpr("name","name as names","upper(name)","age+1").show();

22、 sort(sortExprs: Column*) 排序

    df.sort(df("age").desc).show(); 默认是asc;这里的写法可以有很多几种。

23、 unionAll(other:Dataframe) 合并 df.unionAll(ds).show();

24、 withColumnRenamed(existingName: String, newName: String)

修改列表 df.withColumnRenamed("name","names").show();

25、 withColumn(colName: String, col: Column) 增加一列

df.withColumn("aa",df("name")).show();

具体例子:

产看表格数据和表格视图

获取指定列并对齐进行操作

这里注意,这里的$”field”表示类型是column

根据条件进行过滤

首先是filter函数,这个跟RDD的是类同的,根据条件进行逐行过滤。现在的filter函数支持两种类型的参数,如下:一种是string类型,上图所示,运算符是在字符串里面的,还有一种是column类型也就是带$,注意运算符是在外面的。

另外一个where函数,类似,看图不赘述;

指定行或者多行进行排序排序

Sort和orderBY都可以达到排序的效果,可以指定根据一行或者多行进行排序,默认是升序,如果要使用降序进行排序,请使用column类型;

分组操作

分组聚合是在数据分析中最长用到的操作之一,比如上图所示,需要对某个字段进行分组求和、求平均、求最大最小等,可以直接使用groupBy函数,比SQL语句更类似于自然语言。这里还是那句话,得注意每个函数的返回类型。

Join操作

Join操作可以支持TDW sql涉及到的连接操作,格式也非常固定。

这里就先讲到这里,其实这里介绍的只是spark DataFrame最基础的一些函数,官方还提供了非常高级的API,比如bloomFilter、corr等等,同学们如果掌握了上面的内容,其他高级的可以查看官网提供的API介绍:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、简介
  • 二、初步使用
  • 三、函数说明及其用法
    • Action 操作
      • DataFrame的基本操作
        • 聚合函数:
          • 具体例子:
          相关产品与服务
          腾讯云 TI 平台
          腾讯云 TI 平台(TencentCloud TI Platform)是基于腾讯先进 AI 能力和多年技术经验,面向开发者、政企提供的全栈式人工智能开发服务平台,致力于打通包含从数据获取、数据处理、算法构建、模型训练、模型评估、模型部署、到 AI 应用开发的产业 + AI 落地全流程链路,帮助用户快速创建和部署 AI 应用,管理全周期 AI 解决方案,从而助力政企单位加速数字化转型并促进 AI 行业生态共建。腾讯云 TI 平台系列产品支持公有云访问、私有化部署以及专属云部署。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档