首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析


案例三:电影评分数据分析

     使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:

对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于200)。

数据格式如下,每行数据各个字段之间使用双冒号分开:

数据处理分析步骤如下:

  1. 第一步、读取电影评分数据,从本地文件系统读取
  2.  第二步、转换数据,指定Schema信息,封装到DataFrame
  3.  第三步、基于SQL方式分析
  4.  第四步、基于DSL方式分析

代码实现

     电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套数据处理分析流程,其中涉及到很多数据细节,完整代码如下:

代码语言:javascript
复制
package cn.itcast.sql

import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
 */
object SparkTop10Movie {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[*]")
          // TODO: 设置shuffle时分区数目
          .config("spark.sql.shuffle.partitions", "4")
          .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._
        
        // 1. 读取电影评分数据,从本地文件系统读取
        val rawRatingsDS: Dataset[String] = spark.read.textFile("data/input/rating_100k.data")
        
        // 2. 转换数据
        val ratingsDF: DataFrame = rawRatingsDS
            // 过滤数据
            .filter(line => null != line && line.trim.split("\t").length == 4)
            // 提取转换数据
            .mapPartitions{iter =>
                iter.map{line =>
                    // 按照分割符分割,拆箱到变量中
                    val Array(userId, movieId, rating, timestamp) = line.trim.split("\t")
                    // 返回四元组
                    (userId, movieId, rating.toDouble, timestamp.toLong)
                }
            }
            // 指定列名添加Schema
            .toDF("userId", "movieId", "rating", "timestamp")
        /*
            root
             |-- userId: string (nullable = true)
             |-- movieId: string (nullable = true)
             |-- rating: double (nullable = false)
             |-- timestamp: long (nullable = false)
        */
        ratingsDF.printSchema()
        /*
            +------+-------+------+---------+
            |userId|movieId|rating|timestamp|
            +------+-------+------+---------+
            |     1|   1193|   5.0|978300760|
            |     1|    661|   3.0|978302109|
            |     1|    594|   4.0|978302268|
            |     1|    919|   4.0|978301368|
            +------+-------+------+---------+
         */
        ratingsDF.show(4)
        
        // TODO: 基于SQL方式分析
        // 第一步、注册DataFrame为临时视图
        ratingsDF.createOrReplaceTempView("view_temp_ratings")
        
        // 第二步、编写SQL
        val top10MovieDF: DataFrame = spark.sql(
            """
              |SELECT
              |  movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
              |FROM
              |  view_temp_ratings
              |GROUP BY
              |  movieId
              |HAVING
              |  cnt_rating > 200
              |ORDER BY
              |  avg_rating DESC, cnt_rating DESC
              |LIMIT
              |  10
            """.stripMargin)
        //top10MovieDF.printSchema()
        top10MovieDF.show(10, truncate = false)
        
        println("===============================================================")
        
        // TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
        import org.apache.spark.sql.functions._
        val resultDF: DataFrame = ratingsDF
            // 选取字段
            .select($"movieId", $"rating")
            // 分组:按照电影ID,获取平均评分和评分次数
            .groupBy($"movieId")
            .agg(
                round(avg($"rating"), 2).as("avg_rating"),
                count($"movieId").as("cnt_rating")
            )
            // 过滤:评分次数大于200
            .filter($"cnt_rating" > 200)
            // 排序:先按照评分降序,再按照次数降序
            .orderBy($"avg_rating".desc, $"cnt_rating".desc)
            // 获取前10
            .limit(10)
        //resultDF.printSchema()
        resultDF.show(10)

        /*// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
        // 结果DataFrame被使用多次,缓存
        resultDF.persist(StorageLevel.MEMORY_AND_DISK)
        
        // 1. 保存MySQL数据库表汇总
        resultDF
            .coalesce(1)
            .write
            .mode("overwrite")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("user", "root")
            .option("password", "root")
            .jdbc(
                "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8",
                "top10_movies",
                new Properties()
            )
        
        // 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
        resultDF
            .coalesce(1)
            .write.mode("overwrite")
            .csv("data/output/top10-movies")
        
        // 释放缓存数据
        resultDF.unpersist()*/

        spark.stop()
    }
}

​​​​​​​Shuffle分区数

运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。

原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。可以在构建SparkSession实例对象时进行设置

代码语言:javascript
复制
val spark = SparkSession.builder()
  .appName(this.getClass.getSimpleName.stripSuffix("$"))
  .master("local[*]")
  // TODO: 设置shuffle时分区数目
  .config("spark.sql.shuffle.partitions", "4")
  .getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
下一篇
举报
领券