Zzreal的大数据笔记-SparkDay04

Spark SQL

SparkSQL的前身是Shark,它抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码;由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便。

1、Spark SQL性能

Spark SQL比hive快10-100倍,原因:

内存列存储( In- Memory Columnar Storage )

基于Row的Java Object存储:内存开销大,且容易FULL GC,按列查询比较慢。

基于Column的ByteBuf f er存储( Spark SQL ) :内存开销小,按列查询速度较快。

SparkSQL内存列式储方式无论在空间占用量和读取吞吐率上都占有很大优势。

对于原生态的JVM对象存储方式,每个对象通常要增加12-16字节的额外开销,对于一个270MB的TPC-H lineitem table数据,使用这种方式读入内存,要使用970MB左右的内存空间(通常是2~5倍于原生数据空间);另外,使用这种方式,每个数据记录产生一个JVM对象,如果是大小为200B的数据记录,32G的堆栈将产生1.6亿个对象,这么多的对象,对于GC来说,可能要消耗几分钟的时间来处理(JVM的垃圾收集时间与堆栈中的对象数量呈线性相关)。显然这种内存存储方式对于基于内存计算的Spark来说,很昂贵也负担不起。

对于内存列存储来说,将所有原生数据类型的列采用原生数组来存储,将Hive支持的复杂数据类型(如array、map等)先序化后并接成一个字节数组来存储。这样,每个列创建一个JVM对象,从而导致可以快速的GC和紧凑的数据存储;额外的,还可以使用低廉CPU开销的高效压缩方法(如字典编码、行长度编码等压缩方法)降低内存开销;更有趣的是,对于分析查询中频繁使用的聚合特定列,性能会得到很大的提高,原因就是这些列的数据放在一起,更容易读入内存进行计算。

字节码生成技术( bytecode generation ,即 CG )

Scala 代码优化

SparkSQL在使用Scala编写代码的时候,尽量避免低效的、容易GC的代码;尽管增加了编写代码的难度,但对于用户来说,还是使用统一的接口,没受到使用上的困难。

2、Spark SQL运行架构

类似于关系型数据库,SparkSQL也是语句也是由Projection(a1,a2,a3)、DataSource(tableA)、Filter(condition)组成,分别对应sql查询过程中的Result、Data Source、Operation,也就是说SQL语句按Result-->Data Source-->Operation的次序来描述的。

数据库系统先将读入的SQL语句(Query)先进行解析(Parse),分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是Projection、哪些是Data Source等等。这一步就可以判断SQL语句是否规范,不规范就报错,规范就继续下一步过程绑定(Bind),

将SQL语句和数据库的数据字典(列、表、视图等等)进行绑定,如果相关的Projection、Data Source等等都是存在的话,就表示这个SQL语句是可以执行的;

而在执行前,一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划(Optimize),

最终执行该计划(Execute),并返回结果。当然在实际的执行过程中,是按Operation-->Data Source-->Result的次序来进行的,和SQL语句的次序刚好相反;在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。

以上过程看上去非常简单,但实际上会包含很多复杂的操作细节在里面。而这些操作细节都和Tree有关,在数据库解析(Parse)SQL语句的时候,会将SQL语句转换成一个树型结构来进行处理,如下面一个查询,会形成一个含有多个节点(TreeNode)的Tree,然后在后续的处理过程中对该Tree进行一系列的操作。

3、Spark SQL的代码实现---需要一个DataFream

DataFream是以指定列组织的分布式数据集合,相当于关系数据库中的一个表。

DF和RDD的区别:DF是一种以RDD为基础的分布式数据集,带有Schema元信息,每一列都在有名称和类型,如下图所示。

RDD=>DF,需要导入SqlContext的隐式转换(2.x版本可以直接导SparkSession的),然后直接可以rdd.toDF()

Spark SQL连接JDBC

val properties=new Properties()

properties.setProperty("driver","com.mysql.jdbc.Driver")

properties.setProperty("user","root")

properties.setProperty("password","1327")

//写

//df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/dbName?useUnicode=true&characterEncoding=UTF-8","tableName",properties)

//读

(sqlContext).read.jdbc("jdbc:mysql://localhost:3306/dbName?useUnicode=true&characterEncoding=UTF-8","tableName",properties).show()

JDBC连接池

对于一个在线的spark streaming系统,DStream是源源不断的,当需要查Mysql数据库时,如果我们基于每个RDD,或基于分区建立mysql连接,那么需要经常建立、关闭数据库连接。所以需要在启动application时,在executor上先建立一个mysql连接池,然后该executor上的所有task都直接使用连接池中的连接访问数据库。如下:

import java.sql.

import scala.io.Source

object DbUtils{

// bin/flume-ng agent --conf conf --conf-file conf/flume+sparkStreaming.conf --name a1 -Dflume.root.logger=INFO,console

val prop = new java.util.Properties

prop.load(Source.fromFile("config.properties").bufferedReader())

private[this] var pool = new mutable.Stack[Connection]

private[this] val connectionCounts = 10

private[this] val user = prop.getProperty("jdbc.user")

private[this] val pwd = prop.getProperty("jdbc.password")

private[this] val dataBaseUrl = prop.getProperty("jdbc.url")

for (i

val conn = DriverManager.getConnection(dataBaseUrl, user, pwd)

pool.push(conn)

}

private[this] def getConnection: Connection = {

while (pool.length == 0) {

Thread.sleep(10)

}

pool.pop()

}

def execUpdate(sql: String, params: Array[Any]): Int = {

var result: Int = 0

val conn = getConnection

conn.setAutoCommit(false)

val stm = conn.prepareStatement(sql)

if (params != null && params.length > 0) {

for (i

// println(i)

stm.setObject(i+1, params(i))

}

}

try {

result = stm.executeUpdate()

conn.commit()

} catch {

case ex: Exception => println(ex.getStackTrace)

} finally {

if (conn != null) {

pool.push(conn)

}

}

result

}

}

以上内容均为作者个人笔记,如有错误欢迎指正...

关注CSDN博客 Zonzereal,更多大数据笔记等你...

本文来自企鹅号 - 全球大搜罗媒体

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏积累沉淀

Shuffle过程详解

Shuffle过程是MapReduce的核心,最近看了很多资料,网上说法大体相同,但有些地方有一点点出入,就是各个阶段的执行顺序 总个shuffle过程可以看做...

26690
来自专栏Java面试通关手册

深入理解单例模式

Java面试通关手册(Java学习指南,欢迎Star,会一直完善下去,欢迎建议和指导):https://github.com/Snailclimb/Java_G...

21460
来自专栏学习力

《Java从入门到放弃》JavaSE入门篇:JDBC(入门版)

20790
来自专栏DOTNET

Entity Framework——并发策略

使用EF框架遇到并发时,一般采取乐观并发控制。 1支持并发检验 为支持并发检验,需要对实体进行额外的设置。默认情况下是不支持并发检验的。有以下两种方式: ...

46080
来自专栏分布式系统进阶

Influxdb Cluster下的数据写入

3.2 调用w.MetaClient.CreateShardGroup, 如果ShardGroup存在直接返回ShardGroup信息,如果不存在创建,创建过程...

28020
来自专栏Golang语言社区

Git 项目推荐 | Go 语言读写 INI 文件工具包

原文 http://git.oschina.net/Unknown/ini 主题 Git Go语言 本包提供了 Go 语言中读写 INI 文件的功能。 功能特...

1.8K120
来自专栏代码世界

Django中ORM介绍和字段及其参数

ORM介绍 ORM概念   对象关系映射(Object Relational Mapping,简称ORM)模式是一种为了解决面向对象与关系数据库存在的互不匹配的...

47080
来自专栏Java架构沉思录

分布式ID常见解决方案

在分布式系统中,往往需要对大量的数据如订单、账户进行标识,以一个有意义的有序的序列号来作为全局唯一的ID。

73520
来自专栏数据库

高级盲注—floor,rand,group by报错注入

大家好,我是你们的老朋友Alex。最近一直在学习SQL注入,发现了很多很多有趣的东西。我就分享我的一篇有关floor,rand,group by报错注入的笔记吧...

32190
来自专栏Java3y

权限管理系统

前言 前面我们做的小项目都是一个表的,业务代码也相对简单。现在我们来做一个权限管理系统,体验一下多表的业务逻辑,顺便巩固一下过滤器的知识。! ---- 目的 现...

34060

扫码关注云+社区

领取腾讯云代金券