前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark实现WordCount的几种方式总结

Spark实现WordCount的几种方式总结

作者头像
大数据真好玩
发布2020-07-30 15:27:03
1.2K0
发布2020-07-30 15:27:03
举报
文章被收录于专栏:暴走大数据

方法一:map + reduceByKey

代码语言:javascript
复制
package com.cw.bigdata.spark.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount1 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount1")

    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)
  }
}

方法二:使用countByValue代替map + reduceByKey

代码语言:javascript
复制
package com.cw.bigdata.spark.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount2 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount2")

    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    lines.flatMap(_.split(" ")).countByValue().foreach(println)

  }
}

方法三:aggregateByKey或者foldByKey

代码语言:javascript
复制
package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * WordCount实现第三种方式:aggregateByKey或者foldByKey
  *
  * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
  *   1.zeroValue:给每一个分区中的每一个key一个初始值;
  *   2.seqOp:函数用于在每一个分区中用初始值逐步迭代value;(分区内聚合函数)
  *   3.combOp:函数用于合并每个分区中的结果。(分区间聚合函数)
  *
  *  foldByKey相当于aggregateByKey的简化操作,seqop和combop相同
  */
object WordCount3 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount3")

    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    lines.flatMap(_.split(" ")).map((_, 1)).aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)
    
    lines.flatMap(_.split(" ")).map((_, 1)).foldByKey(0)(_ + _).collect().foreach(println)

  }
}

方法四:groupByKey+map

代码语言:javascript
复制
package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * WordCount实现的第四种方式:groupByKey+map
  */
object WordCount4 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount4")

    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    val groupByKeyRDD: RDD[(String, Iterable[Int])] = lines.flatMap(_.split(" ")).map((_, 1)).groupByKey()

    groupByKeyRDD.map(tuple => {
      (tuple._1, tuple._2.sum)
    }).collect().foreach(println)

  }
}

方法五:Scala原生实现wordcount

代码语言:javascript
复制
package com.cw.bigdata.spark.wordcount


/**
  * Scala原生实现wordcount
  */
object WordCount5 {
  def main(args: Array[String]): Unit = {

    val list = List("cw is cool", "wc is beautiful", "andy is beautiful", "mike is cool")
    /**
      * 第一步,将list中的元素按照分隔符这里是空格拆分,然后展开
      * 先map(_.split(" "))将每一个元素按照空格拆分
      * 然后flatten展开
      * flatmap即为上面两个步骤的整合
      */
    val res0 = list.map(_.split(" ")).flatten
    val res1 = list.flatMap(_.split(" "))

    println("第一步结果")
    println(res0)
    println(res1)

    /**
      * 第二步是将拆分后得到的每个单词生成一个元组
      * k是单词名称,v任意字符即可这里是1
      */
    val res3 = res1.map((_, 1))
    println("第二步结果")
    println(res3)
    /**
      * 第三步是根据相同的key合并
      */
    val res4 = res3.groupBy(_._1)
    println("第三步结果")
    println(res4)
    /**
      * 最后一步是求出groupBy后的每个key对应的value的size大小,即单词出现的个数
      */
    val res5 = res4.mapValues(_.size)
    println("最后一步结果")
    println(res5.toBuffer)
  }
}

方法六:combineByKey

代码语言:javascript
复制
package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * WordCount实现的第六种方式:combineByKey
  */
object WordCount6 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("combineByKey")

    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    val mapRDD: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))

    // combineByKey实现wordcount
    mapRDD.combineByKey(
      x => x,
      (x: Int, y: Int) => x + y,
      (x: Int, y: Int) => x + y
    ).collect().foreach(println)

  }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-07-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 方法一:map + reduceByKey
  • 方法二:使用countByValue代替map + reduceByKey
  • 方法三:aggregateByKey或者foldByKey
  • 方法四:groupByKey+map
  • 方法五:Scala原生实现wordcount
  • 方法六:combineByKey
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档