Spark2.x学习笔记:10、简易电影受众系统

10、 简易电影受众系统

本章内容,源码参考了https://github.com/XichengDong/simplespark

10.1 数据准备

(1)下载数据 https://grouplens.org/datasets/movielens/ https://grouplens.org/datasets/movielens/1m/ 单击ml-1m.zip链接即可下载

(2)上传到HDFS

[root@node1 data]# hdfs dfs -put ml-1m/ input
[root@node1 data]# hdfs dfs -ls input/ml-1m
Found 4 items
-rw-r--r--   3 root supergroup       5577 2017-09-24 04:08 input/ml-1m/README
-rw-r--r--   3 root supergroup     171308 2017-09-24 04:08 input/ml-1m/movies.dat
-rw-r--r--   3 root supergroup   24594131 2017-09-24 04:08 input/ml-1m/ratings.dat
-rw-r--r--   3 root supergroup     134368 2017-09-24 04:08 input/ml-1m/users.dat
[root@node1 data]#

10.2 数据格式

(1)users.dat

UserID::Gender::Age:Occupation:Zip-code

用户信息表结构 用户号::性别:年龄:职业:邮政编码

(2)movies.dat

MovieID::Title::Genres

电影信息表结构 电影号::标题::流派

(3)ratings.dat

UerID::MoviesID::Rating::Timestamp

评级表结构 UerID:: MoviesID::评级::时间戳

10.3 加载数据

[root@node1 ~]# spark-shell
17/09/24 04:37:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/24 04:37:55 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.80.131:4040
Spark context available as 'sc' (master = local[*], app id = local-1506242261932).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val usersRdd =sc.textFile("input/ml-1m/users.dat")
usersRdd: org.apache.spark.rdd.RDD[String] = input/ml-1m/users.dat MapPartitionsRDD[3] at textFile at <console>:24

scala> usersRdd.first
res7: String = 1::F::1::10::48067

scala> usersRdd.count
res2: Long = 6040

scala> val moviesRdd=sc.textFile("input/ml-1m/movies.dat")
moviesRdd: org.apache.spark.rdd.RDD[String] = input/ml-1m/movies.dat MapPartitionsRDD[18] at textFile at <console>:24

scala> moviesRdd.first
res7: String = 1::Toy Story (1995)::Animation|Children's|Comedy

scala> moviesRdd.count
res8: Long = 3883


scala> val ratingsRdd=sc.textFile("input/ml-1m/ratings.dat")
ratingsRdd: org.apache.spark.rdd.RDD[String] = input/ml-1m/ratings.dat MapPartitionsRDD[7] at textFile at <console>:24

scala> ratingsRdd.first
res6: String = 1::1193::5::978300760

scala> ratingsRdd.count
res4: Long = 1000209

scala>

10.4 问题1:看过《Lord of the Rings,The(1978)》用户年龄和性别分布

(1)首先,为了避免三表查询,这里我们需要提前确定电影《Lord of the Rings,The(1978)》的编号。 通过movies.dat文件查询可知,该电影编号是2116。 这样可以定义一个常量:

scala> val MOVIE_ID="2116"
MOVIE_ID: String = 2116

(2)对于用户表,我们只需要年龄和性别,用户ID用于关联。所以对于用户表,需要过滤出前三个字段即可,用户ID可以作为Key,年龄和性别可以作为Value。

scala> val users=usersRdd.map(_.split("::")).map{x => (x(0),(x(1),x(2)))}
users: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[9] at map at <console>:26

scala> users.take(10)
res5: Array[(String, (String, String))] = Array((1,(F,1)), (2,(M,56)), (3,(M,25)), (4,(M,45)), (5,(M,25)), (6,(F,50)), (7,(M,35)), (8,(M,25)), (9,(M,25)), (10,(F,35)))

(3)对于评级表ratings,需要过滤出用户ID和电影ID即可,然后再通过常量MOVIE_ID=”2116”进行过滤。

scala> val rating =ratingsRdd.map(_.split("::"))
rating: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:26

scala> rating.first
res9: Array[String] = Array(1, 1193, 5, 978300760)

scala> val userMovie=rating.map{x=>(x(0),x(1))}.filter(_._2.equals(MOVIE_ID))
userMovie: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[12] at filter at <console>:30

scala> userMovie.first
res0: (String, String) = (17,2116)

scala> 

备注:对于Key/Value型的RDD,_._1表示key,_._2表示value。

(4)将处理后的评级表和处理后的用户表进行join操作。注意,rdd1[key,value1] join rdd2[key,value2]的结果是[key,(value1,value2)],也就是key是关联字段,value是两个RDD组合形式。

scala> val userRating =userMovie.join(users)
userRating: org.apache.spark.rdd.RDD[(String, (String, (String, String)))] = MapPartitionsRDD[11] at join at <console>:36

scala> userRating.first
res3: (String, (String, (String, String))) = (749,(2116,(M,35)))

(5)对连接结果进行处理。

scala> val userDistribution=userRating.map{x=>(x._2._2,1)}.reduceByKey(_+_)
userDistribution: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[13] at reduceByKey at <console>:38

scala> userDistribution.foreach(println)
((F,50),3)
((F,45),3)
((M,56),8)
((M,50),22)
((F,35),13)
((F,18),9)
((M,18),72)
((M,25),169)
((M,1),13)
((M,35),66)
((F,25),28)
((F,56),2)
((F,1),4)
((M,45),26)

备注,,x._2._2表示Key/Value型RDD每项x的value的value(第2项的第2项)。 (6)Scala程序

package cn.hadron.spark.movie
import org.apache.spark._

/**
 * 看过“Lord of the Rings, The (1978)”用户和年龄性别分布
 */
object MovieUserAnalyzer {

  def main(args: Array[String]) {
    if (args.length < 1){
      println("Usage:MovieUserAnalyzer dataPath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("MovieUserAnalyzer")
    val sc = new SparkContext(conf)
    //1.加载数据,创建RDD
    val MOVIE_ID = "2116"
    val usersRdd = sc.textFile(args(0) + "/users.dat")
    val ratingsRdd = sc.textFile(args(0) + "/ratings.dat")

    //2.解析用户表 RDD[(userID, (gender, age))]
    val users = usersRdd.map(_.split("::")).map { x =>
      (x(0), (x(1), x(2)))
    }

    //3.解析评级表 RDD[Array(userID, movieID, ratings, timestamp)]
    val rating = ratingsRdd.map(_.split("::"))
    //usermovie: RDD[(userID, movieID)]
    val usermovie = rating.map { x =>(x(0), x(1))}.filter(_._2.equals(MOVIE_ID))

    //4.join RDDs
    //useRating: RDD[(userID, (movieID, (gender, age))]
    val userRating = usermovie.join(users)
    //movieuser: RDD[(movieID, (movieTile, (gender, age))]
    val userDistribution = userRating.map { x =>(x._2._2, 1)}.reduceByKey(_ + _)
    //5.输出结果
    userDistribution.collect.foreach(println)

    sc.stop()
  }
}

10.5 问题2:年龄段在“18-24”的男人,最喜欢看10部电影

(1)程序代码

package cn.hadron.spark.movie
import org.apache.spark._
import scala.collection.immutable.HashSet
/**
  * .年龄段在“18-24”的男人,最喜欢看10部电影
  */
object PopularMovieAnalyzer {
  def main(args: Array[String]) {
    val masterUrl = "local[1]"
    var dataPath = "d:\\data\\ml-1m"
    val conf = new SparkConf().setMaster(masterUrl).setAppName("PopularMovieAnalyzer")
    if (args.length > 0) {
         dataPath = args(0)
    } else {
         conf.setMaster("local[1]")
    }
    val sc = new SparkContext(conf)

    //1.加载数据,创建RDD
    val USER_AGE = "18"
    val usersRdd = sc.textFile(dataPath + "\\users.dat")
    val moviesRdd = sc.textFile(dataPath + "\\movies.dat")
    val ratingsRdd = sc.textFile(dataPath + "\\ratings.dat")
    //2.从RDD提取列
    //2.1 users: RDD[(userID, age)]
    val users = usersRdd.map(_.split("::"))
                        .map { x => (x(0), x(2)) }
                        .filter(_._2.equals(USER_AGE))
    //2.2 过滤出userID
    val userlist = users.map(_._1).collect()
    //2.3 用++来批量添加元素
    val userSet = HashSet() ++ userlist
    //2.4 广播userSet
    val broadcastUserSet = sc.broadcast(userSet)

    //3. map-side join RDDs
    val topKmovies = ratingsRdd.map(_.split("::"))
      .map { x => (x(0), x(1)) }  //RDD[UerID,MoviesID]
      .filter { x => broadcastUserSet.value.contains(x._1) } //过滤符合广播userSet
      .map { x => (x._2, 1) }  //RDD[MoviesID,1]
      .reduceByKey(_ + _)  //RDD[MoviesID,n]
      .map { x => (x._2, x._1) }//RDD[n,MoviesID]
      .sortByKey(false) //降序排序
      .map { x => (x._2, x._1) }//RDD[MoviesID,n]
      .take(10)
    //4. 将filmID转换fileName
    //4.1 过滤出RDD[MovieID,Title]
    val movieID2Name = moviesRdd.map(_.split("::"))
      .map { x => (x(0), x(1)) }
      .collect()
      .toMap
    //4.2  getOrElse(key,default)获取key对应的value,如果不存在则返回一个默认值。
    topKmovies.map(x => (movieID2Name.getOrElse(x._1, null), x._2))
              .foreach(println)

    sc.stop()
  }
}

(2)语法说明

  • 广播变量 高效分发大数据集,每个executor一份,该executor下的每个task共享,而不是每个task一份。
  • Map-side Join Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。 在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式。
  • sortByKey() 将 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。

(3)输出结果

(American Beauty (1999),715)
(Star Wars: Episode VI - Return of the Jedi (1983),586)
(Star Wars: Episode V - The Empire Strikes Back (1980),579)
(Matrix, The (1999),567)
(Star Wars: Episode IV - A New Hope (1977),562)
(Braveheart (1995),544)
(Saving Private Ryan (1998),543)
(Jurassic Park (1993),541)
(Terminator 2: Judgment Day (1991),529)
(Men in Black (1997),514)

10.6 问题2: 得分最高的10部电影;看过电影最好的前10人;女性看最多的10部电影;男性看过最多的10部电影。

package cn.hadron.spark.movie

import org.apache.spark._

/**
  * 得分最高的10部电影;看过电影最多的前10个人;女性看多最多的10部电影;男性看过最多的10部电影
  */
object TopKMovieAnalyzer {

  def main(args: Array[String]) {
    var dataPath =  "d:\\data\\ml-1m"
    val conf = new SparkConf().setAppName("TopKMovieAnalyzer")
    if(args.length > 0) {
      dataPath = args(0)
    } else {
      conf.setMaster("local[1]")
    }
    val sc = new SparkContext(conf)

    /**
      * Step 1: Create RDDs
      */
    val DATA_PATH = dataPath
    val ratingsRdd = sc.textFile(DATA_PATH + "\\ratings.dat")

    /**
      * Step 2: Extract columns from RDDs
      */
    //users: RDD[(userID, movieID, Rating)]
    val ratings = ratingsRdd.map(_.split("::"))
                            .map { x =>(x(0), x(1), x(2))}
                            .cache

    /**
      * Step 3: analyze result
      * reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,
      * 因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对
      */
    //得分最高的10部电影
    ratings.map { x =>(x._2, (x._3.toInt, 1))} //RDD[movieID,(Rating,1)]
           .reduceByKey { (v1, v2) =>(v1._1 + v2._1, v1._2 + v2._2)}//RDD[movieID,(n_Rating,n)]
           .map { x =>(x._2._1.toFloat / x._2._2.toFloat, x._1)}//RDD[avgRating,movieID]
           .sortByKey(false)
           .take(10)
           .foreach(println)

    //看过电影最多的前10个人
    ratings.map { x =>(x._1, 1)}//RDD[userID,1]
           .reduceByKey(_ + _)  //RDD[userID,n]
           .map(x => (x._2, x._1))//RDD[n,userID]
           .sortByKey(false) 
           .take(10)
           .foreach(println)

    sc.stop()
  }
}

运行结果:

(5.0,787)
(5.0,3382)
(5.0,3607)
(5.0,989)
(5.0,3656)
(5.0,3881)
(5.0,1830)
(5.0,3280)
(5.0,3233)
(5.0,3172)
(2314,4169)
(1850,1680)
(1743,4277)
(1595,1941)
(1521,1181)
(1518,889)
(1344,3618)
(1323,2063)
(1302,1150)
(1286,1015)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏龙首琴剑庐

Java基准测试利器OpenJDK-JMH

3709
来自专栏小灰灰

Batik渲染png图片异常的bug修复全程记录

batik是apache的一个开源项目,可以实现svg的渲染,后端借助它可以比较简单的实现图片渲染,当然和java一贯处理图片不太方便一样,使用起来也有不少坑

2237
来自专栏xingoo, 一个梦想做发明家的程序员

Spark Stage切分 源码剖析——DAGScheduler

Spark中的任务管理是很重要的内容,可以说想要理解Spark的计算流程,就必须对它的任务的切分有一定的了解。不然你就看不懂Spark UI,看不懂Spark...

1948
来自专栏221-B

不懂汇编,如何逆向(iOS)

其实这个是非必要项, 自己手动砸壳需要已越狱的手机. 想手动砸壳可以参考这篇文章.

922
来自专栏Spark生态圈

[spark] DAGScheduler划分stage源码解析

Spark Application只有遇到action操作时才会真正的提交任务并进行计算,DAGScheduler 会根据各个RDD之间的依赖关系形成一个DAG...

442
来自专栏GuZhenYin

用SignalR 2.0开发客服系统[系列2:实现聊天室]

前言 交流群:195866844 上周发表了 用SignalR 2.0开发客服系统[系列1:实现群发通讯] 这篇文章,得到了很多帮助和鼓励,小弟在此真心的感谢大...

3458
来自专栏机器人网

中英文对照,瞬间理解西门子PLC指令

指令( 英文全称意思 ) :指令含义 1、LD ( Load 装载 ) :动合触点 2、LDN ( Load Not 不装载 ) : 动断触点 3、A ( A...

3317
来自专栏Spark生态圈

[spark] Checkpoint 源码解析

在spark应用程序中,常常会遇到运算量很大经过很复杂的 Transformation才能得到的RDD即Lineage链较长、宽依赖的RDD,此时我们可以考虑将...

792
来自专栏伦少的博客

利用Spark实现Oracle到Hive的历史数据同步

和上一篇文章Spark通过修改DataFrame的schema给表字段添加注释一样,通过Spark将关系型数据库(以Oracle为例)的表同步的Hive,这里讲...

563
来自专栏Jerry的SAP技术分享

使用ABAP正则表达式解析HTML标签

需求就是我用ABAP的某个函数从数据库读取一个字符串出来,该字符串的内容是一个网页。

852

扫码关注云+社区