首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount


案例一:花式查询

代码语言:javascript
复制
package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Author itcast
 * Desc 演示SparkSQL的各种花式查询
 */
object FlowerQueryDemo {
  case class Person(id:Int,name:String,age:Int)

  def main(args: Array[String]): Unit = {
    //1.准备环境-SparkSession
    val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    //2.加载数据
    val lines: RDD[String] = sc.textFile("data/input/person.txt")

    //3.切割
    //val value: RDD[String] = lines.flatMap(_.split(" "))//错误的
    val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))

    //4.将每一行(每一个Array)转为样例类(相当于添加了Schema)
    val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))

    //5.将RDD转为DataFrame(DF)
    //注意:RDD的API中没有toDF方法,需要导入隐式转换!
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF

    //6.查看约束
    personDF.printSchema()

    //7.查看分布式表中的数据集
    personDF.show(6,false)//false表示不截断列名,也就是列名很长的时候不会用...代替


    //演示SQL风格查询
    //0.注册表名
    //personDF.registerTempTable("t_person")//已经过时
    //personDF.createTempView("t_person")//创建表,如果已存在则报错:TempTableAlreadyExistsException
    //personDF.createOrReplaceGlobalTempView("t_person")//创建全局表,可以夸session使用,查询的时候使用:SELECT * FROM global_temp.表名;生命周期太大,一般不用
    personDF.createOrReplaceTempView("t_person")//创建一个临时表,只有当前session可用!且表如果存在会替换!

    //1.查看name字段的数据
    spark.sql("select name from t_person").show
    //2.查看 name 和age字段数据
    spark.sql("select name,age from t_person").show
    //3.查询所有的name和age,并将age+1
    spark.sql("select name,age,age+1 from t_person").show
    //4.过滤age大于等于25的
    spark.sql("select name,age from t_person where age >=25").show
    //5.统计年龄大于30的人数
    spark.sql("select count(age) from t_person where age >30").show
    //6.按年龄进行分组并统计相同年龄的人数
    spark.sql("select age,count(age) from t_person group by age").show


    //演示DSL风格查询
    //1.查看name字段的数据
    import org.apache.spark.sql.functions._
    personDF.select(personDF.col("name")).show
    personDF.select(personDF("name")).show
    personDF.select(col("name")).show
    personDF.select("name").show

    //2.查看 name 和age字段数据
    personDF.select(personDF.col("name"),personDF.col("age")).show
    personDF.select("name","age").show

    //3.查询所有的name和age,并将age+1
    //personDF.select("name","age","age+1").show//错误,没有age+1这一列
    //personDF.select("name","age","age"+1).show//错误,没有age1这一列
    personDF.select(col("name"),col("age"),col("age")+1).show
    personDF.select($"name",$"age",$"age"+1).show
    //$表示将"age"变为了列对象,先查询再和+1进行计算
    personDF.select('name,'age,'age+1).show
    //'表示将age变为了列对象,先查询再和+1进行计算


    //4.过滤age大于等于25的,使用filter方法/where方法过滤
    personDF.select("name","age").filter("age>=25").show
    personDF.select("name","age").where("age>=25").show

    //5.统计年龄大于30的人数
    personDF.where("age>30").count()

    //6.按年龄进行分组并统计相同年龄的人数
    personDF.groupBy("age").count().show

  }

}

​​​​​​​案例二:WordCount

前面使用RDD封装数据,实现词频统计WordCount功能,从Spark 1.0开始,一直到Spark 2.0,建立在RDD之上的一种新的数据结构DataFrame/Dataset发展而来,更好的实现数据处理分析。DataFrame 数据结构相当于给RDD加上约束Schema,知道数据内部结构(字段名称、字段类型),提供两种方式分析处理数据:DataFrame API(DSL编程)和SQL(类似HiveQL编程),下面以WordCount程序为例编程实现,体验DataFrame使用。

基于DSL编程

使用SparkSession加载文本数据,封装到Dataset/DataFrame中,调用API函数处理分析数据(类似RDD中API函数,如flatMap、map、filter等),编程步骤:

 第一步、构建SparkSession实例对象,设置应用名称和运行本地模式;

 第二步、读取HDFS上文本文件数据;

 第三步、使用DSL(Dataset API),类似RDD API处理分析数据;

 第四步、控制台打印结果数据和关闭SparkSession;

基于SQL编程

也可以实现类似HiveQL方式进行词频统计,直接对单词分组group by,再进行count即可,步骤如下:

 第一步、构建SparkSession对象,加载文件数据,分割每行数据为单词;

 第二步、将DataFrame/Dataset注册为临时视图(Spark 1.x中为临时表);

 第三步、编写SQL语句,使用SparkSession执行获取结果;

 第四步、控制台打印结果数据和关闭SparkSession;

具体演示代码如下:

代码语言:javascript
复制
package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Author itcast
 * Desc 使用SparkSQL完成WordCount---SQL风格和DSL风格
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    //1.准备环境
    val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //2.加载数据
    //val rdd: RDD[String] = sc.textFile("data/input/words.txt")//可以使用该方式,然后使用昨天的知识将rdd转为df/ds
    val df: DataFrame = spark.read.text("data/input/words.txt")
    val ds: Dataset[String] = spark.read.textFile("data/input/words.txt")
    //df.show()//查看分布式表数据
    //ds.show()//查看分布式表数据
    

    //3.做WordCount
    //切割
    //df.flatMap(_.split(" ")) //注意:直接这样写报错!因为df没有泛型,不知道_是String!
    //df.flatMap(row=>row.getAs[String]("value").split(" "))
    val wordsDS: Dataset[String] = ds.flatMap(_.split(" "))
    //wordsDS.show()
    
    //使用SQL风格做WordCount
    wordsDS.createOrReplaceTempView("t_words")
    val sql:String =
      """
        |select value,count(*) as count
        |from t_words
        |group by value
        |order by count desc
        |""".stripMargin
    spark.sql(sql).show()

    //使用DSL风格做WordCount
    wordsDS
      .groupBy("value")
      .count()
      .orderBy($"count".desc)
      .show()
    
    /*
    +-----+-----+
    |value|count|
    +-----+-----+
    |hello|    4|
    |  her|    3|
    |  you|    2|
    |   me|    1|
    +-----+-----+
    
    +-----+-----+
    |value|count|
    +-----+-----+
    |hello|    4|
    |  her|    3|
    |  you|    2|
    |   me|    1|
    +-----+-----+
     */

  }
}

无论使用DSL还是SQL编程方式,底层转换为RDD操作都是一样,性能一致,查看WEB UI监控中Job运行对应的DAG图如下:

从上述的案例可以发现将数据封装到Dataset/DataFrame中,进行处理分析,更加方便简洁,这就是Spark框架中针对结构化数据处理模:Spark SQL模块

官方文档:http://spark.apache.org/sql/

下一篇
举报
领券