基于Scala Acotor实现多线程单词统计(WordCount)

摘 要

基于Scala Acotor实现多线程单词统计(WordCount)

package com.itunic.scala

import scala.io.Source
import scala.actors.{Actor, Future}
import scala.collection.mutable

/**
  * Created by itunic.com on 2016/12/9.
  */
class WordCountActor extends Actor {
  override def act(): Unit = {
    loop {
      react {
 case SubmitTask(fileName) => {
 //Map(tom -> 1, jm -> 2, 666 -> 2, hello -> 3)
 //每个线程单独计算一次
          val result = Source.fromFile(fileName).getLines().flatMap(_.split("\t")).map((_, 1)).toList.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
          sender ! ResultTask(result)
        }
 case StopTask => {
          exit()
        }

      }
    }
  }
}

//提交任务样例类
case class SubmitTask(fileName: String)

//返回结果样例类
case class ResultTask(result: Map[String, Int])

//结束任务样例类
case object StopTask

object WordCountActor {
  def main(args: Array[String]): Unit = {
 //存放返回结果集
    val resultSet = new mutable.HashSet[Future[Any]]()
 //汇总结果集
    val resultList = new mutable.ListBuffer[ResultTask]
 //文件池
    val files = Array("F:\\test.txt", "F:\\test1.txt", "F:\\test.txt", "F:\\test1.txt")
 //循环读取并启动线程
 for (i <- files) {
      val actor = new WordCountActor
 //启动并异步接收结果
      val result = actor.start() !! SubmitTask(i)
      resultSet += result
    }

 //合并结果集
 while (resultSet.size > 0) {
      val toCumpute = resultSet.filter(_.isSet)
 //println(toCumpute.toBuffer)
 for (i <- toCumpute) {
        val res = i.apply().asInstanceOf[ResultTask]
        resultList += res
        resultSet.remove(i)
      }
 //防止死鎖休眠100毫秒
      Thread.sleep(100)

    }
 //计算最终结果
    val count = resultList.flatMap(_.result).toList.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
    println(count)

  }

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

扫码关注云+社区