基于Scala Acotor实现多线程单词统计(WordCount)

摘 要

基于Scala Acotor实现多线程单词统计(WordCount)

package com.itunic.scala

import scala.io.Source
import scala.actors.{Actor, Future}
import scala.collection.mutable

/**
  * Created by itunic.com on 2016/12/9.
  */
class WordCountActor extends Actor {
  override def act(): Unit = {
    loop {
      react {
 case SubmitTask(fileName) => {
 //Map(tom -> 1, jm -> 2, 666 -> 2, hello -> 3)
 //每个线程单独计算一次
          val result = Source.fromFile(fileName).getLines().flatMap(_.split("\t")).map((_, 1)).toList.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
          sender ! ResultTask(result)
        }
 case StopTask => {
          exit()
        }

      }
    }
  }
}

//提交任务样例类
case class SubmitTask(fileName: String)

//返回结果样例类
case class ResultTask(result: Map[String, Int])

//结束任务样例类
case object StopTask

object WordCountActor {
  def main(args: Array[String]): Unit = {
 //存放返回结果集
    val resultSet = new mutable.HashSet[Future[Any]]()
 //汇总结果集
    val resultList = new mutable.ListBuffer[ResultTask]
 //文件池
    val files = Array("F:\\test.txt", "F:\\test1.txt", "F:\\test.txt", "F:\\test1.txt")
 //循环读取并启动线程
 for (i <- files) {
      val actor = new WordCountActor
 //启动并异步接收结果
      val result = actor.start() !! SubmitTask(i)
      resultSet += result
    }

 //合并结果集
 while (resultSet.size > 0) {
      val toCumpute = resultSet.filter(_.isSet)
 //println(toCumpute.toBuffer)
 for (i <- toCumpute) {
        val res = i.apply().asInstanceOf[ResultTask]
        resultList += res
        resultSet.remove(i)
      }
 //防止死鎖休眠100毫秒
      Thread.sleep(100)

    }
 //计算最终结果
    val count = resultList.flatMap(_.result).toList.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
    println(count)

  }

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏高爽的专栏

POI读取Excel常见问题

       最近在做一个将excel导入到报表中的功能,使用了POI来实现,发现POI使用有诸多不便之处,先记录下来,以后可能考虑使用Openxml。    ...

2520
来自专栏技术沉淀

Ruby练习三

1105
来自专栏葡萄城控件技术团队

深入浅出OOP(五): C#访问修饰符(Public/Private/Protected/Internal/Sealed/Constants)

访问修饰符(或者叫访问控制符)是面向对象语言的特性之一,用于对类、类成员函数、类成员变量进行访问控制。同时,访问控制符也是语法保留关键字,用于封装组件。 Pub...

3019
来自专栏码匠的流水账

聊聊sentinel的ModifyRulesCommandHandler

本文主要研究一下sentinel的ModifyRulesCommandHandler

941
来自专栏小樱的经验随笔

POJ 2492 A Bug's Life

A Bug's Life Time Limit: 10000MS Memory Limit: 65536K Total Submissions:...

28010
来自专栏小樱的经验随笔

BZOJ 2456: mode(新生必做的水题)

2456: mode Time Limit: 1 Sec  Memory Limit: 1 MB Submit: 4868  Solved: 2039 Des...

2626
来自专栏码匠的流水账

聊聊storm的reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IErrorReporter.java

1213
来自专栏IT笔记

SpringBoot开发案例之整合Spring-data-jpa进阶篇

有人说 从 jdbc->jdbctemplate->hibernation/mybatis 再到 jpa,真当开发人员的学习时间不要钱?我觉得到 h/m 这一级...

3406
来自专栏码匠的流水账

聊聊spring cloud gateway的GatewayFilter

本文主要研究一下spring cloud gateway的GatewayFilter

4121
来自专栏Android知识点总结

Java总结IO之总集篇

字符流和字节流向来各行其事,很少有交集。 但Reader和Writer有两个奇子,名叫InputStreamReader(男)和OutputStreamWri...

1245

扫码关注云+社区