前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark综合练习——电影评分数据分析

Spark综合练习——电影评分数据分析

作者头像
Maynor
发布2022-05-08 13:32:45
1.4K0
发布2022-05-08 13:32:45
举报

文章目录
  • 引言
  • 今天给大家带来一个Spark综合练习案例--电影评分
  • 补充: 采用DSL编程的详尽注释版
  • 总结

引言

大家好,我是ChinaManor,直译过来就是中国码农的意思,俺希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,一个平凡而不平庸的人。

全部数据: 链接:https://pan.baidu.com/s/1qiO9aRb7yQeuHDtH1cWklw 提取码:nwxj

今天给大家带来一个Spark综合练习案例–电影评分

老师:给定需求统计评分次数>200的电影平均分Top10,并写入Mysql数据库中

我:所有字我都认识,怎么连在一起我就不认识了

不管了先new个实例对象,总没错吧

代码语言:javascript
复制
val sparkSession = SparkSession
      .builder()
      .config("spark.sql.shuffle.partitions", "4")
      .appName("电影数据分析")
      .master("local[2]")
      .getOrCreate()

然后大数据无非输入,转换,输出,我再弄个spark读取文件?

代码语言:javascript
复制
    val lines: RDD[String] = sparkSession.read.textFile("E:\\xx\\SparkDemo\\input\\ratings.dat").rdd

再然后RDD转换成DF

代码语言:javascript
复制
val rdd: RDD[(Int, Int, Int, Long)] = lines.mapPartitions { item => {
      item.map { line => {
        val strings: Array[String] = line.trim.split("::")
        (strings(0).toInt, strings(1).toInt, strings(2).toInt, strings(3).toLong)
        }
      }
    }

    }
    import sparkSession.implicits._
    val reusltDF: DataFrame = rdd.toDF("user_id", "item_id", "rating", "timestamp")

测试一下行不行

在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
//    查看约束
    reusltDF.printSchema()
    //查看数据
    reusltDF.show()

好像跑通了!!笑容逐渐放肆~什么SQL不整了,上来直接DSL

代码语言:javascript
复制
val resultDS: Dataset[Row] = reusltDF
          //a.对数据按电影id进行分组
          .groupBy($"item_id")
          //b.对聚合数据求平均值和评分次数
          .agg(
            round(avg($"rating"), 2).as("avg_rating"),
            count($"user_id").as("cnt_rating")
          )
          //c.过滤出评分大于2000的
          .filter($"cnt_rating" > 2000)
          //d.按照评分的平均值进行降序排序
          .orderBy($"avg_rating".desc)
          //e.取前十条数据
          .limit(10)

最后最后保存到Mysql SaveToMysql(resultDF);

代码语言:javascript
复制
/**
    * 保存数据至MySQL数据库,使用函数foreachPartition对每个分区数据操作,主键存在时更新,不存在时插入
    */
    def saveToMySQL(dataFrame: DataFrame): Unit = {
      dataFrame.rdd.coalesce(1).foreachPartition{ iter =>
      // a. 加载驱动类
        Class.forName("com.mysql.cj.jdbc.Driver")
      // 声明变量
      var conn: Connection = null
      var pstmt: PreparedStatement = null
      try{
        // b. 获取连接
        conn = DriverManager.getConnection(
          "jdbc:mysql://192.168.88.100:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
          "root", //
          "123456"
        )
        // c. 获取PreparedStatement对象
        val insertSql ="""
                |INSERT
                |O
                | db_test.demo
                | (item_id, avg_rating, cnt_rating)
                |VALUES (?, ?, ?)
                |""".stripMargin
        pstmt = conn.prepareStatement(insertSql)
        conn.setAutoCommit(false)
        // d. 将分区中数据插入到表中,批量插入
        iter.foreach{ row =>
          pstmt.setInt(1, row.getAs[Int]("item_id"))
          pstmt.setInt(2, row.getAs[Int]("avg_rating"))
          pstmt.setInt(3, row.getAs[Int]("cnt_rating"))
          // 加入批次
          pstmt.addBatch()
        }
        // TODO: 批量插入
        pstmt.executeBatch()
        conn.commit()
      }catch {
        case e: Exception => e.printStackTrace()
      }finally {
        if(null != pstmt) pstmt.close()
        if(null != conn) conn.close()
      } }
在这里插入图片描述
在这里插入图片描述

大功告成了!

补充: 采用DSL编程的详尽注释版

代码语言:javascript
复制
package cn.itcast.spark.metrics

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

/**
 * 电影评分数据分析,需求如下:
 *      需求1:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分
 *          电影ID    评分个数     电影名称 平均评分   更新时间
 *          movie_id、rating_num、title、rating_avg、update_time
 *      需求2:查找每个电影类别及其对应的平均评分
 *          电影类别  电影类别平均评分     更新时间
 *          genre、 rating_avg       、update_time
 *      需求3:查找被评分次数较多的前十部电影
 *          电影ID   电影名称  电影被评分的次数   更新时间
 *          movie_id、title、rating_num、      update_time
*/
object MetricsAppMain {
	
	// 文件路径
	private val RATINGS_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\ratings.csv"
	private val MOVIES_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\movies.csv"
	
	def main(args: Array[String]): Unit = {
		// step1、创建SparkSession实例对象
		val spark: SparkSession = createSparkSession(this.getClass)
		import spark.implicits._
	
		/*
			分析需求可知,三个需求最终结果,需要使用事实表数据和维度表数据关联,所以先数据拉宽,再指标计算
				TODO: 按照数据仓库分层理论管理数据和开发指标
				- 第一层(最底层):ODS层
					直接加CSV文件数据为DataFrame
				- 第二层(中间层):DW层
					将加载业务数据(电影评分数据)和维度数据(电影基本信息数据)进行Join关联,拉宽操作
				- 第三层(最上层):DA层/APP层
					依据需求开发程序,计算指标,进行存储到MySQL表
		 */
		// step2、【ODS层】:加载数据,CSV格式数据,文件首行为列名称
		val ratingDF: DataFrame = readCsvFile(spark, RATINGS_CSV_FILE_PATH, verbose = false)
		val movieDF: DataFrame = readCsvFile(spark, MOVIES_CSV_FILE_PATH, verbose = false)

		// step3、【DW层】:将电影评分数据与电影信息数据进行关联,数据拉宽操作
		val detailDF: DataFrame = joinDetail(ratingDF, movieDF)
		//printConsole(detailDF)
		
		// step4、【DA层】:按照业务需求,进行指标统计分析
		computeMetric(detailDF)
		
		Thread.sleep(1000000)
		// 应用结束,关闭资源
		spark.stop()
	}
	
	/**
	 * 构建SparkSession实例对象,默认情况下本地模式运行
	 */
	def createSparkSession(clazz: Class[_], master: String = "local[2]"): SparkSession = {
		SparkSession.builder()
			.appName(clazz.getSimpleName.stripSuffix("$"))
			.master(master)
			.config("spark.sql.shuffle.partitions", "2")
			.getOrCreate()
	}
	
	/**
	 * 读取CSV格式文本文件数据,封装到DataFrame数据集
	 */
	def readCsvFile(spark: SparkSession, path: String, verbose: Boolean = true): DataFrame = {
		val dataframe: DataFrame = spark.read
			// 设置分隔符为逗号
			.option("sep", ",")
			// 文件首行为列名称
			.option("header", "true")
			// 依据数值自动推断数据类型
			.option("inferSchema", "true")
			.csv(path)
		if(verbose){
			printConsole(dataframe)
		}
		// 返回数据集
		dataframe
	}
	
	/**
	 * 将事实表数据与维度表数据进行Join关联
	 */
	def joinDetail(df1: DataFrame, df2: DataFrame, joinType: String = "left_outer"): DataFrame = {
		df1
			// 采用leftJoin关联数据
			.join(df2, df1("movieId") === df2("movieId"), joinType)
			// 选取字段
    		.select(
			    df1("userId").as("user_id"), //
			    df1("movieId").as("movie_id"), //
			    df1("rating"), //
			    df1("timestamp"), //
			    df2("title"), //
			    df2("genres") //
		    )
	}
	
	/**
	 * 按照业务需求,进行指标统计,默认情况下,结果数据打印控制台
	 */
	def computeMetric(dataframe: DataFrame): Unit = {
		// TODO: 缓存数据
		dataframe.persist(StorageLevel.MEMORY_AND_DISK)
		
		// 需求1:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分
		val top10FilesDF: DataFrame = top10Films(dataframe)
		//printConsole(top10FilesDF)
		upsertToMySQL(
			top10FilesDF, //
			"replace into db_metrics.top_10_files (id, movie_id, rating_num, title, rating_avg) values (null, ?, ?, ?, ?)", //
			(pstmt: PreparedStatement, row: Row) => {
				pstmt.setInt(1, row.getAs[Int]("movie_id"))
				pstmt.setLong(2, row.getAs[Long]("rating_num"))
				pstmt.setString(3, row.getAs[String]("title"))
				pstmt.setDouble(4, row.getAs[Double]("rating_avg"))
			}
		)
		
		// 需求2:查找每个电影类别及其对应的平均评分
		val genresRatingDF: DataFrame = genresRating(dataframe)
		//printConsole(genresRatingDF)
//		upsertToMySQL(
//			genresRatingDF, //
//			"replace into db_metrics.genres_rating (id, genre, rating_avg) values (null, ?, ?)", //
//			(pstmt: PreparedStatement, row: Row) => {
//				pstmt.setString(1, row.getAs[String]("genre"))
//				pstmt.setDouble(2, row.getAs[Double]("rating_avg"))
//			}
//		)
		
		// 需求3:查找被评分次数较多的前十部电影
		val best10FilesDF: DataFrame = best10Files(dataframe)
		//printConsole(best10FilesDF)
//		upsertToMySQL(
//			best10FilesDF, //
//			"replace into db_metrics.best_10_films (id, movie_id, title, rating_num) values (null, ?, ?, ?)", //
//			(pstmt: PreparedStatement, row: Row) => {
//				pstmt.setInt(1, row.getAs[Int]("movie_id"))
//				pstmt.setString(2, row.getAs[String]("title"))
//				pstmt.setLong(3, row.getAs[Long]("rating_num"))
//			}
//		)
		
		// 释放资源
		dataframe.unpersist()
	}
	
	/**
	 * 需求:查找电影评分个数超过50,且平均评分较高的前十部电影名称及其对应的平均评分
	 *    电影ID    评分个数     电影名称 平均评分   更新时间
	 *    movie_id、rating_num、title、rating_avg、update_time
	 */
	def top10Films(dataframe: DataFrame): DataFrame = {
		import dataframe.sparkSession.implicits._
		
		dataframe
			.groupBy($"movie_id", $"title")
			.agg(
				count($"movie_id").as("rating_num"), // 统计电影被评分的个数
				round(avg($"rating"), 2).as("rating_avg") // 统计电影被评分的平均分
			)
			// 过滤评分个数大于50
			.where($"rating_num" > 50)
			// 降序排序,按照平均分
			.orderBy($"rating_avg".desc)
			// 获取前10电影
			.limit(10)
			// 添加日期字段
			.withColumn("update_time", current_timestamp())
	}
	
	/**
	 * 需求:查找每个电影类别及其对应的平均评分
	 *     电影类别  电影类别平均评分     更新时间
	 *     genre、 rating_avg       、update_time
	 */
	def genresRating(dataframe: DataFrame): DataFrame = {
		import dataframe.sparkSession.implicits._
		
		dataframe
			// 将每个电影类别字段:genres,按照|划分,使用爆炸函数进行行转列
    		.select(
			    explode(split($"genres", "\\|")).as("genre"), //
			    $"rating" //
		    )
			// 按照类别分组,计算平均评分
    		.groupBy($"genre")
    		.agg(
		        round(avg($"rating"), 2).as("rating_avg")
		    )
			// 对统计值降序排序
    		.orderBy($"rating_avg".desc)
			// 添加日期字段
			.withColumn("update_time", current_timestamp())
	}
	
	/**
	 * 需求:查找被评分次数较多的前十部电影
	 *     电影ID   电影名称  电影被评分的次数   更新时间
	 *     movie_id、title、rating_num、      update_time
	 */
	def best10Files(dataframe: DataFrame): DataFrame = {
		import dataframe.sparkSession.implicits._
		
		dataframe
			.groupBy($"movie_id", $"title")
			.agg(
				count($"movie_id").as("rating_num") // 统计电影被评分的个数
			)
			// 降序排序,按照平均分
			.orderBy($"rating_num".desc)
			// 获取前10电影
			.limit(10)
			// 添加日期字段
			.withColumn("update_time", current_timestamp())
	}
	
	/**
	 * 将DataFrame数据集打印控制台,显示Schema信息和前10条数据
	 */
	def printConsole(dataframe: DataFrame): Unit = {
		// 显示Schema信息
		dataframe.printSchema()
		// 显示前10条数据
		dataframe.show(10, truncate = false)
	}
	
	/**
	 * 将数据保存至MySQL表中,采用replace方式,当主键存在时,更新数据;不存在时,插入数据
	 * @param dataframe 数据集
	 * @param sql 插入数据SQL语句
	 * @param accept 函数,如何设置Row中每列数据到SQL语句中占位符值
	 */
	def upsertToMySQL(dataframe: DataFrame, sql: String,
	                  accept: (PreparedStatement, Row) => Unit): Unit = {
		// 降低分区数目,对每个分区进行操作
		dataframe.coalesce(1).foreachPartition{iter =>
			// step1. 加载驱动类
			Class.forName("com.mysql.cj.jdbc.Driver")
			
			// 声明变量
			var conn: Connection = null
			var pstmt: PreparedStatement = null
			
			try{
				// step2. 创建连接
				conn = DriverManager.getConnection(
					"jdbc:mysql://192.168.88.100:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
					"root",
					"123456"
				)
				pstmt = conn.prepareStatement(sql)
				
				// step3. 插入数据
				iter.foreach{row =>
					// 设置SQL语句中占位符的值
					accept(pstmt, row)
					// 加入批次中
					pstmt.addBatch()
				}
				
				// 批量执行批次
				pstmt.executeBatch()
			}catch {
				case e: Exception => e.printStackTrace()
			}finally {
				// step4. 关闭连接
				if(null != pstmt) pstmt.close()
				if(null != conn) conn.close()
			}
		}
	}
	
}

总结

以上便是电影评分数据分析spark版,愿你读过之后有自己的收获,如果有收获不妨一键三连一下~

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-05-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 引言
  • 今天给大家带来一个Spark综合练习案例–电影评分
  • 补充: 采用DSL编程的详尽注释版
  • 总结
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档