首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
Hadoop面试复习系列——HDFS(一)
2
大数据技术之_04_Hadoop学习_01_HDFS_HDFS概述+HDFS的Shell操作(开发重点)+HDFS客户端操作(开发重点)+HDFS的数据流(面试重点)+NameNode和Seconda
3
大数据技术之_05_Hadoop学习_02_MapReduce_MapReduce框架原理+InputFormat数据输入+MapReduce工作流程(面试重点)+Shuffle机制(面试重点)
4
大数据技术之_05_Hadoop学习_01_MapReduce_MapReduce概述+Hadoop序列化
5
大数据技术之_03_Hadoop学习_01_入门_大数据概论+从Hadoop框架讨论大数据生态+Hadoop运行环境搭建(开发重点)
6
大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客
7
大数据技术之_06_Zookeeper学习_Zookeeper入门+Zookeeper安装+Zookeeper内部原理+Zookeeper实战(开发重点)+企业面试真题
8
大数据技术之_09_Hive学习_复习与总结
9
大数据技术之_07_Hadoop学习_HDFS_HA(高可用)_HA概述+HDFS-HA工作机制+HDFS-HA集群配置+YARN-HA配置+HDFS Federation(联邦) 架构设计
10
大数据技术之_08_Hive学习_01_Hive入门+Hive安装、配置和使用+Hive数据类型
11
大数据技术之_08_Hive学习_04_压缩和存储(Hive高级)+ 企业级调优(Hive优化)
12
大数据技术之_08_Hive学习_05_Hive实战之谷粒影音(ETL+TopN)+常见错误及解决方案
13
大数据技术之_08_Hive学习_02_DDL数据定义(创建/查询/修改/删除数据库+创建表+分区表+修改表+删除表)+DML数据操作(数据导入+数据导出+清除表中数据)
14
大数据技术之_08_Hive学习_03_查询+函数
15
大数据技术之_16_Scala学习_09_函数式编程-高级
16
大数据技术之_09_Flume学习_Flume概述+Flume快速入门+Flume企业开发案例+Flume监控之Ganglia+Flume高级之自定义MySQLSource+Flume企业真实面试题(
17
大数据技术之_13_Azkaban学习_Azkaban(阿兹卡班)介绍 + Azkaban 安装部署 + Azkaban 实战
18
大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例
19
大数据技术之_12_Sqoop学习_Sqoop 简介+Sqoop 原理+Sqoop 安装+Sqoop 的简单使用案例+Sqoop 一些常用命令及参数
20
大数据技术之_16_Scala学习_13_Scala语言的数据结构和算法_Scala学习之旅收官之作
21
大数据技术之_19_Spark学习_06_Spark 源码解析 + Spark 通信架构、脚本解析、standalone 模式启动、提交流程 + Spark Shuffle 过程 + Spark 内存
22
大数据技术之_16_Scala学习_04_函数式编程-基础+面向对象编程-基础
23
大数据技术之_14_Oozie学习
24
大数据技术之_26_交通状态预测项目_01
25
大数据技术之_16_Scala学习_02_变量
26
大数据技术之_16_Scala学习_07_数据结构(上)-集合
27
大数据技术之_28_电商推荐系统项目_01
28
大数据技术之_28_电商推荐系统项目_02
29
大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址
30
大数据技术之_16_Scala学习_01_Scala 语言概述
31
大数据技术之_29_MySQL 高級面试重点串讲_02
32
大数据技术之_18_大数据离线平台_05_离线平台项目模块小结
33
大数据技术之_19_Spark学习_06_Spark 源码解析小结
34
大数据技术之_16_Scala学习_05_面向对象编程-中级
35
大数据技术之_16_Scala学习_08_数据结构(下)-集合操作+模式匹配
36
大数据技术之_24_电影推荐系统项目_05_项目系统设计
37
大数据技术之_19_Spark学习_03_Spark SQL 应用解析小结
38
大数据技术之_19_Spark学习_07_Spark 性能调优小结
39
大数据技术之_19_Spark学习_05_Spark GraphX 应用解析小结
40
大数据技术之_19_Spark学习_02_Spark Core 应用解析小结
41
大数据技术之_24_电影推荐系统项目_08_项目总结及补充
42
大数据技术之_19_Spark学习_01_Spark 基础解析小结(无图片)
43
大数据技术之_18_大数据离线平台_03_数据处理+工具代码导入+业务 ETL 实现+创建数据库表
44
大数据技术之_24_电影推荐系统项目_02_Python 基础语法复习
45
大数据技术之_24_电影推荐系统项目_06_项目体系架构设计 + 工具环境搭建 + 创建项目并初始化业务数据 + 离线推荐服务建设 + 实时推荐服务建设 + 基于内容的推荐服务建设
46
大数据技术之_27_电商平台数据分析项目_01_大数据的框架回顾 + 大数据的企业应用
47
大数据技术之_23_Python核心基础学习_03_函数 + 对象(12.5小时)
48
大数据技术之_32_大数据面试题_01_Hive 基本面试 + Hive 数据分析面试 + Flume + Kafka 面试
49
大数据技术之_23_Python核心基础学习_04_ 异常 + 文件(3.5小时)
50
大数据技术之_16_Scala学习_03_运算符+程序流程控制
清单首页hadoop文章详情

大数据技术之_19_Spark学习_03_Spark SQL 应用解析小结

========== Spark SQL ========== 1、Spark SQL 是 Spark 的一个模块,可以和 RDD 进行混合编程、支持标准的数据源、可以集成和替代 Hive、可以提供 JDBC、ODBC 服务器功能。

2、Spark SQL 的特点:   (1)和 Spark Core 的无缝集成,可以在写整个 RDD 应用的时候,配合 Spark SQL 来实现逻辑。   (2)统一的数据访问方式,Spark SQL 提供标准化的 SQL 查询。   (3)Hive 的集成,Spark SQL 通过内嵌的 Hive 或者连接外部已经部署好的 Hive 实例,实现了对 Hive 语法的集成和操作。   (4)标准化的连接方式,Spark SQL 可以通过启动 thrift Server 来支持 JDBC、ODBC 的访问,即将自己作为一个 BI Server 来使用。

3、Spark SQL 可以执行 SQL 语句,也可以执行 HQL 语句,将运行的结果作为 Dataset 和 DataFrame(将查询出来的结果转换成 RDD,类似于 hive 将 sql 语句转换成 mapreduce)。

4、Spark SQL 的计算速度(Spark sql 比 Hive 快了至少一个数量级,尤其是在 Tungsten 成熟以后会更加无可匹敌),Spark SQL 推出的 DataFrame 可以让数据仓库直接使用机器学习、图计算等复杂的算法库来对数据仓库进行复杂深度数据价值的挖掘。

5、老版本中使用 hivecontext,现在使用 sparkSession。

========== Spark SQL 的数据抽象 ========== 0、RDD(Spark1.0)-> DataFrame(Spark1.3)-> DataSet(Spark1.6) 1、Spark SQL 提供了 DataFrame 和 DataSet 数据抽象。 2、DataFrame 就是 RDD + Schema,可以认为是一张二维表格。DataFrame 也是懒执行的、不可变的。DataFrame 性能上比 RDD 要高。 3、DataFrame 是一个弱类型的数据对象,DataFrame 的劣势是在编译期不进行表格中的字段的类型检查。在运行期进行检查。类似于 java.sql.ResultSet 类,只能通过 getString 这种方式来获取具体数据。 4、DataSet 是 Spark 最新的数据抽象,Spark 的发展会逐步将 DataSet 作为主要的数据抽象,弱化 RDD 和 DataFrame。DataSet 包含了 DataFrame 所有的优化机制。除此之外提供了以样例类为 Schema 模型的强类型。 5、type DataFrame = Dataset[Row] 6、DataFrame 和 DataSet 都有可控的内存管理机制,所有数据都保存在非堆内存上,节省了大量空间之外,还摆脱了GC的限制。都使用了 catalyst 进行 SQL 的优化。可以使得不太会使用 RDD 的工程师写出相对高效的代码。 7、RDD 和 DataFrame 和 DataSet 之间可以进行数据转换。

========== Spark SQL 的初探 -- 客户端查询 ========== 1、你可以通过 spark-shell 或者 spark-sql 来操作 Spark SQL,注意:spark 作为 SparkSession 的变量名,sc 作为 SparkContext 的变量名。 2、你可以通过 Spark 提供的方法读取 JSON 文件,将 JSON 文件转换成 DataFrame。 3、你可以通过 DataFrame 提供的 API 来操作 DataFrame 里面的数据。 4、你可以通过将 DataFrame 注册成为一个临时表的方式,来通过 Spark.sql 方法运行标准的 SQL 语句来查询。

小细节:   show() --> 表格   collect() --> RDD 打印

========== IDEA 创建 Spark SQL 程序 ========== 1、Spark SQL 读取 json 需要 json 文件中一行是一个 json 对象。 2、通过创建 SparkSession 来使用 SparkSQL: 示例代码如下:

代码语言:javascript
复制
package com.atguigu.sparksql

import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object HelloWorld {

  val logger = LoggerFactory.getLogger(HelloWorld.getClass)

  def main(args: Array[String]) {
    // 创建 SparkSession 并设置 App 名称
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // 通过隐式转换将 RDD 操作添加到 DataFrame 上(将 RDD 转成 DataFrame)
    import spark.implicits._

    // 通过 spark.read 操作读取 JSON 数据
    val df = spark.read.json("examples/src/main/resources/people.json")

    // show 操作类似于 Action,将 DataFrame 直接打印到 Console 上
    df.show()

    // DSL 风格的使用方式:属性的获取方法 $
    df.filter($"age" > 21).show()

    //将 DataFrame 注册为表
    df.createOrReplaceTempView("persons")

    // 执行 Spark SQL 查询操作
    spark.sql("select * from perosns where age > 21").show()

    // 关闭资源
    spark.stop()
  }
}

========== DataFrame 查询方式 ========== 1、DataFrame 支持两种查询方式:一种是 DSL 风格,另外一种是 SQL 风格。 DSL 风格:   (1)你需要引入 import spark.implicit._ 这个隐式转换,可以将 DataFrame 隐式转换成 RDD。 示例:   df.select("name").show()   df.filter($"age" > 25).show()

SQL 风格:   (1)你需要将 DataFrame 注册成一张表格,如果你通过 createOrReplaceTempView 这种方式来创建,那么该表当前 Session 有效,如果你通过 createGlobalTempView 来创建,那么该表跨 Session 有效,但是 SQL 语句访问该表的时候需要加上前缀 global_temp.xxx。   (2)你需要通过 sparkSession.sql 方法来运行你的 SQL 语句。 示例:   一个 SparkContext 可以多次创建 SparkSession。   // Session 内可访问,一个 SparkSession 结束后,表自动删除。   df.createOrReplaceTempView("persons") // 使用表名不需要任何前缀   // 应用级别内可访问,一个 SparkContext 结束后,表自动删除。   df.createGlobalTempView("persons") // 使用表名需要加上“global_temp.” 前缀,比如:global_temp.persons

========== DataSet 创建方式 ========== 1、定义一个 DataSet,首先你需要先定义一个 case 类。

========== RDD、DataFrame、DataSet 之间的转换总结 ========== 1、RDD -> DataFrame : rdd.map(para => (para(0).trim(), para(1).trim().toInt)).toDF("name", "age") // RDD -> 元组 -> toDF()(注意:这是第一种方式) 2、DataFrame -> RDD : df.rdd 注意输出类型:res2: Array[org.apache.spark.sql.Row] = Array([Michael,29], [Andy,30], [Justin,19])

1、 RDD -> DataSet : rdd.map(para => Person(para(0).trim(), para(1).trim().toInt)).toDS() // 需要先定义样例类 -> toDS() 2、 DataSet -> RDD : ds.rdd 注意输出类型:res5: Array[Person] = Array(Person(Michael,29), Person(Andy,30), Person(Justin,19))

1、 DataFrame -> DataSet : df.as[Person] // 传入类型 2、 DataSet -> DataFrame : ds.toDF()

========== DataFrame 的 Schema 的获取方式 ========== RDD -> DataFram 的三种方式:

代码语言:javascript
复制
// 将没有包含 case 类的 RDD 转换成 DataFrame
rdd.map(para => (para(0).trim(), para(1).trim().toInt)).toDF("name", "age") // RDD -> 元组 -> toDF()(注意:这是第一种方式)

// 将包含有 case 类的 RDD 转换成 DataFrame,注意:需要我们先定义 case 类
// 通过反射的方式来设置 Schema 信息,适合于编译期能确定列的情况
rdd.map(attributes => Person(attributes(0), attributes(1).trim().toInt)).toDF() // 样例类-> RDD -> toDF()(注意:这是第二种方式)

// 通过编程的方式来设置 Schema 信息,适合于编译期不能确定列的情况(注意:这是第三种方式)
val schemaString = "name age" // 实际开发中 schemaString 是动态生成的
val fields = schemaString.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rdd[Row] = rdd.map(attributes => Row(attributes(0.trim), attributes(1).trim))
val peopeDF = spark.createDataFrame(rdd[Row], schema)

========== 对于 DataFrame Row 对象的访问方式 ========== 1、由 DataFrame = Dataset[Row] 可知, DataFrame 里面每一行都是 Row 对象。 2、如果需要访问 Row 对象中的每一个元素,可以通过索引 row(0);也可以通过列名 row.getAsString 或者索引 row.getAsInt

========== 应用 UDF 函数(用户自定义函数) ========== 1、通过 spark.udf.register(funcName, func) 来注册一个 UDF 函数,name 是 UDF 调用时的标识符,即函数名,fun 是一个函数,用于处理字段。 2、你需要将一个 DF 或者 DS 注册为一个临时表。 3、通过 spark.sql 去运行一个 SQL 语句,在 SQL 语句中可以通过 funcName(列名) 方式来应用 UDF 函数。 示例代码如下:

代码语言:javascript
复制
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()

scala> spark.udf.register("addName", (x: String) => "Name:" + x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.createOrReplaceTempView("people")

scala> spark.sql("select addName(name), age from people").show()

scala> spark.sql("select addName(name) as newName, age from people").show()

========== 应用 UDAF 函数(用户自定义聚合函数) ========== 1、弱类型用户自定义聚合函数 步骤如下: (1)新建一个 Class 继承UserDefinedAggregateFunction,然后复写方法:

代码语言:javascript
复制
    // 聚合函数需要输入参数的数据类型
    override def inputSchema: StructType = ???

    // 聚合缓冲区中值的数据类型
    override def bufferSchema: StructType = ???

    // 返回值的数据类型
    override def dataType: DataType = ???

    // 对于相同的输入一直有相同的输出
    override def deterministic: Boolean = true

    // 用于初始化你的数据结构
    override def initialize(buffer: MutableAggregationBuffer): Unit = ???

    // 相同 Execute 间的数据合并(同一分区)
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???

    // 不同 Execute 间的数据合并(不同分区)
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???

    // 计算最终结果
    override def evaluate(buffer: Row): Any = ???

(2)你需要通过 spark.udf.resigter 去注册你的 UDAF 函数。 (3)需要通过 spark.sql 去运行你的 SQL 语句,可以通过 select UDAF(列名) 来应用你的用户自定义聚合函数。

2、强类型的用户自定义聚合函数 步骤如下: (1)新建一个class,继承Aggregator[Employee, Average, Double] 其中 Employee 是在应用聚合函数的时候传入的对象,Average 是聚合函数在运行的时候内部需要的数据结构,Double 是聚合函数最终需要输出的类型。这些可以根据自己的业务需求去调整。 复写相对应的方法:

代码语言:javascript
复制
    // 用于定义一个聚合函数内部需要的数据结构
    override def zero: Average = ???

    // 针对每个分区内部每一个输入来更新你的数据结构
    override def reduce(b: Average, a: Employee): Average = ???

    // 用于对于不同分区的结构进行聚合
    override def merge(b1: Average, b2: Average): Average = ???

    // 计算输出
    override def finish(reduction: Average): Double = ???

    // 设定之间值类型的编码器,要转换成 case 类
    // Encoders.product 是进行 scala 元组和 case 类转换的编码器
    override def bufferEncoder: Encoder[Average] = ???

    // 设定最终输出值的编码器
    override def outputEncoder: Encoder[Double] = ???

2、新建一个 UDAF 实例,通过 DF 或者 DS 的 DSL 风格语法去应用。

========== Spark SQL 的输入和输出 ========== 1、对于 Spark SQL 的输入需要使用 sparkSession.read 方法

代码语言:javascript
复制
(1)通用模式 sparkSession.read.format("json").load("path")     支持的类型有:parquet、json、text、csv、orc、jdbc、......
(2)专业模式 sparkSession.read.json("path") 或 csv 或 ...      即直接指定类型

2、对于 Spark SQL 的输出需要使用 sparkSession.write 方法

代码语言:javascript
复制
(1)通用模式 dataFrame.write.format("json").save("path")       支持的类型有:parquet、json、text、csv、orc、jdbc、......
(2)专业模式 dataFrame.write.csv("path") 或 json 或 ...      即直接指定类型

3、如果使用通用模式,则 spark 默认的 parquet 是默认格式,那么 sparkSession.read.load 它加载的默认是 parquet 格式;dataFrame.write.save 也是默认保存成 parquet 格式。 4、注意:如果需要保存成一个 text 文件,那么需要 dataFrame 里面只有一列数据。

========== Spark SQL 与 Hive 的集成 ========== 内置 Hive 1、Spark 内置有 Hive,Spark 2.1.1 内置的 Hive 是 1.2.1。 2、如果要使用内嵌的 Hive,什么都不用做,直接用就可以了。但是呢,此时的我们只能创建表,如果查询表的话会报错,原因是:本地有 spark-warehouse 目录,而其他机器节点没有 spark-warehouse 目录。解决办法如下: 3、需要将 core-site.xml 和 hdfs-site.xml 拷贝到 spark 的 conf 目录下,然后分发至其他机器节点。如果 spark 路径下发现有 metastore_db 和 spark-warehouse,删除掉。然后重启集群。 4、在第一次启动创建 metastore 的时候,需要指定 spark.sql.warehouse.dir 这个参数, 比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://hadoop102:9000/spark_warehouse 5、注意:如果在 load 数据的时候,需要先将数据放到 HDFS 上。

外部 Hive 1、需要将 hive-site.xml 拷贝到 spark 的 conf 目录下,然后分发至其他机器节点。 2、如果 hive 的 metestore 使用的是 mysql 数据库,那么需要将 mysql 的 jdbc 驱动包放到 spark 的 jars 目录下。 3、可以通过 spark-sql 或者 spark-shell 来进行 sql 的查询,完成和 hive 的连接。

hive、spark、hdfs 关系:   spark 文件中有两个文件夹:spark-warehouse、metastore_db,当我们拷贝 hive-site.xml 文件到 spark 的 conf 目录后,会读取 Hive 中的 warehouse 文件,获取到 hive 中的表格数据。

下一篇
举报
领券