运用Aggregator模式实现MapReduce

《基于Actor的响应式编程》计划分为三部分,第一部分剖析响应式编程的本质思想,为大家介绍何谓响应式编程(Reactive Programming)。第二部分则结合两个案例来讲解如何在AKKA中实现响应式编程。第三部分则是这个主题的扩展,在介绍Reactive Manifesto的同时,介绍进行响应式编程更为主流的ReactiveX框架。本文是第二部分的第二个案例。

MapReduce是更好地利用并行计算资源来提升数据处理能力的重要算法,如今已被主流的大数据分析平台实现,成为了大数据批量处理的主力军。利用前面介绍的Actor特性,其实我们也可以实现一个简易的MapReduce。

利用AKKA Actor来实现MapReduce,天生就支持并行计算(利用远程Actor)与异步操作。为了简便起见,本例使用了本地的Actor实现了大数据世界的Hello World,即WordCounter。

在编写字数统计器的MapReduce之前,我们需要先分辨职责,包括:

  • 给定网页地址,获取指定网页的内容
  • 对网页内容进行分词
  • 为每个单词统计字数

考虑到本文的中心主题是介绍响应式编程与Actor模型,所以我们降低了案例难度,读取的网页内容均为英文,并简单地以空格作为分词的标志。由于我们需要接受客户端的字数统计分析请求,那么要完成前面提到的职责,至少需要四个Actor:

  • WordCounterClient:发送数据分析请求
  • WordCounterServer:模拟服务端,接收数据分析请求,并最终将统计后的结果返回给WordCounterClient
  • PageContentFetcher:获取网页内容
  • ContentWordCounter:网页内容的字数统计器

为了尽可能地提升性能,对于获取网页内容以及统计内容字数的统计工作,我们都需要多个Actor同时执行。然而,由于每个Actor处理消息都是以异步形式进行,我们该怎样才能知道并发处理的请求都得到了处理?针对字数统计器的案例而言,我们还需要将每个Actor统计获得的字数再进行reduce,同样也需要知道是否每条消息都已经处理完毕,并获得处理的结果。

AKKA通过Aggregator特性实现了Aggregator模式,可以很好地解决刚才提到的问题。它通过引入一个单独的聚合器Actor,用以聚合多个Actor产生的数据,并根据这些Actor对消息的Response更新状态。

假定ContentWordCounter分析后的结果如下代码所示:

case class AnalysisResult(wordToCount: Seq[(String, Long)])

那么,Aggregator就可以通过在其内部维持一个分析结果集(即前面所谓的状态,代码中的analysisResults),每收到一个Actor的Response,就将结果塞入到这个结果集(更新状态)中,并判断结果集的长度是否等于要处理的网页数,以此作为消息是否处理完毕的条件。

整个Aggregator的实现如下:

class WordCounterAggregator extends Actor with Aggregator { 
 expectOnce {    
 case StartAggregation(target, urls) =>      
 new Handler(target, urls, sender)    case _ =>      sender ! BadCommand      

 context stop self  }  class Handler(target: ActorRef, urls: Seq[String], originalSender: ActorRef) {  
   var analysisResults = Set.empty[AnalysisResult]   
    context.system.scheduler.scheduleOnce(10.seconds, self, Timeout)    expect {   
       case Timeout =>        respondIfDone(respondAnyway = true)    }  
         urls.foreach { uri =>      target ! FetchPageContent(uri)      
         expectOnce {        case result: AnalysisResult =>    
               analysisResults += result          respondIfDone()      }   
                }    def respondIfDone(respondAnyway: Boolean = false) = {          
                     import MapSeqImplicits._      if (respondAnyway || analysisResults.size == urls.size) {  
                           val wordToCounts = analysisResults.flatMap(_.wordToCount).reduceByKey(_ + _)        originalSender ! AggregatedAnalysisResult(wordToCounts)        context stop self      }    }  }}

WordCounterAggregator继承了Aggregator特性,这个特性已经对Actor的receive进行了处理,使得继承该特性的Actor不需要重写receive方法。Aggregator特性提供了expect、expectOnce与unexpect,用以接收期待处理的消息。

在Aggregator内部,其实维持了一个expectList,用以存放expect等函数所接收的偏函数。expect与expectOnce都是将偏函数放入到这个列表中,只是后者只留存一次(通过permanent标志来判定),一旦匹配了,就会将该偏函数移除,而expect则不会;至于unexpect,就是expect的反操作,用于将偏函数从列表中移除。

自定义的respondIfDone方法会在满足聚合条件时,对分析结果进行reduce运算。Scala的集合库自身并没有提供reduceByKey()函数,是我模仿Spark的RDD自行编写的隐式转换方法:

object MapSeqImplicits {  
 implicit class MapSeqWrapper(wordToCount: Iterable[(String, Long)]) {   
 def reduceByKey(f: (Long, Long) => Long): Seq[(String, Long)] = {     
  wordToCount.groupBy(_._1).map {       
   case (word, counts) => (word, counts.map(_._2).foldLeft(0L)(f))      }.toSeq    }  }}

因为引入了一个Aggregator,消息的处理以及Actor之间的协作就变得相对复杂。要进行响应式编程,其中一个关键就是要理清楚数据(或消息)的流动方向,并分辨每个数据处理器的职责。我们可以借助类似状态图之类的可视化工具帮助我们分析数据流动模型。下图是本例的一个消息处理模型,它同时还表达了Actor之间的协作关系。

执行字数统计的流程如下所示:

  • 首先,WordCounterClient接收StartAnalysisWebPages消息,准备分析网页;
  • 由于Client没有这个“能力”完成分析任务,于是求助于WordCounterServer,并发起FetchWebPages消息,要求获取网页内容;
  • WordCounterServer同样是个惫懒货色,什么都不干,转手就将这件事情转交给别的Actor了,所以他其实就是一个前台接待员。如果不需要聚合,它收到的FetchWebPages其实应该交给PageContentFetcher,但现在须得经由WordCounterAggregator来分配请求;所以从另外一个角度来看,这个Aggregator相当于是一个Mediator;
  • 由于Aggregator是一个Mediator,因此它会协调多个PageContentFetcher与ContentWordCounter来并行完成任务;因而Aggregator和这两个Actor之间是一对多关系,而PageContentFetcher与ContentWordCounter则属于一对一关系。当PageContentFetcher获得了网页内容后,就通过CountPageContent消息,将统计字数的职责交给了ContentWordCounter;
  • ContentWordCounter在计算完当前网页的字数后,会将分析结果AnalysisResult返回给Aggregator,并由其完成分析结果的reduce运算,并返回AggregatedAnalysisResult结果给Server;
  • 最后,Server再将Client需要的最终结果返回给Client。

由于Aggregator需要协调多个Fetcher与Counter的Actor,以支持异步并行计算(本例实则是并发计算)的需要,我为其引入了AKKA提供的Router Actor。通过Router可以创建一个容器Actor,内部管理多个worker rootees,并提供了RoundRobin、Random、Boardcast等多种路由形式,用户可以根据Actor的负载情况选择不同的路由方式。

这里,我选择使用RoundRobin以硬编码的形式创建了Router Actor:

val analyst: ActorRef = context.actorOf(Props(new ContentWordCounter(aggregator)),
 "PageContentAnalyst")val fetchers = context.actorOf(RoundRobinPool(4).props(Props(new PageContentFetcher(analyst))), "fetchers")

整体来看,PageContentFetcher与ContentWordCounter其实扮演的是map角色,并通过Router Actor来实现map工作的异步并发处理;而WordCounterAggregator则扮演了reduce角色,它负责将收到的多个分析结果进行reduce运算。

由于缺乏对MapReduce算法必要的封装,用AKKA Actor实现的MapReduce显得比较复杂,但却较好地体现了响应式编程的异步数据流本质。

当我们在使用Actor来处理异步消息传递时,当业务渐趋复杂后,我们常常会迷失在复杂的消息传递网中而无法自拔。为了保持清醒的头脑,需要时刻谨记Actor的职责。以我的经验,我们应该考虑:

  • 从Actor扮演的角色来思考它应该接收什么样的消息;
  • Actor对消息的处理一定要满足单一职责原则,正确地履行职责,也当在合适时候正确地转移职责;
  • 运用状态图帮助思考Actor与其他Actor之间的协作关系;
  • 正确理解AKKA Actor的消息发送机制,当在Actor内部再次发送消息时,是由sender发送,还是通过消息传递过来的actorRef对象发送消息。

要完成多个网页的字数统计功能,除了使用稍显复杂的Actor模式之外,我们也可以直接使用scala提供的并行集合来完成,代码更为精简:

val words = for {   
 url <- urls.par    
 line <- scala.io.Source.fromURL(url).getLines()    
 word <- line.split(" ")} yield (word)val analysisResult = words.map(w => (w, 1L)).reduceByKey(_ + _)

在业务相对简单,并不需要非阻塞消息处理,也没有可伸缩性需求的时候,若能恰当运用scala自身提供的par集合会是好的选择。

事实上,为了实现字数统计的功能,采用AKKA提供的Aggregator确乎有些过度。它更擅长于通过将职责分治与合理运用基于消息的Actor模式来完成更为复杂的响应式系统。WordCounter的例子不外乎是我为了更好地解释Aggregator模式而给出的一个Demo罢了

原文发布于微信公众号 - 逸言(YiYan_OneWord)

原文发表时间:2016-09-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java架构师学习

分享我在阿里工作十年接触过Java框架设计模式一、前言二、责任链设计模式(Chain of Responsibility Pattern)三、工厂模式(Factory Pattern)四、单例设计模式

一、前言 说起来设计模式,大家应该都耳熟能详,设计模式代表了软件设计的最佳实践,是经过不断总结提炼出来的代码设计经验的分类总结,这些模式或者可以简化代码,或者可...

4518
来自专栏Java成长之路

八、java对象和方法区的垃圾回收

即使在可达性分析算法中不可达的对象,也并非是“非死不可”的,这时候它们暂时处于“缓刑”阶段,要真正宣告一个对象死亡,至少要经历两次标记过程:如果对象在进行可达性...

952
来自专栏微信终端开发团队的专栏

微信ANDROID客户端-会话速度提升70%的背后

image.png 背景 打开会话速度慢 在同一个会话有较多的历史消息下,各种查询,更新,删除等操作,速度明显下降。 在会话内有较大量历史消息情况下,进入...

3687
来自专栏Java帮帮-微信公众号-技术文章全总结

​图;代码轻松理解,代理

代理 代理是英文 Proxy 翻译过来的。我们在生活中见到过的代理,大概最常见的就是朋友圈中卖面膜的同学了。 她们从厂家拿货,然后在朋友圈中宣传,然后卖给熟人。...

2845
来自专栏SeanCheney的专栏

深入理解Python异步编程(上)

彻底理解异步编程是什么、为什么、怎么样。深入学习asyncio的基本原理和原型,了解生成器、协程在Python异步编程中是如何发展的。

961
来自专栏LanceToBigData

OOAD-设计模式(二)之GRASP模式与GOF设计模式概述

一、GRASP模式(通用责任分配软件模式)概述 1.1、理解责任   1)什么是责任     责任是类间的一种合约或义务,也可以理解成一个业务功能,包括行为...

17110
来自专栏Ryan Miao

java并发编程实践学习(2)--对象的组合

先验条件(Precondition):某些方法包含基于状态的先验条件。例如,不能从空队列中移除一个元素,在删除元素前队列必须处于非空状态。基于状态的先验条件的操...

34014
来自专栏大史住在大前端

javascript基础修炼(7)——Promise,异步,可靠性

Promise技术是【javascript异步编程】这个话题中非常重要的,它一度让我感到熟悉又陌生,我熟悉其所有的API并能够在编程中相对熟练地运用,却对其中原...

785
来自专栏水击三千

UML学习-状态图

1.状态图概述 状态图(Statechart Diagram)主要用于描述一个对象在其生存期间的动态行为,表现为一个对象所经历的状态序列,引起状态转移的事件(E...

24210
来自专栏张善友的专栏

WCF服务上应用protobuf

protobuf是google提供的一个开源序列化框架,类似于XML,JSON这样的数据表示语言,其最大的特点是基于二进制,因此比传统的XML表示高效短小得多。...

1966

扫码关注云+社区