前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkSql学习笔记一

SparkSql学习笔记一

作者头像
曼路
发布2018-10-18 15:13:26
8040
发布2018-10-18 15:13:26
举报
文章被收录于专栏:浪淘沙浪淘沙

一、SparkSql介绍

1.简介     Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。     为什么要学习Spark SQL?     我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。 2.特点     *容易整合     *统一的数据访问方式     *兼容Hive     *标准的数据连接 3.基本概念     *DataFrame         DataFrame(表) = schema(表结构) + Data(表结构,RDD)             就是一个表 是SparkSql 对结构化数据的抽象             DataFrame表现形式就是RDD         DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,         DataFrame多了数据的结构信息,即schema。         RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。         DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化     *Datasets         Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化,使用强大的lambda函数的能力)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。 4.创建表 DataFrame     方式一 使用case class 定义表         val df = studentRDD.toDF     方式二 使用SparkSession直接生成表         val df = session.createDataFrame(RowRDD,scheme)     方式三 直接读取一个带格式的文件(json文件)         spark.read.json("") 5.视图(虚表)     普通视图         df.createOrReplaceTempView("emp")             只对当前对话有作用     全局视图         df.createGlobalTempView("empG")             在全局(不同会话)有效             前缀:global_temp 6.操作表:     两种语言:SQL,DSL      spark.sql("select * from t ").show     df.select("name").show 

二、load和save     1.什么是parquet文件?         Parquet是列式存储格式的一种文件类型.         Parquet格式是Spark SQL的默认数据源,可通过spark.sql.sources.default配置     2.通用的Load/Save函数         *读取Parquet文件

代码语言:javascript
复制
  val usersDF = spark.read.load("/root/resources/users.parquet")

        *查询用户的name和喜爱颜色,并保存

代码语言:javascript
复制
usersDF.select($"name",$"favorite_color").write.save("/root/result/parquet")

        *显式指定文件格式:加载json格式             

代码语言:javascript
复制
val usersDF = spark.read.format("json").load("/root/resources/people.json")spark.read.json()

        *保存的时候,覆盖原来的文件

代码语言:javascript
复制
usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")

        *将结果保存为表           

代码语言:javascript
复制
 usersDF.select($"name").write.saveAsTable("table1")

    3.Parquet文件         *Spark SQL提供支持对于Parquet文件的读写,也就是自动保存原始数据的schema

代码语言:javascript
复制
                读取json文件
                val empJson = spark.read.json("/root/data/emp.json")
                将数据保存为parquet格式
                empJson.write.parquet("/root/data/parquet")
                重新读取Parquet文件
                val empParquet = spark.read.parquet("/root/data/parquet")
                创建临时视图
                empParquet.createTempView("emp")
                查询
                spark.sql("select * from emp").show

        *scheme的合并             Parquet支持Schema evolution(Schema演变,即:合并)。用户可以先定义一个简单的Schema,             然后逐渐的向Schema中增加列描述。通过这种方式,用户可以获取多个有不同Schema但相互兼容的Parquet文件。

代码语言:javascript
复制
            val def1 = sc.makeRDD(1 to 5).map(i => (i,2*i)).toDF("single","double")
            df1.write.parquet("/root/test/key=1")
            val def2 = sc.makeRDD(6 to 10).map(i => (i,3*i)).toDF("single","triple")
            df2.write.parquet("/root/test/key=2")
            val df3 = spark.read.option("meugeScheme","true").parquet("/root/test/")
                df3.printScheme()

    4.JDBC

代码语言:javascript
复制
import org.apache.log4j.{Level, Logger}
        import org.apache.spark.sql.{DataFrame, SparkSession}
        import java.util.Properties
        object LoadAndSaveDemo {
          Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().master("local[*]").getOrCreate()
            import spark.implicits._
            val jdbcDL: DataFrame = spark.read.format("jdbc").options(Map(
              "url" -> "jdbc:mysql://localhost:3306/test",
              "driver" -> "com.mysql.jdbc.Driver",
              "dbtable" -> "user",
              "user" -> "root",
              "password" -> "123456"
            )).load()
            //jdbcDL.show()

            jdbcDL.select("name").show()
            //jdbcDL.write.format("json").save("d:/data/save/out1")
            //jdbcDL.write.save("")
            //存储到jdbc
            val prop = new Properties()
            prop.put("user","root")
            prop.put("password","123456")
            //jdbcDL.write.mode("append").jdbc("jdbc:mysql://localhost:3306/test","user2",prop)

            spark.stop()
          }

        }

    5.Hive Table         HIve 2.x 推荐使用 二、代码实现     1.SparkSql 1.X操作实例 1         使用case class  和 sql查询

代码语言:javascript
复制
        import org.apache.log4j.{Level, Logger}
        import org.apache.spark.sql.SQLContext
        import org.apache.spark.{SparkConf, SparkContext}

        object SQLDemo {

          def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf().setMaster("local").setAppName("SQLDemo1")
            val sc = new SparkContext(conf)
            //SQLContext是对SparkContext的一个包装 (增强了功能,可以处理结构化数据)
            val sqlC = new SQLContext(sc)

            
            //Dataframe= RDD + Schema
            val lines = sc.parallelize(List("1,tom,99,29","2,marry,98,30","3,jim,98,27"))
            val studentRDD = lines.map(line => {
              val fields = line.split(",")
              val id = fields(0).toLong
              val name = fields(1)
              val fv = fields(2).toDouble
              val age = fields(3).toInt
              Student1(id,name,fv,age)
            })

            //将RDD转换成DataFrame
            //导入隐式转换
            import sqlC.implicits._
            val df = studentRDD.toDF
            //对DataFrame 进行操作
            //使用sql风格的API
            df.registerTempTable("student")
            //sql是一个transformartion
            val result = sqlC.sql("select name,fv from student order by fv desc,age desc")
            //触发action
            result.show()
            sc.stop()


          }
        }

        /**
          * case class 将数据保存到case class
          * case class 的特点:不用new ;实现序列化;模式匹配
          * @param id
          * @param name
          * @param fv
          * @param age
          */
        case class Student1(id:Long,name:String,fv:Double,age:Int)

    2.SparkSql 1.X操作实例 2         使用Row 和 DSL语法

代码语言:javascript
复制
        import org.apache.log4j.{Level, Logger}
        import org.apache.spark.{SparkConf, SparkContext}
        import org.apache.spark.sql.SQLContext
        import org.apache.spark.sql.Row
        import org.apache.spark.sql.types._
        object SQLDemo_2 {
          def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val conf = new SparkConf().setMaster("local").setAppName("SQLDemo1")
            val sc = new SparkContext(conf)
            //SQLContext是对SparkContext的一个包装 (增强了功能,可以处理结构化数据)
            val sqlC = new SQLContext(sc)

            //Dataframe= RDD + Schema
            val lines = sc.parallelize(List("1,tom,99,29","2,marry,98,30","3,jim,98,27"))
            val studentRDD = lines.map(line => {
              val fields = line.split(",")
              val id = fields(0).toLong
              val name = fields(1)
              val fv = fields(2).toDouble
              val age = fields(3).toInt
              Row(id,name,fv,age)
            })

            //不是用对象  创建一个scheme
            //创建一个scheme(元数据)
            val scheme = StructType(
              List(
                StructField("id",LongType),
                StructField("name",StringType),
                StructField("fv",DoubleType),
                StructField("age",IntegerType)

              )
            )
            //RDD关联scheme
            val df = sqlC.createDataFrame(studentRDD,scheme)
            //使用DSL语法 调用DataFrame
            //select是一个tarnsformation
            val selected = df.select("name","fv","age")
            //排序
            //导入隐式转换
            import sqlC.implicits._
            val result = selected.orderBy($"fv" desc ,$"age" asc)
            result.show()
            sc.stop()
          }
        }

    3. SparkSql2.x 操作实例         使用SparkSession

代码语言:javascript
复制
        import org.apache.log4j.{Level, Logger}
        import org.apache.spark.rdd.RDD
        import org.apache.spark.sql.types._
        import org.apache.spark.{SparkConf, SparkContext}
        import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}

        object SparkSql2_1 {
          Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
          def main(args: Array[String]): Unit = {
            //val session = SparkSession.builder().appName("SparkSql2.1").master("local")
            val session = SparkSession.builder()
              .appName("SQLDemo2x1")
              .master("local[*]").getOrCreate()
            val lines = session.sparkContext.parallelize(List("1,tom,99,29", "2,marry,98,30", "3,jim,98,27"))
            val RowRDD = lines.map(line =>{
              val fields = line.split(",")
              val id = fields(0).toLong
              val name = fields(1)
              val fcValue = fields(2).toDouble
              val age = fields(3).toInt

              Row(id, name, fcValue, age)
            })
            val scheme = StructType(List(
              StructField("id", LongType),
              StructField("name", StringType),
              StructField("fv", DoubleType),
              StructField("age", IntegerType)
            ))

            val df = session.createDataFrame(RowRDD,scheme)
            df.createTempView("student")
            val result = session.sql("SELECT name,fv,age FROM student WHERE age>27 ORDER BY fv DESC,age ASC")
            //Action 实现输出
            result.show()

          }

        }

    4.Sql 实现WordCount         使用SparkSession.read.textFile读取文件

代码语言:javascript
复制
        import org.apache.log4j.{Level, Logger}
        import org.apache.spark.sql.{Dataset, SparkSession}

        object Sql_wordCount {
          def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
            val session = SparkSession.builder().appName("wordCount").master("local[3]").getOrCreate()
            //隐式转换
            import session.implicits._
            val words: Dataset[String] = session.read.textFile("d:\\data\\word.txt")
            val word = words.flatMap(x =>x.split(" "))

            //临时表
            word.createTempView("word")
            //sql计数
            //session.sql("select value words,count(*) counts from word group by words order by counts desc").show()
            //分组计数
            val grouped = word.groupBy("value")
            grouped.count().show()

            session.stop()
          }

        }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年09月30日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档