摘 要
基于Scala Actor实现多线程单词统计(WordCount)
package com.itunic.scala

import scala.io.Source
import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.util.control.NonFatal

/**
  * Created by itunic.com on 2016/12/9.
  *
  * Worker actor that counts the words of one file per [[SubmitTask]] message.
  *
  * For every [[SubmitTask]] it replies to the sender with either a
  * [[ResultTask]] holding the per-word counts, or a [[FailedTask]] when the
  * file could not be read. It terminates when it receives [[StopTask]].
  */
class WordCountActor extends Actor {
  override def act(): Unit = {
    loop {
      react {
        case SubmitTask(fileName) =>
          // Always produce a reply: if the exception escaped, this actor would
          // die and the caller's future would never be completed.
          // The try deliberately does not wrap `react`/`exit`, which rely on
          // control-flow throwables; NonFatal does not match those anyway.
          val reply: Any =
            try ResultTask(WordCountActor.countFile(fileName))
            catch { case NonFatal(e) => FailedTask(fileName, e) }
          sender ! reply

        case StopTask =>
          exit()
      }
    }
  }
}

/** Request: count the words of the file at `fileName`. */
case class SubmitTask(fileName: String)

/** Successful reply: number of occurrences of each word in one file. */
case class ResultTask(result: Map[String, Int])

/** Failure reply: `fileName` could not be processed because of `cause`. */
case class FailedTask(fileName: String, cause: Throwable)

/** Request: terminate the worker actor. */
case object StopTask

object WordCountActor {

  /**
    * Counts tab-separated words.
    *
    * Empty tokens (produced by blank lines or leading tabs) are ignored.
    * The returned map is strict, i.e. fully computed before it is returned.
    *
    * @param lines the lines to scan; the iterator is consumed completely
    * @return a map from each word to its number of occurrences
    */
  def countWords(lines: Iterator[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\t"))
      .filter(_.nonEmpty)
      .toList
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  /**
    * Sums several per-file word counts into a single map.
    *
    * @param partials the per-file counts
    * @return a map from each word to its total across all `partials`
    */
  def mergeCounts(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials
      .flatMap(_.toSeq)
      .groupBy(_._1)
      .map { case (word, hits) => (word, hits.map(_._2).sum) }

  /** Reads `fileName`, counts its words and closes the file even on failure. */
  private def countFile(fileName: String): Map[String, Int] = {
    val source = Source.fromFile(fileName)
    // countWords materialises its result, so closing afterwards is safe.
    try countWords(source.getLines()) finally source.close()
  }

  def main(args: Array[String]): Unit = {
    // Input files; each one is handled by its own worker actor.
    val files = Array("F:\\test.txt", "F:\\test1.txt", "F:\\test.txt", "F:\\test1.txt")

    // Start every worker and submit its task before waiting on any result,
    // so that the files are processed concurrently.
    val pending: List[(Actor, Future[Any])] = files.toList.map { file =>
      val worker = new WordCountActor
      worker.start()
      (worker, worker !! SubmitTask(file))
    }

    // Future.apply() blocks until the reply has arrived, so no polling or
    // sleeping is needed. Each worker is told to stop once it has answered.
    val partials: List[Map[String, Int]] = pending.flatMap { case (worker, future) =>
      val reply = future()
      worker ! StopTask
      reply match {
        case ResultTask(counts) =>
          Some(counts)
        case FailedTask(fileName, cause) =>
          Console.err.println(s"Skipping $fileName: $cause")
          None
        case other =>
          Console.err.println(s"Ignoring unexpected reply: $other")
          None
      }
    }

    // Final result: totals over all files that could be read.
    val count = mergeCounts(partials)
    println(count)
  }
}
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句