摘 要
基于Scala Actor实现多线程单词统计(WordCount)
package com.itunic.scala
import scala.io.Source
import scala.actors.{Actor, Future}
import scala.collection.mutable
/**
 * Created by itunic.com on 2016/12/9.
 *
 * Worker actor that counts the words of one file per request.
 *
 * Handled messages:
 *  - `SubmitTask(fileName)`: reads the file, splits every line on the tab
 *    character, counts the occurrences of each token and replies to the
 *    sender with a `ResultTask` holding the word -> count map.
 *  - `StopTask`: terminates the actor.
 */
class WordCountActor extends Actor {
  override def act(): Unit = {
    loop {
      react {
        case SubmitTask(fileName) =>
          // Each actor computes the counts for its own file independently,
          // e.g. Map(tom -> 1, jm -> 2, 666 -> 2, hello -> 3).
          val source = Source.fromFile(fileName)
          val result: Map[String, Int] =
            try {
              source.getLines()
                .flatMap(_.split("\t"))
                // Materialise before the source is closed: getLines() is lazy.
                .toList
                .groupBy(identity)
                // Build a strict map (not a mapValues view) so the message sent
                // to the caller does not retain the grouped lists or recompute
                // the counts on every access.
                .map { case (word, hits) => (word, hits.size) }
            } finally {
              // Always release the file handle, even if reading fails.
              source.close()
            }
          sender ! ResultTask(result)

        case StopTask =>
          exit()
      }
    }
  }
}
/** Task-submission message: asks a worker to count the words of the file at `fileName`. */
case class SubmitTask(fileName: String)
/** Result message: carries a map from each word to its number of occurrences. */
case class ResultTask(result: Map[String, Int])
/** End-of-work message: tells a worker actor to terminate. */
case object StopTask
/**
 * Entry point: counts words across a fixed set of files, using one
 * `WordCountActor` per file, and prints the merged word -> count map.
 */
object WordCountActor {
  def main(args: Array[String]): Unit = {
    // File pool to process.
    val files = Array("F:\\test.txt", "F:\\test1.txt", "F:\\test.txt", "F:\\test1.txt")

    // Start one worker per file and submit its task asynchronously. The list is
    // built strictly, so every worker is already running before we wait on any
    // reply. The worker is kept next to its future so it can be stopped later.
    val pending: List[(WordCountActor, Future[Any])] = files.toList.map { fileName =>
      val worker = new WordCountActor
      worker.start()
      (worker, worker !! SubmitTask(fileName))
    }

    // Collect the partial results. Applying a future blocks until its reply has
    // arrived, so no isSet polling / Thread.sleep loop is needed.
    val partials: List[ResultTask] = pending.map { case (worker, reply) =>
      val outcome = reply() match {
        case task: ResultTask => task
        case other => sys.error("Unexpected reply from WordCountActor: " + other)
      }
      // The worker has finished its only task; let it leave its message loop.
      worker ! StopTask
      outcome
    }

    // Merge the per-file counts into the final totals.
    val count = partials.flatMap(_.result).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
    println(count)
  }
}