一、SparkSql介绍
1.简介 Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。 为什么要学习Spark SQL? 我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。 2.特点 *容易整合 *统一的数据访问方式 *兼容Hive *标准的数据连接 3.基本概念 *DataFrame DataFrame(表) = schema(表结构) + Data(表结构,RDD) 就是一个表 是SparkSql 对结构化数据的抽象 DataFrame表现形式就是RDD DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。DataFrames可以从各种来源构建, DataFrame多了数据的结构信息,即schema。 RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。 DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化 *Datasets Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化,使用强大的lambda函数的能力)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。 4.创建表 DataFrame 方式一 使用case class 定义表 val df = studentRDD.toDF 方式二 使用SparkSession直接生成表 val df = session.createDataFrame(RowRDD,scheme) 方式三 直接读取一个带格式的文件(json文件) spark.read.json("") 5.视图(虚表) 普通视图 df.createOrReplaceTempView("emp") 只对当前对话有作用 全局视图 df.createGlobalTempView("empG") 在全局(不同会话)有效 前缀:global_temp 6.操作表: 两种语言:SQL,DSL spark.sql("select * from t ").show df.select("name").show
二、load和save 1.什么是parquet文件? Parquet是列式存储格式的一种文件类型. Parquet格式是Spark SQL的默认数据源,可通过spark.sql.sources.default配置 2.通用的Load/Save函数 *读取Parquet文件
val usersDF = spark.read.load("/root/resources/users.parquet")
*查询用户的name和喜爱颜色,并保存
usersDF.select($"name",$"favorite_color").write.save("/root/result/parquet")
*显式指定文件格式:加载json格式
val usersDF = spark.read.format("json").load("/root/resources/people.json")spark.read.json()
*保存的时候,覆盖原来的文件
usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")
*将结果保存为表
usersDF.select($"name").write.saveAsTable("table1")
3.Parquet文件 *Spark SQL提供支持对于Parquet文件的读写,也就是自动保存原始数据的schema
读取json文件
val empJson = spark.read.json("/root/data/emp.json")
将数据保存为parquet格式
empJson.write.parquet("/root/data/parquet")
重新读取Parquet文件
val empParquet = spark.read.parquet("/root/data/parquet")
创建临时视图
empParquet.createTempView("emp")
查询
spark.sql("select * from emp").show
*scheme的合并 Parquet支持Schema evolution(Schema演变,即:合并)。用户可以先定义一个简单的Schema, 然后逐渐的向Schema中增加列描述。通过这种方式,用户可以获取多个有不同Schema但相互兼容的Parquet文件。
val def1 = sc.makeRDD(1 to 5).map(i => (i,2*i)).toDF("single","double")
df1.write.parquet("/root/test/key=1")
val def2 = sc.makeRDD(6 to 10).map(i => (i,3*i)).toDF("single","triple")
df2.write.parquet("/root/test/key=2")
val df3 = spark.read.option("meugeScheme","true").parquet("/root/test/")
df3.printScheme()
4.JDBC
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.util.Properties
object LoadAndSaveDemo {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
val jdbcDL: DataFrame = spark.read.format("jdbc").options(Map(
"url" -> "jdbc:mysql://localhost:3306/test",
"driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> "user",
"user" -> "root",
"password" -> "123456"
)).load()
//jdbcDL.show()
jdbcDL.select("name").show()
//jdbcDL.write.format("json").save("d:/data/save/out1")
//jdbcDL.write.save("")
//存储到jdbc
val prop = new Properties()
prop.put("user","root")
prop.put("password","123456")
//jdbcDL.write.mode("append").jdbc("jdbc:mysql://localhost:3306/test","user2",prop)
spark.stop()
}
}
5.Hive Table HIve 2.x 推荐使用 二、代码实现 1.SparkSql 1.X操作实例 1 使用case class 和 sql查询
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object SQLDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf().setMaster("local").setAppName("SQLDemo1")
val sc = new SparkContext(conf)
//SQLContext是对SparkContext的一个包装 (增强了功能,可以处理结构化数据)
val sqlC = new SQLContext(sc)
//Dataframe= RDD + Schema
val lines = sc.parallelize(List("1,tom,99,29","2,marry,98,30","3,jim,98,27"))
val studentRDD = lines.map(line => {
val fields = line.split(",")
val id = fields(0).toLong
val name = fields(1)
val fv = fields(2).toDouble
val age = fields(3).toInt
Student1(id,name,fv,age)
})
//将RDD转换成DataFrame
//导入隐式转换
import sqlC.implicits._
val df = studentRDD.toDF
//对DataFrame 进行操作
//使用sql风格的API
df.registerTempTable("student")
//sql是一个transformartion
val result = sqlC.sql("select name,fv from student order by fv desc,age desc")
//触发action
result.show()
sc.stop()
}
}
/**
* case class 将数据保存到case class
* case class 的特点:不用new ;实现序列化;模式匹配
* @param id
* @param name
* @param fv
* @param age
*/
case class Student1(id:Long,name:String,fv:Double,age:Int)
2.SparkSql 1.X操作实例 2 使用Row 和 DSL语法
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
object SQLDemo_2 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf().setMaster("local").setAppName("SQLDemo1")
val sc = new SparkContext(conf)
//SQLContext是对SparkContext的一个包装 (增强了功能,可以处理结构化数据)
val sqlC = new SQLContext(sc)
//Dataframe= RDD + Schema
val lines = sc.parallelize(List("1,tom,99,29","2,marry,98,30","3,jim,98,27"))
val studentRDD = lines.map(line => {
val fields = line.split(",")
val id = fields(0).toLong
val name = fields(1)
val fv = fields(2).toDouble
val age = fields(3).toInt
Row(id,name,fv,age)
})
//不是用对象 创建一个scheme
//创建一个scheme(元数据)
val scheme = StructType(
List(
StructField("id",LongType),
StructField("name",StringType),
StructField("fv",DoubleType),
StructField("age",IntegerType)
)
)
//RDD关联scheme
val df = sqlC.createDataFrame(studentRDD,scheme)
//使用DSL语法 调用DataFrame
//select是一个tarnsformation
val selected = df.select("name","fv","age")
//排序
//导入隐式转换
import sqlC.implicits._
val result = selected.orderBy($"fv" desc ,$"age" asc)
result.show()
sc.stop()
}
}
3. SparkSql2.x 操作实例 使用SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
object SparkSql2_1 {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
//val session = SparkSession.builder().appName("SparkSql2.1").master("local")
val session = SparkSession.builder()
.appName("SQLDemo2x1")
.master("local[*]").getOrCreate()
val lines = session.sparkContext.parallelize(List("1,tom,99,29", "2,marry,98,30", "3,jim,98,27"))
val RowRDD = lines.map(line =>{
val fields = line.split(",")
val id = fields(0).toLong
val name = fields(1)
val fcValue = fields(2).toDouble
val age = fields(3).toInt
Row(id, name, fcValue, age)
})
val scheme = StructType(List(
StructField("id", LongType),
StructField("name", StringType),
StructField("fv", DoubleType),
StructField("age", IntegerType)
))
val df = session.createDataFrame(RowRDD,scheme)
df.createTempView("student")
val result = session.sql("SELECT name,fv,age FROM student WHERE age>27 ORDER BY fv DESC,age ASC")
//Action 实现输出
result.show()
}
}
4.Sql 实现WordCount 使用SparkSession.read.textFile读取文件
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, SparkSession}
object Sql_wordCount {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val session = SparkSession.builder().appName("wordCount").master("local[3]").getOrCreate()
//隐式转换
import session.implicits._
val words: Dataset[String] = session.read.textFile("d:\\data\\word.txt")
val word = words.flatMap(x =>x.split(" "))
//临时表
word.createTempView("word")
//sql计数
//session.sql("select value words,count(*) counts from word group by words order by counts desc").show()
//分组计数
val grouped = word.groupBy("value")
grouped.count().show()
session.stop()
}
}