基于Scala Acotor实现多线程单词统计(WordCount)

摘 要

基于Scala Acotor实现多线程单词统计(WordCount)

package com.itunic.scala

import scala.io.Source
import scala.actors.{Actor, Future}
import scala.collection.mutable

/**
  * Created by itunic.com on 2016/12/9.
  */
class WordCountActor extends Actor {
  override def act(): Unit = {
    loop {
      react {
 case SubmitTask(fileName) => {
 //Map(tom -> 1, jm -> 2, 666 -> 2, hello -> 3)
 //每个线程单独计算一次
          val result = Source.fromFile(fileName).getLines().flatMap(_.split("\t")).map((_, 1)).toList.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
          sender ! ResultTask(result)
        }
 case StopTask => {
          exit()
        }

      }
    }
  }
}

//提交任务样例类
case class SubmitTask(fileName: String)

//返回结果样例类
case class ResultTask(result: Map[String, Int])

//结束任务样例类
case object StopTask

object WordCountActor {
  def main(args: Array[String]): Unit = {
 //存放返回结果集
    val resultSet = new mutable.HashSet[Future[Any]]()
 //汇总结果集
    val resultList = new mutable.ListBuffer[ResultTask]
 //文件池
    val files = Array("F:\\test.txt", "F:\\test1.txt", "F:\\test.txt", "F:\\test1.txt")
 //循环读取并启动线程
 for (i <- files) {
      val actor = new WordCountActor
 //启动并异步接收结果
      val result = actor.start() !! SubmitTask(i)
      resultSet += result
    }

 //合并结果集
 while (resultSet.size > 0) {
      val toCumpute = resultSet.filter(_.isSet)
 //println(toCumpute.toBuffer)
 for (i <- toCumpute) {
        val res = i.apply().asInstanceOf[ResultTask]
        resultList += res
        resultSet.remove(i)
      }
 //防止死鎖休眠100毫秒
      Thread.sleep(100)

    }
 //计算最终结果
    val count = resultList.flatMap(_.result).toList.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
    println(count)

  }

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java 成神之路

Junit 多线测试 问题

2727
来自专栏坚毅的PHP

Hbase 源码分析之 Get 流程及rpc原理

分析版本为hbase 0.94 附上趋势团队画的图: rpc角色表: HBase通信信道 HBase的通信接口 客户端 服务端 HBase Cl...

3554
来自专栏Kevin-ZhangCG

[ SSH框架 ] Hibernate框架学习之四(JPA操作)

1947
来自专栏Java架构沉思录

优雅实现延时任务之zookeeper篇

在《优雅实现延时任务之Redis篇》一文中提到,实现延时任务的关键点,是要存储任务的描述和任务的执行时间,还要能根据任务执行时间进行排序,那么我们可不可以使用z...

1013
来自专栏阿杜的世界

RocketMQ学习-NameServer-2

上篇文章主要梳理了NameServer的启动器和配置信息,并复习了JVM中的关闭钩子这个知识点。这篇文章看下NameServer的其他模块。建议带着如下三个问题...

571
来自专栏函数式编程语言及工具

Scalaz(34)- Free :算法-Interpretation

我们说过自由数据结构(free structures)是表达数据类型的最简单结构。List[A]是个数据结构,它是生成A类型Monoid的最简单结构,因为我们...

1976
来自专栏丑胖侠

Zookeeper客户端API之读取子节点列表(八)

本篇博客介绍一下Zookeeper原生客户端API提供的获取子节点列表方法。 获取子节点列表方法 方法 Zookeeper原生客户端API提供了以下8中获取子节...

1765
来自专栏cmazxiaoma的架构师之路

Redis分布式锁解决方案

我们知道分布式锁的特性是排他、避免死锁、高可用。分布式锁的实现可以通过数据库的乐观锁(通过版本号)或者悲观锁(通过for update)、Redis的setnx...

974
来自专栏Spark学习技巧

重要 | mr使用hcatalog读写hive表

企业中,由于领导们的要求,hive中有数据存储格式很多时候是会变的,比如为了优化将tsv,csv格式改为了parquet或者orcfile。那么这个时候假如是m...

722
来自专栏博岩Java大讲堂

Java集合--ConcurrentMap

3459

扫码关注云+社区