专栏首页浪淘沙spark求最受欢迎的老师的问题

spark求最受欢迎的老师的问题

文件内容:

http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/zhangsan
http://bigdata.edu360.cn/lisi
http://bigdata.edu360.cn/lisi

1.求最受欢迎的老师,不考虑课程类别(然后类似于wordCount)

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FavTeacher {

 
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/")+1)
      val url = new URL(line).getHost
      val subject = url.substring(0,url.indexOf("."))
      ((subject,teacher),1)
    })
    val reduced = word.reduceByKey(_+_)

    val sorted = reduced.sortBy(_._2,false)
    val list = sorted.take(3)
    println(list.toBuffer)

  }
}



//运行结果
//ArrayBuffer(((bigdata,lisi),15), ((javaee,laoyang),9), ((javaee,zhaoliu),6))

2.求每个学科最受欢迎的老师

  根据学科分组然后排序

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FavTeacher {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/")+1)
      val url = new URL(line).getHost
      val subject = url.substring(0,url.indexOf("."))
      ((subject,teacher),1)
    })
    val reduced = word.reduceByKey(_+_)

   // val sorted = reduced.sortBy(_._2,false)
    //分组
    val grouped = reduced.groupBy(_._1._1)
    //排序 取前两名 取到的数据是scala中进行排序的
    //先分组 然后在组内进行排序 这里的ComoactBuffer是迭代器,继承了序列,然后迭代器转换成List进行排序
    //在某种极端情况下,_表示迭代分区的数据,证明这里是将迭代器的数据一次性的来过来后进行toList,如果数据量非常大,这里肯定会出现OOM(内存溢出)
    val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(-_._2).take(2))



    //释放资源
    sc.stop()
  }
}

//运行结果

//  (javaee,List(((javaee,laoyang),9), ((javaee,zhaoliu),6)))
//  (python,List(((python,laoli),3), ((python,laoliu),1)))
//  (bigdata,List(((bigdata,lisi),15), ((bigdata,wangwu),6)))

3.求各科最受欢迎的两名老师

创建一个数组  将不同的学科放在不同的RDD中 然后排序,取值

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * 根据学科取得的最受欢迎的前2名老师的排序
  */
object FavTeacher2 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val subjects = Array("javaee","bigdata","python")
    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    //处理数据
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/")+1)
      val url = new URL(line).getHost
      val subject = url.substring(0,url.indexOf("."))
      ((subject,teacher),1)
    })
    //聚合
    val reduced = word.reduceByKey(_+_)

    // val sorted = reduced.sortBy(_._2,false)
    //分组
    // val grouped = reduced.groupBy(_._1._1)
    //先将学科进行过滤,一个学科的数据放到一个RDD中
    for(sb <- subjects){
      //对所有数据进行过滤
      val filtered = reduced.filter(_._1._1 == sb)
      //在一个学科中进行排序(RDD排序是内存+磁盘)
      val sorted = filtered.sortBy(_._2,false).take(2)
      println(sorted.toBuffer)
    }

  }
}

//运行结果
ArrayBuffer(((javaee,laoyang),9), ((javaee,zhaoliu),6))
ArrayBuffer(((bigdata,lisi),15), ((bigdata,wangwu),6))
ArrayBuffer(((python,laoli),3), ((python,laoliu),1))

4.求各科最受欢迎的两名老师

  自定义分区器 将相同科目的老师放到同一个分区

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object FavTeacher3 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("Favteacher").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d:/data/teacher.log")
    //处理数据
    val word: RDD[((String, String), Int)] = lines.map(line => {
      val teacher = line.substring(line.lastIndexOf("/")+1)
      val url = new URL(line).getHost
      val subject = url.substring(0,url.indexOf("."))
      ((subject,teacher),1)
    })
    //聚合
    val reduced = word.reduceByKey(_+_)

    //先计算学科的数量
    //将所有学科的名字先在集群中统计计算,然后收集回来(计算有几个学科 创建几个分区)
    val subject: Array[String] = reduced.map(_._1._1).distinct().collect()

    //创建一个自定义分区器,按照学科进行分区, 相同学科的数据都shuffle到一个分区
    val subjectPartitiioner = new SubjectPartitioner(subject)

    //对聚合后的RDD进行自定义分区
    val sbPartitioner = reduced.partitionBy(subjectPartitiioner)
    //重新分区后,在每个分区中进行排序
    val sorted =
    sbPartitioner.mapPartitions(_.toList.sortBy(- _._2).iterator)
    sorted.saveAsTextFile("d:/data/out/teacher")
  }
}

//自定义分区器
class SubjectPartitioner(subjects:Array[String]) extends Partitioner{
  //在new的时候执行,在构造器中执行
  //String是分区(学科),Int 是学科的位置
  val rules = new mutable.HashMap[String,Int]()

  var index = 0
  //初始化一个规则
  for(sb <- subjects){
    rules += ((sb,index))
    index += 1
  }
  //有几个学科返回几个区
  //返回分区的数量
  override def numPartitions: Int = subjects.length
  //根据传入的key,计算返回分区的编号
  //定义一个 计算规则
  override def getPartition(key: Any): Int = {
    //key是一个元组(学科,老师) 将key强制转换成元组
    val tuple = key.asInstanceOf[(String,String)]
    val subject = tuple._1
    rules(subject)
  }

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark实现排序

    question: 用spark对数据进行排序,首先按照颜值的从高到低进行排序,如果颜值相等,在根据年龄的升序排序

    曼路
  • SparkSql学习笔记一

    1.简介     Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。 ...

    曼路
  • SparkCore 编程

    2.创建一个数组,根据数据创建一个Bean对象,继承Order,实现序列化(Serializable).从而对数组进行排序。

    曼路
  • golang 常见变成问题01

    往 chan 中放数据时,如果缓冲区已经满那么将 block 以下方方式可以试探往 chan 放数据

    landv
  • 求一个数的临近的较大的2的整数次幂

    在改进一下,就判断他是不是2的次方先。如果是的话,可以直接返回。就可以得到这种方法。面试官又说,不能用循环递归,函数库。这下麻烦了。

    forxtz
  • spark过节监控告警系统实现

    马上要过年了,大部分公司这个时候都不会再去谋求开新业务,而大数据工匠们,想要过好年,就要保证过年期间自己对自己的应用了如执掌。一般公司都会有轮值人员,至少要有春...

    Spark学习技巧
  • 使用AES进行文件加密算法

    Xiaolei123
  • Spark实现排序

    question: 用spark对数据进行排序,首先按照颜值的从高到低进行排序,如果颜值相等,在根据年龄的升序排序

    曼路
  • 【技术分享】随机森林分类

    Bagging采用自助采样法(bootstrap sampling)采样数据。给定包含m个样本的数据集,我们先随机取出一个样本放入采样集中,再把该样本放回初始...

    腾讯智能钛AI开发者
  • 基于 Kotlin + Netty 实现一个简单的 TCP 自定义协议

    我们的项目需要开发一款智能硬件。它由 Web 后台发送指令到一款桌面端应用程序,再由桌面程序来控制不同的硬件设备实现业务上的操作。从 Web 后台到桌面端是通过...

    fengzhizi715

扫码关注云+社区

领取腾讯云代金券