Spark2.x学习笔记:10、简易电影受众系统

10、 简易电影受众系统

本章内容,源码参考了https://github.com/XichengDong/simplespark

10.1 数据准备

(1)下载数据 https://grouplens.org/datasets/movielens/ https://grouplens.org/datasets/movielens/1m/ 单击ml-1m.zip链接即可下载

(2)上传到HDFS

[root@node1 data]# hdfs dfs -put ml-1m/ input
[root@node1 data]# hdfs dfs -ls input/ml-1m
Found 4 items
-rw-r--r--   3 root supergroup       5577 2017-09-24 04:08 input/ml-1m/README
-rw-r--r--   3 root supergroup     171308 2017-09-24 04:08 input/ml-1m/movies.dat
-rw-r--r--   3 root supergroup   24594131 2017-09-24 04:08 input/ml-1m/ratings.dat
-rw-r--r--   3 root supergroup     134368 2017-09-24 04:08 input/ml-1m/users.dat
[root@node1 data]#

10.2 数据格式

(1)users.dat

UserID::Gender::Age:Occupation:Zip-code

用户信息表结构 用户号::性别:年龄:职业:邮政编码

(2)movies.dat

MovieID::Title::Genres

电影信息表结构 电影号::标题::流派

(3)ratings.dat

UerID::MoviesID::Rating::Timestamp

评级表结构 UerID:: MoviesID::评级::时间戳

10.3 加载数据

[root@node1 ~]# spark-shell
17/09/24 04:37:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/24 04:37:55 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.80.131:4040
Spark context available as 'sc' (master = local[*], app id = local-1506242261932).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val usersRdd =sc.textFile("input/ml-1m/users.dat")
usersRdd: org.apache.spark.rdd.RDD[String] = input/ml-1m/users.dat MapPartitionsRDD[3] at textFile at <console>:24

scala> usersRdd.first
res7: String = 1::F::1::10::48067

scala> usersRdd.count
res2: Long = 6040

scala> val moviesRdd=sc.textFile("input/ml-1m/movies.dat")
moviesRdd: org.apache.spark.rdd.RDD[String] = input/ml-1m/movies.dat MapPartitionsRDD[18] at textFile at <console>:24

scala> moviesRdd.first
res7: String = 1::Toy Story (1995)::Animation|Children's|Comedy

scala> moviesRdd.count
res8: Long = 3883


scala> val ratingsRdd=sc.textFile("input/ml-1m/ratings.dat")
ratingsRdd: org.apache.spark.rdd.RDD[String] = input/ml-1m/ratings.dat MapPartitionsRDD[7] at textFile at <console>:24

scala> ratingsRdd.first
res6: String = 1::1193::5::978300760

scala> ratingsRdd.count
res4: Long = 1000209

scala>

10.4 问题1:看过《Lord of the Rings,The(1978)》用户年龄和性别分布

(1)首先,为了避免三表查询,这里我们需要提前确定电影《Lord of the Rings,The(1978)》的编号。 通过movies.dat文件查询可知,该电影编号是2116。 这样可以定义一个常量:

scala> val MOVIE_ID="2116"
MOVIE_ID: String = 2116

(2)对于用户表,我们只需要年龄和性别,用户ID用于关联。所以对于用户表,需要过滤出前三个字段即可,用户ID可以作为Key,年龄和性别可以作为Value。

scala> val users=usersRdd.map(_.split("::")).map{x => (x(0),(x(1),x(2)))}
users: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[9] at map at <console>:26

scala> users.take(10)
res5: Array[(String, (String, String))] = Array((1,(F,1)), (2,(M,56)), (3,(M,25)), (4,(M,45)), (5,(M,25)), (6,(F,50)), (7,(M,35)), (8,(M,25)), (9,(M,25)), (10,(F,35)))

(3)对于评级表ratings,需要过滤出用户ID和电影ID即可,然后再通过常量MOVIE_ID=”2116”进行过滤。

scala> val rating =ratingsRdd.map(_.split("::"))
rating: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:26

scala> rating.first
res9: Array[String] = Array(1, 1193, 5, 978300760)

scala> val userMovie=rating.map{x=>(x(0),x(1))}.filter(_._2.equals(MOVIE_ID))
userMovie: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[12] at filter at <console>:30

scala> userMovie.first
res0: (String, String) = (17,2116)

scala> 

备注:对于Key/Value型的RDD,_._1表示key,_._2表示value。

(4)将处理后的评级表和处理后的用户表进行join操作。注意,rdd1[key,value1] join rdd2[key,value2]的结果是[key,(value1,value2)],也就是key是关联字段,value是两个RDD组合形式。

scala> val userRating =userMovie.join(users)
userRating: org.apache.spark.rdd.RDD[(String, (String, (String, String)))] = MapPartitionsRDD[11] at join at <console>:36

scala> userRating.first
res3: (String, (String, (String, String))) = (749,(2116,(M,35)))

(5)对连接结果进行处理。

scala> val userDistribution=userRating.map{x=>(x._2._2,1)}.reduceByKey(_+_)
userDistribution: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[13] at reduceByKey at <console>:38

scala> userDistribution.foreach(println)
((F,50),3)
((F,45),3)
((M,56),8)
((M,50),22)
((F,35),13)
((F,18),9)
((M,18),72)
((M,25),169)
((M,1),13)
((M,35),66)
((F,25),28)
((F,56),2)
((F,1),4)
((M,45),26)

备注,,x._2._2表示Key/Value型RDD每项x的value的value(第2项的第2项)。 (6)Scala程序

package cn.hadron.spark.movie
import org.apache.spark._

/**
 * 看过“Lord of the Rings, The (1978)”用户和年龄性别分布
 */
object MovieUserAnalyzer {

  def main(args: Array[String]) {
    if (args.length < 1){
      println("Usage:MovieUserAnalyzer dataPath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("MovieUserAnalyzer")
    val sc = new SparkContext(conf)
    //1.加载数据,创建RDD
    val MOVIE_ID = "2116"
    val usersRdd = sc.textFile(args(0) + "/users.dat")
    val ratingsRdd = sc.textFile(args(0) + "/ratings.dat")

    //2.解析用户表 RDD[(userID, (gender, age))]
    val users = usersRdd.map(_.split("::")).map { x =>
      (x(0), (x(1), x(2)))
    }

    //3.解析评级表 RDD[Array(userID, movieID, ratings, timestamp)]
    val rating = ratingsRdd.map(_.split("::"))
    //usermovie: RDD[(userID, movieID)]
    val usermovie = rating.map { x =>(x(0), x(1))}.filter(_._2.equals(MOVIE_ID))

    //4.join RDDs
    //useRating: RDD[(userID, (movieID, (gender, age))]
    val userRating = usermovie.join(users)
    //movieuser: RDD[(movieID, (movieTile, (gender, age))]
    val userDistribution = userRating.map { x =>(x._2._2, 1)}.reduceByKey(_ + _)
    //5.输出结果
    userDistribution.collect.foreach(println)

    sc.stop()
  }
}

10.5 问题2:年龄段在“18-24”的男人,最喜欢看10部电影

(1)程序代码

package cn.hadron.spark.movie
import org.apache.spark._
import scala.collection.immutable.HashSet
/**
  * .年龄段在“18-24”的男人,最喜欢看10部电影
  */
object PopularMovieAnalyzer {
  def main(args: Array[String]) {
    val masterUrl = "local[1]"
    var dataPath = "d:\\data\\ml-1m"
    val conf = new SparkConf().setMaster(masterUrl).setAppName("PopularMovieAnalyzer")
    if (args.length > 0) {
         dataPath = args(0)
    } else {
         conf.setMaster("local[1]")
    }
    val sc = new SparkContext(conf)

    //1.加载数据,创建RDD
    val USER_AGE = "18"
    val usersRdd = sc.textFile(dataPath + "\\users.dat")
    val moviesRdd = sc.textFile(dataPath + "\\movies.dat")
    val ratingsRdd = sc.textFile(dataPath + "\\ratings.dat")
    //2.从RDD提取列
    //2.1 users: RDD[(userID, age)]
    val users = usersRdd.map(_.split("::"))
                        .map { x => (x(0), x(2)) }
                        .filter(_._2.equals(USER_AGE))
    //2.2 过滤出userID
    val userlist = users.map(_._1).collect()
    //2.3 用++来批量添加元素
    val userSet = HashSet() ++ userlist
    //2.4 广播userSet
    val broadcastUserSet = sc.broadcast(userSet)

    //3. map-side join RDDs
    val topKmovies = ratingsRdd.map(_.split("::"))
      .map { x => (x(0), x(1)) }  //RDD[UerID,MoviesID]
      .filter { x => broadcastUserSet.value.contains(x._1) } //过滤符合广播userSet
      .map { x => (x._2, 1) }  //RDD[MoviesID,1]
      .reduceByKey(_ + _)  //RDD[MoviesID,n]
      .map { x => (x._2, x._1) }//RDD[n,MoviesID]
      .sortByKey(false) //降序排序
      .map { x => (x._2, x._1) }//RDD[MoviesID,n]
      .take(10)
    //4. 将filmID转换fileName
    //4.1 过滤出RDD[MovieID,Title]
    val movieID2Name = moviesRdd.map(_.split("::"))
      .map { x => (x(0), x(1)) }
      .collect()
      .toMap
    //4.2  getOrElse(key,default)获取key对应的value,如果不存在则返回一个默认值。
    topKmovies.map(x => (movieID2Name.getOrElse(x._1, null), x._2))
              .foreach(println)

    sc.stop()
  }
}

(2)语法说明

  • 广播变量 高效分发大数据集,每个executor一份,该executor下的每个task共享,而不是每个task一份。
  • Map-side Join Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。 在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式。
  • sortByKey() 将 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。

(3)输出结果

(American Beauty (1999),715)
(Star Wars: Episode VI - Return of the Jedi (1983),586)
(Star Wars: Episode V - The Empire Strikes Back (1980),579)
(Matrix, The (1999),567)
(Star Wars: Episode IV - A New Hope (1977),562)
(Braveheart (1995),544)
(Saving Private Ryan (1998),543)
(Jurassic Park (1993),541)
(Terminator 2: Judgment Day (1991),529)
(Men in Black (1997),514)

10.6 问题2: 得分最高的10部电影;看过电影最好的前10人;女性看最多的10部电影;男性看过最多的10部电影。

package cn.hadron.spark.movie

import org.apache.spark._

/**
  * 得分最高的10部电影;看过电影最多的前10个人;女性看多最多的10部电影;男性看过最多的10部电影
  */
object TopKMovieAnalyzer {

  def main(args: Array[String]) {
    var dataPath =  "d:\\data\\ml-1m"
    val conf = new SparkConf().setAppName("TopKMovieAnalyzer")
    if(args.length > 0) {
      dataPath = args(0)
    } else {
      conf.setMaster("local[1]")
    }
    val sc = new SparkContext(conf)

    /**
      * Step 1: Create RDDs
      */
    val DATA_PATH = dataPath
    val ratingsRdd = sc.textFile(DATA_PATH + "\\ratings.dat")

    /**
      * Step 2: Extract columns from RDDs
      */
    //users: RDD[(userID, movieID, Rating)]
    val ratings = ratingsRdd.map(_.split("::"))
                            .map { x =>(x(0), x(1), x(2))}
                            .cache

    /**
      * Step 3: analyze result
      * reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,
      * 因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对
      */
    //得分最高的10部电影
    ratings.map { x =>(x._2, (x._3.toInt, 1))} //RDD[movieID,(Rating,1)]
           .reduceByKey { (v1, v2) =>(v1._1 + v2._1, v1._2 + v2._2)}//RDD[movieID,(n_Rating,n)]
           .map { x =>(x._2._1.toFloat / x._2._2.toFloat, x._1)}//RDD[avgRating,movieID]
           .sortByKey(false)
           .take(10)
           .foreach(println)

    //看过电影最多的前10个人
    ratings.map { x =>(x._1, 1)}//RDD[userID,1]
           .reduceByKey(_ + _)  //RDD[userID,n]
           .map(x => (x._2, x._1))//RDD[n,userID]
           .sortByKey(false) 
           .take(10)
           .foreach(println)

    sc.stop()
  }
}

运行结果:

(5.0,787)
(5.0,3382)
(5.0,3607)
(5.0,989)
(5.0,3656)
(5.0,3881)
(5.0,1830)
(5.0,3280)
(5.0,3233)
(5.0,3172)
(2314,4169)
(1850,1680)
(1743,4277)
(1595,1941)
(1521,1181)
(1518,889)
(1344,3618)
(1323,2063)
(1302,1150)
(1286,1015)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏小樱的经验随笔

详解斯坦纳点及斯坦纳树及模版归纳总结

①什么是斯坦纳点?   假设原来已经给定了个点,库朗等指出需要引进的点数至多为,此种点称为斯坦纳点。过每一斯坦纳点,至多有三条边通过。若为三条边,则它们两两交成...

4766
来自专栏xingoo, 一个梦想做发明家的程序员

Spark源码分析之分区器的作用

最近因为手抖,在Spark中给自己挖了一个数据倾斜的坑。为了解决这个问题,顺便研究了下Spark分区器的原理,趁着周末加班总结一下~ 先说说数据倾斜 数据...

21010
来自专栏漫漫深度学习路

tensorflow学习笔记(二十六):构建TF代码

如何构建TF代码 batch_size: batch的大小 mini_batch: 将训练样本以batch_size分组 epoch_size: 样本分...

2085
来自专栏aCloudDeveloper

旋转字符串算法由浅入深

Author:bakari     Date:2012.9.8 昨天在写一个旋转字符串的函数时,写着写着发现有好多种方法,最简单的莫过于替换然后覆盖再插入。不要...

1777
来自专栏mathor

Manacher算法

 求最大回文子串的长度一般要看原串的长度是奇数还是偶数,然后分别求得,但Manacher算法的第一个神奇之处就是把两种字符串都化为长度为奇数,从而简化计算:

641
来自专栏Deep learning进阶路

OpenCV 学习日记(三)--- 常见数据类型

OpenCV基本数据类型: CvPoint,这些结构中最简单的一个,包含两个整型变量x和y。 CvPoint还有两个兄弟:CvPoint2D32f 和 CvPo...

2080
来自专栏Spark学习技巧

SparkMLlib的数据类型讲解

SparkMLlib的数据类型讲解 Mllib支持单机上存储的本地向量和矩阵,也支持由一个或者多个RDD支持的分布式矩阵。本地向量和本地矩阵是简单的数据模型,用...

2547
来自专栏人工智能LeadAI

数据分析中的可视化-常见图形

import matplotlib.pyplot as plt import pandas as pd from pandas import Series, D...

1052
来自专栏成长道路

文本型数据的向量化:TF-IDF

1.对于文本型数据的分类处理(或者其他的处理),根据ik和jcseg等分词器先对它们进行分词处理之后,大家都知道,计算机是处理不了汉字的,对于文本型的词我们如何...

2170
来自专栏数据科学与人工智能

Python语言做数据探索教程

本文总结Python语言做数据探索的知识。 类似R语言做数据探索,利用Python语言做数据探索。 1 数据导入 2 数据类型变换 3 数据集变换 4 数据排序...

3525

扫码关注云+社区