专栏首页浪淘沙Spark实现排序

Spark实现排序

question: 用spark对数据进行排序,首先按照颜值的从高到低进行排序,如果颜值相等,在根据年龄的升序排序

1.User类继承ordered,并且序列化

package cn.edu360.spark.day06

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 自定义排序
  * Created by zhangjingcun on 2018/9/27 17:13.
  */
object CustomSort1 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //用spark对数据进行排序,首先按照颜值的从高到低进行排序,如果颜值相等,在根据年龄的升序排序
    val users: Array[String] = Array("1,tom,99,34", "2,marry,96,26", "3,mike,98,29", "4,jim,96,30")

    //并行化成RDD
    val userLines: RDD[String] = sc.makeRDD(users)

    //整理数据
    val userRdd: RDD[User1] = userLines.map(line => {
      val fileds = line.split(",")
      val id = fileds(0).toLong
      val name = fileds(1)
      val fv = fileds(2).toInt
      val age = fileds(3).toInt
      new User1(id, name, fv, age)
    })

    //排序
    val sorted: RDD[User1] = userRdd.sortBy(u => u)

    //收集数据
    val result: Array[User1] = sorted.collect()

    println(result.toBuffer)

    sc.stop()
  }
}

class User1 (val id:Long, val name:String, val fv:Int, val age:Int) extends Ordered[User1] with Serializable {
  override def compare(that: User1): Int = {
    //颜值相等的时候
    if (that.fv == this.fv) {
      this.age - that.age
    } else {
      -(this.fv - that.fv)
    }
  }

  override def toString: String = {
    s"User:($id, $name, $fv, $age)"
  }
}

2.User继承Sorted 没有序列化,不需要new

package cn.edu360.spark.day06

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * Created by zhangjingcun on 2018/9/27 17:32.
  */
object CustomSort2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //用spark对数据进行排序,首先按照颜值的从高到低进行排序,如果颜值相等,在根据年龄的升序排序
    val users: Array[String] = Array("1,tom,99,34", "2,marry,96,26", "3,mike,98,29", "4,jim,96,30")

    //并行化成RDD
    val userLines: RDD[String] = sc.makeRDD(users)

    //整理数据
    val userRdd: RDD[User2] = userLines.map(line => {
      val fileds = line.split(",")
      val id = fileds(0).toLong
      val name = fileds(1)
      val fv = fileds(2).toInt
      val age = fileds(3).toInt
      User2(id, name, fv, age)
    })

    //排序
    val sorted: RDD[User2] = userRdd.sortBy(u => u)

    //收集数据
    val result: Array[User2] = sorted.collect()

    println(result.toBuffer)

    sc.stop()
  }
}
//case 可以不使用new关键字
//不需要实现序列化
case class User2 (val id:Long, val name:String, val fv:Int, val age:Int) extends Ordered[User2] {
  override def compare(that: User2): Int = {
    //颜值相等的时候
    if (that.fv == this.fv) {
      this.age - that.age
    } else {
      -(this.fv - that.fv)
    }
  }

  override def toString: String = {
    s"User:($id, $name, $fv, $age)"
  }
}

3.

package cn.edu360.spark.day06

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * Created by zhangjingcun on 2018/9/27 17:37.
  */
object CustomSort3 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //用spark对数据进行排序,首先按照颜值的从高到低进行排序,如果颜值相等,在根据年龄的升序排序
    val users: Array[String] = Array("1,tom,99,34", "2,marry,96,26", "3,mike,98,29", "4,jim,96,30")

    //并行化成RDD
    val userLines: RDD[String] = sc.makeRDD(users)

    //整理数据
    val userRdd: RDD[(Long, String, Int, Int)] = userLines.map(line => {
      val fileds = line.split(",")
      val id = fileds(0).toLong
      val name = fileds(1)
      val fv = fileds(2).toInt
      val age = fileds(3).toInt
      (id, name, fv, age)
    })

    //排序
    val sorted: RDD[(Long, String, Int, Int)] = userRdd.sortBy(tp => User3(tp._1, tp._2, tp._3, tp._4))

    //收集数据
    val result: Array[(Long, String, Int, Int)] = sorted.collect()

    println(result.toBuffer)

    sc.stop()
  }
}
//case 可以不使用new关键字
//不需要实现序列化
case class User3 (val id:Long, val name:String, val fv:Int, val age:Int) extends Ordered[User3] {
  override def compare(that: User3): Int = {
    //颜值相等的时候
    if (that.fv == this.fv) {
      this.age - that.age
    } else {
      -(this.fv - that.fv)
    }
  }

  override def toString: String = {
    s"User:($id, $name, $fv, $age)"
  }
}

4.

package cn.edu360.spark.day06

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * Created by zhangjingcun on 2018/9/27 17:41.
  */
object CustomSort4 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //用spark对数据进行排序,首先按照颜值的从高到低进行排序,如果颜值相等,在根据年龄的升序排序
    val users: Array[String] = Array("1,tom,99,34", "2,marry,96,26", "3,mike,98,29", "4,jim,96,30")

    //并行化成RDD
    val userLines: RDD[String] = sc.makeRDD(users)

    //整理数据
    val tpRdd: RDD[(Long, String, Int, Int)] = userLines.map(line => {
      val fileds = line.split(",")
      val id = fileds(0).toLong
      val name = fileds(1)
      val fv = fileds(2).toInt
      val age = fileds(3).toInt
      (id, name, fv, age)
    })

    //利用元祖的比较特点:先比较第一个,如果不相等,按照第一个属性排序,在比较下个属性
    implicit val rules = Ordering[(Int, Int)].on[(Long, String, Int, Int)](t => (-t._3, t._4))

    val sorted = tpRdd.sortBy(t => t)
    //收集数据
    val result: Array[(Long, String, Int, Int)] = sorted.collect()

    println(result.toBuffer)

    sc.stop()
  }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • spark求最受欢迎的老师的问题

    曼路
  • SparkCore 编程

    2.创建一个数组,根据数据创建一个Bean对象,继承Order,实现序列化(Serializable).从而对数组进行排序。

    曼路
  • SparkSql学习笔记一

    1.简介     Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。 ...

    曼路
  • spark过节监控告警系统实现

    马上要过年了,大部分公司这个时候都不会再去谋求开新业务,而大数据工匠们,想要过好年,就要保证过年期间自己对自己的应用了如执掌。一般公司都会有轮值人员,至少要有春...

    Spark学习技巧
  • 求一个数的临近的较大的2的整数次幂

    在改进一下,就判断他是不是2的次方先。如果是的话,可以直接返回。就可以得到这种方法。面试官又说,不能用循环递归,函数库。这下麻烦了。

    forxtz
  • golang 常见变成问题01

    往 chan 中放数据时,如果缓冲区已经满那么将 block 以下方方式可以试探往 chan 放数据

    landv
  • spark求最受欢迎的老师的问题

    曼路
  • 【技术分享】随机森林分类

    Bagging采用自助采样法(bootstrap sampling)采样数据。给定包含m个样本的数据集,我们先随机取出一个样本放入采样集中,再把该样本放回初始...

    腾讯智能钛AI开发者
  • spark基础练习(未完)

    1、filter val rdd = sc.parallelize(List(1,2,3,4,5)) val mappedRDD = rdd.map(2*_) ...

    用户3003813
  • 使用AES进行文件加密算法

    Xiaolei123

扫码关注云+社区

领取腾讯云代金券