教程地址:http://www.showmeai.tech/tutorials/84
本文地址:http://www.showmeai.tech/article-detail/175
声明:版权所有,转载请联系平台与作者并注明出处
在高版本的Spark中,我们可以使用Dataframe这个结构形态更方便快捷地对数据进行处理,而且它也和我们熟悉的python pandas Dataframe的很多操作可以类比关联。
DataFrame是一个以命名列方式组织的分布式数据集。在概念上,它跟关系型数据库中的一张表或者1个Python(或者R)中的data frame一样,但是进行了一些优化。DataFrame可以根据结构化的数据文件、hive表、外部数据库或者已经存在的RDD构造。
根据官方文档的解释,我们可以发现 Spark DataFrame 有以下几个核心点:
DataFrame API 是在 R 和 Python Pandas Dataframe 灵感之上设计的,具有以下功能特性:
简单来说,DataFrame 能够更方便的操作数据集,而且因为其底层是通过 Spark SQL 的 Catalyst优化器生成优化后的执行代码,所以其执行速度会更快。
Spark SQL,DataFrame,datasets 共用 Spark SQL 库,三者共享同样的代码优化、生成以及执行流程,所以 SQL,DataFrame,datasets 的入口都是 SQLContext。
以python代码(pyspark)为例,我们在创建spark Dataframe之前,需要先初试化Sparksession。
基于sparksession对象我们可以通过read函数对不同类型的文本形态数据进行加载(比如下图演示的是json格式)
当然,我们也可以通过RDD初始化spark Dataframe,参考代码如下图所示:
我们也可以直接从csv文件加载数据,如下图参考代码所示:
构建完成的spark Dataframe可以通过printSchema查看Dataframe的结构形态,如下参考代码所示:
DataFrame的操作API汇总如下图所示:
可以通过agg操作对spark Dataframe的数据进行聚合统计。
Alias操作主要是对spark Dataframe的字段进行重命名操作。
cache用于对数据持久化,对应操作下的数据,不会在spark计算过程中反复计算。
collect操作会把数据直接把数据取回内存,以python列表形态返回。
可以通过columns操作获取字段名称列表。
对于数据的统计计算,比如相关性可以通过corr操作完成。
可以通过count操作完成Dataframe数据的计数统计。
我们通过describe函数可以查看Dataframe数据的基本统计信息。
如果要对Dataframe数据进行虑重操作,可以使用distinct算子操作。
删除数据或者字段都可以通过drop算子完成。
dropna可以帮助我们剔除掉数据中的缺失值记录或者字段。
我们可以通过fillna来填充Dataframe缺失值。
我们可以通过filter操作对spark Dataframe的数据进行条件过滤。
first可以取出spark Dataframe的第1条数据记录并返回。
Spark Dataframe中的flatmap和RDD中的操作类似,也可以帮助我们把数据变换并平铺返回。
可以通过head操作返回前n条数据记录。
对于Spark Dataframe大数据的分组可以通过groupby完成
我们通过Join操作对Spark Dataframe的不同数据表进行连接聚合。
可以通过orderby对spark Dataframe数据进行排序操作。
除了使用DataFrame API数据,还可以注册成table,通过SQL对数据进行操作。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。