前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[新星计划]Spark综合练习——电影评分数据分析

[新星计划]Spark综合练习——电影评分数据分析

作者头像
Maynor
发布2021-06-09 23:58:42
6560
发布2021-06-09 23:58:42
举报
文章被收录于专栏:最新最全的大数据技术体系

文章目录

引言

大家好,我是ChinaManor,直译过来就是中国码农的意思,俺希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,一个平凡而不平庸的人。

今天给大家带来一个Spark综合练习案例–电影评分

老师:给定需求统计评分次数>200的电影平均分Top10,并写入Mysql数据库中

我:所有字我都认识,怎么连在一起我就不认识了

在这里插入图片描述
在这里插入图片描述

不管了先new个实例对象,总没错吧

代码语言:javascript
复制
val sparkSession = SparkSession
      .builder()
      .config("spark.sql.shuffle.partitions", "4")
      .appName("电影数据分析")
      .master("local[2]")
      .getOrCreate()

然后大数据无非输入,转换,输出,我再弄个spark读取文件?

在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
    val lines: RDD[String] = sparkSession.read.textFile("E:\\xx\\SparkDemo\\input\\ratings.dat").rdd

再然后RDD转换成DF

代码语言:javascript
复制
val rdd: RDD[(Int, Int, Int, Long)] = lines.mapPartitions { item => {
      item.map { line => {
        val strings: Array[String] = line.trim.split("::")
        (strings(0).toInt, strings(1).toInt, strings(2).toInt, strings(3).toLong)
        }
      }
    }

    }
    import sparkSession.implicits._
    val reusltDF: DataFrame = rdd.toDF("user_id", "item_id", "rating", "timestamp")

测试一下行不行

在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
//    查看约束
    reusltDF.printSchema()
    //查看数据
    reusltDF.show()

好像跑通了!!笑容逐渐放肆~什么SQL不整了,上来直接DSL

在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
val resultDS: Dataset[Row] = reusltDF
          //a.对数据按电影id进行分组
          .groupBy($"item_id")
          //b.对聚合数据求平均值和评分次数
          .agg(
            round(avg($"rating"), 2).as("avg_rating"),
            count($"user_id").as("cnt_rating")
          )
          //c.过滤出评分大于2000的
          .filter($"cnt_rating" > 2000)
          //d.按照评分的平均值进行降序排序
          .orderBy($"avg_rating".desc)
          //e.取前十条数据
          .limit(10)

最后最后保存到Mysql SaveToMysql(resultDF);

代码语言:javascript
复制
/**
    * 保存数据至MySQL数据库,使用函数foreachPartition对每个分区数据操作,主键存在时更新,不存在时插入
    */
    def saveToMySQL(dataFrame: DataFrame): Unit = {
      dataFrame.rdd.coalesce(1).foreachPartition{ iter =>
      // a. 加载驱动类
        Class.forName("com.mysql.cj.jdbc.Driver")
      // 声明变量
      var conn: Connection = null
      var pstmt: PreparedStatement = null
      try{
        // b. 获取连接
        conn = DriverManager.getConnection(
          "jdbc:mysql://192.168.88.100:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
          "root", //
          "123456"
        )
        // c. 获取PreparedStatement对象
        val insertSql ="""
                |INSERT
                |O
                | db_test.demo
                | (item_id, avg_rating, cnt_rating)
                |VALUES (?, ?, ?)
                |""".stripMargin
        pstmt = conn.prepareStatement(insertSql)
        conn.setAutoCommit(false)
        // d. 将分区中数据插入到表中,批量插入
        iter.foreach{ row =>
          pstmt.setInt(1, row.getAs[Int]("item_id"))
          pstmt.setInt(2, row.getAs[Int]("avg_rating"))
          pstmt.setInt(3, row.getAs[Int]("cnt_rating"))
          // 加入批次
          pstmt.addBatch()
        }
        // TODO: 批量插入
        pstmt.executeBatch()
        conn.commit()
      }catch {
        case e: Exception => e.printStackTrace()
      }finally {
        if(null != pstmt) pstmt.close()
        if(null != conn) conn.close()
      } }
在这里插入图片描述
在这里插入图片描述

大功告成了!

在这里插入图片描述
在这里插入图片描述

总结

以上便是电影评分数据分析spark版,愿你读过之后有自己的收获,如果有收获不妨一键三连一下~

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/05/31 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 引言
  • 今天给大家带来一个Spark综合练习案例–电影评分
  • 总结
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档