
慕课网 Spark SQL Log Analysis - 5. DataFrame & Dataset


5. DataFrame & Dataset

1. Background of DataFrame

DataFrame was not invented by Spark SQL; it existed much earlier in Python, R, and Pandas.

From its inception, one of Spark's goals has been to give the big data ecosystem a simple, easy-to-use API based on general-purpose languages.

1. To program against Spark RDDs, you must first learn Java, Scala, or Python, which is a relatively high barrier to entry.
2. DataFrames in R and similar languages only support single-machine processing; as Spark grew, it needed to reach a wider audience able to use Spark for distributed processing.

2. DataFrame Overview

A Dataset is a distributed collection of data. A DataFrame is a Dataset organized into named columns (an RDD with a schema): a distributed dataset organized by columns (column name, column type, column value), with each column given its own name.

It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
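
As a quick illustration, here is a minimal sketch (assuming a SparkSession named spark, e.g. in spark-shell) showing that a DataFrame carries named, typed columns:

import spark.implicits._

// A local collection becomes a distributed dataset organized into named, typed columns.
val df = Seq(("Michael", 29), ("Andy", 30)).toDF("name", "age")
df.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)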


3. DataFrame vs. RDD

RDD: a distributed collection that can be processed in parallel. java/scala ==> JVM; python ==> Python runtime

DataFrame: also a distributed dataset, but more like a table in a traditional database. Besides the data itself, it knows the column names, column values, and column types, and it also supports some complex data structures. java/scala/python ==> logical plan

From a usability standpoint, the learning curve for DataFrame is lower. Since R and Python already have DataFrames, development with them is very convenient.
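
To make the "logical plan" point concrete, here is a minimal sketch (assuming a SparkSession named spark): an RDD is an opaque collection of JVM objects, while the DataFrame built from it carries a schema and compiles every operation into a plan the Catalyst optimizer can rewrite:

import spark.implicits._

val rdd = spark.sparkContext.parallelize(Seq(("Andy", 30), ("Justin", 19)))

// Attaching column names turns the opaque tuples into a schema-aware DataFrame.
val df = rdd.toDF("name", "age")

// explain(true) prints the parsed/analyzed/optimized logical plans and the physical plan.
df.filter($"age" > 19).select("name").explain(true)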


4. Basic DataFrame API Operations


Let's look at the source code of the load method:

/**
 * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// The return value is a DataFrame
def load(path: String): DataFrame = {
  option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}
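
A small usage sketch (the file paths below are illustrative, taken from the Spark examples directory): when no format is specified, load() falls back to the default data source, which is parquet unless spark.sql.sources.default is changed:

val usersDF = spark.read.load("examples/src/main/resources/users.parquet") // default format: parquet
val jsonDF = spark.read.format("json").load("examples/src/main/resources/people.json")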
A complete example of the basic DataFrame API operations:
package com.gwf.spark

import org.apache.spark.sql.SparkSession

object DataFrameApp {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("DataFrameApp").master("local[2]").getOrCreate()

    // Load the JSON file into a DataFrame
    val peopleDF = spark.read.format("json").load("file:///Users/gaowenfeng/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")

    // Print the schema of the DataFrame
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)

    // Show the first 20 records of the dataset
    peopleDF.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    // Select every value of one column: select name from table
    peopleDF.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+

    // Select several columns and compute on one of them: select name, age+10 as age2 from table
    peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()
    // +-------+----+
    // |   name|age2|
    // +-------+----+
    // |Michael|null|
    // |   Andy|  40|
    // | Justin|  29|
    // +-------+----+

    // Filter rows by a column value: select * from table where age > 19
    peopleDF.filter(peopleDF.col("age") > 19).show()
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+

    // Group by a column and aggregate: select age, count(1) from table group by age
    peopleDF.groupBy("age").count().show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+

    spark.stop()
  }

}

5. Interoperating Between DataFrame and RDD


1. The reflection-based approach

Prerequisite: you need to know your fields and their types in advance.

package com.gwf.spark

import org.apache.spark.sql.SparkSession

/**
 * Interoperation between DataFrame and RDD
 */
object DataFrameRDDAPP {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameRDDAPP").master("local[2]").getOrCreate()

    val rdd = spark.sparkContext.textFile("file:///Users/gaowenfeng/project/idea/MySparkSqlProject/src/main/resources/infos.txt")

    // The implicit conversions are required for toDF()
    import spark.implicits._
    val infoDf = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()

    infoDf.printSchema()

    infoDf.filter(infoDf.col("age") > 30).show()

    // Creates a local temporary view using the given name. The lifetime of this
    // temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
    infoDf.createOrReplaceTempView("infos")

    spark.sql("select * from infos where age > 30").show()

    spark.stop()
  }

  case class Info(id: Int, name: String, age: Int)

}
2. The programmatic approach

Use this when the first approach cannot meet your needs (i.e. the schema is not known in advance).

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val rdd = spark.sparkContext.textFile("file:///Users/gaowenfeng/project/idea/MySparkSqlProject/src/main/resources/infos.txt")

// 1. Create an RDD of Rows from the original RDD
val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

// 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1
val structType = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))

// 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession
val infoDF = spark.createDataFrame(infoRDD, structType)

infoDF.printSchema()
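
As with the reflection-based approach, the resulting DataFrame can then be registered as a temporary view and queried with SQL (a short follow-up sketch using the same calls as above):

infoDF.createOrReplaceTempView("infos")
spark.sql("select * from infos where age > 30").show()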
3. Choosing between them: prefer the first (reflection-based) approach.

6. Dataset Overview and Usage

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
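
A minimal sketch of the typed Dataset API (assuming the same people.json file used earlier; the case class Person below is introduced here just for illustration):

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Option[Long])

object DatasetApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DatasetApp").master("local[2]").getOrCreate()
    import spark.implicits._

    // Convert the untyped DataFrame into a typed Dataset[Person]
    val peopleDS = spark.read.format("json")
      .load("file:///Users/gaowenfeng/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
      .as[Person]

    // Typed lambdas: a misspelled field name here would be a compile-time error
    peopleDS.filter(_.age.exists(_ > 19)).map(_.name).show()

    spark.stop()
  }

}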


When is a typo caught in each API? With SQL the program always compiles and a typo only surfaces when the query runs; with the DataFrame API a misspelled method name is a compile error, but a misspelled column name still only fails at run time; with the typed Dataset API even a misspelled field name is a compile error:

SQL:
  spark.sql("seletf name from table")  // compiles OK, fails at run time
DF:
  df.seletc("name")                    // method-name typo: compile error
  df.select("naem")                    // column-name typo: compiles OK, fails at run time
DS:
  ds.map(_.naem)                       // field-name typo: compile error (strong typing)

(Note: the compile-time check comes from the typed lambda API; a string-based ds.select("naem") would still only fail at run time.)

DataFrame = Dataset[Row]. A Dataset is strongly typed (typed, backed by a case class), while a DataFrame is weakly typed (untyped Rows).
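
A tiny sketch of the "DataFrame = Dataset[Row]" point (the function demo is hypothetical, just for illustration):

import org.apache.spark.sql.{DataFrame, Dataset, Row}

// In the Scala API, DataFrame is literally a type alias of Dataset[Row],
// so the two types are interchangeable at compile time.
def demo(df: DataFrame): Dataset[Row] = df  // compiles: same type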
