ElasticMQ 0.7.0:使用Akka和Spray的长轮询,非阻塞实现

原文作者:Adam Warski

原文地址:https://dzone.com/articles/elasticmq-070-long-polling-non

(地址elasticmq.org已经过期,翻译时将其删除。译者在GitHub上找到了目录:https://github.com/adamw/elasticmq/,同时由于腾讯云+总是识别非法链接,删除文中超链接。)

(译者修改并重新添加了部分超链接。)

一个基于Actor的兼容Scala和Amazon SQS接口的消息队列系统,ElasticMQ 0.7.0,刚刚发布。

这是一次重要的重写,核心部分是使用Akka Actor和REST层则采用Spray。目前为止,只有核心部分和SQS模块被重写;SQL后端和复制(Replication)尚在进行中。

客户端的主要改进是:

  • 近期加入SQS的长轮询(long polling)支持
  • 更简单的独立服务器 - 只需下载一个jar

通过长轮询,您可以在收到消息时指定一个附加MessageWaitTime属性。如果队列中没有消息,而不是正在完成空响应的请求,ElasticMQ将等待MessageWaitTime秒钟,直到消息到达。这有助于减少使用的带宽(不需要非常频繁的请求),提高系统整体性能(发送后立即收到消息)并降低SQS消耗。

现在,独立服务器是一个单一的jar文件。要在本地内存运行一个SQS实现(例如,测试一个使用SQS的应用程序),只需要下载jar文件)并运行:

java -jar elasticmq-server-0.7.0.jar

这将启动一个地址为http://localhost:9324的服务器。当然,接口和端口是可配置的,详情请参阅自述文件。像以前一样,您也可以使用任何基于JVM的语言来运行嵌入式服务器。

实现说明

出于好奇,下面简单描述下ElasticMQ是如何实现的,包括核心系统,REST层,Akka数据流的使用和长轮询的实现。所有的代码都可以在GitHub上找到。

如前所述,ElasticMQ现在使用Akka和Spray实现,并且不包含任何阻塞调用。一切都是异步的。

核心

核心系统是基于Actor的。有一个主Actor(QueueManagerActor),它知道系统中当前创建了哪些队列,并且可以创建和删除队列。

为了与Actor交互,使用了类型化的问答模式(Typed ask pattern)。例如,要查找一个队列(一个队列也是一个Actor),就会定义一个消息:

case class LookupQueue(queueName: String)extends Replyable[Option[ActorRef]]

用法如下所示:

import org.elasticmq.actor.reply._
val lookupFuture: Future[Option[ActorRef]] = queueManagerActor?LookupQueue("Q2")

如前所述,每个队列都是一个Actor,并封装队列状态。我们可以使用简单的可变数据结构,而不需要任何线程同步,因为参与者模型(Actor Model)为我们处理了这个问题。有一些消息可以发送给队列Actor,例如:

case class SendMessage(message: NewMessageData)   extends Replyable[MessageData]
case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, 
           waitForMessages: Option[Duration])     extends Replyable[List[MessageData]]
case class GetQueueStatistics(deliveryTime: Long) extends Replyable[QueueStatistics]
Rest层

SQS查询/ REST层是使用Spray实现的,这是一个基于Akka的轻量级REST/HTTP工具包。

除了基于Actor的非阻塞IO实现外,Spray还提供了强大的路由库spray-routing。它包含一些内置的指令,用于在请求方法(get/post等),提取表单参数的查询或请求路径上的匹配。但它也可以让你使用简单的指令组合来定义你自己的指令。一个典型的ElasticMQ路由如下所示:

val listQueuesDirective = 
  action("ListQueues"){
    rootPath {
      anyParam("QueueNamePrefix"?){prefixOption =>
        //逻辑
      }
    }
  }

上述action与在body参数中的"Action"URL中指定的Action 名字相匹配,并选择接受或拒绝请求,rootPath匹配空路径等等。Spray有一个很好的教程,如果您有兴趣,我鼓励您看看。

如何使用路由中的队列Actor来完成HTTP请求?

关于Spray的好处是,它只是将一个RequestContext实例传递给你的路由,并不期待任何返回。这取决于路由是完全放弃请求还是使用一个值完成。该请求也可以在另一个线程中完成; 或者,例如,在某个未来完成。这恰好是ElasticMQ所采用的。在这里mapflatMapfor-comprehensions(更好的语法是map/ flatMap)非常方便,例如(简化):

//异步的按顺序调用查找队列并删除它,
//因为?返回未来的值
for {
   queueActor <- queueManagerActor ? LookupQueue(queueName)
   _ <- queueActor ? DeleteMessage(DeliveryReceipt(receipt))
} {
   requestContext.complete(200, "message deleted")
}

有时,如果流程更复杂,ElasticMQ使用Akka 数据流,当然这需要启用continuations插件。还有一个类似的早期的项目,使用宏,Scala async

使用Akka数据流,您可以像正常的顺序代码一样编写使用Future的代码。CPS插件会将其转换为在需要时使用回调。以下是一个来自CreateQueueDirectives的例子:

flow {
  val queueActorOption = (queueManagerActor ? LookupQueue(newQueueData.name)).apply()
  queueActorOption match {
    case None => {
      val createResult = (queueManagerActor ? CreateQueue(newQueueData)).apply()
      createResult match {
        case Left(e) => throw new SQSException("Queue already created: " + e.message)
        case Right(_) => newQueueData
      }
    }
    case Some(queueActor) => {
      (queueActor ? GetQueueData()).apply()
    }
  }
}

这里的重要部分是flow块,它界定转换范围,以及用于提取未来内容的Future上的apply()调用。这看起来像完全正常的顺序代码,但是在执行时,从第一次使用Future开始将会异步运行。

长轮询

因为所有的代码都是异步和非阻塞的,实现长轮询非常容易。请注意,在从队列接收消息时,我们得到一个Future[List[MessageData]]。为了响应完成这个Future,HTTP请求也被完成并具有适当的响应。然而,这个Future几乎可以立即完成(例如正常情况下),或者在10秒(或者其他时间)之后 ,支持这些所需要的代码没有变化。所以唯一要做的就是延迟完成Future,直到指定的时间过去或新的消息到达。

实现位于于QueueActorWaitForMessagesOps。当接收消息的请求到达,并且队列中没有任何内容时,我们不是立即回复(即向发送者Actor发送空列表),而是将原始请求的引用和发送方actor存储在一个map中。使用Akka调度程序,我们还计划在指定的超时之后发回空列表并删除条目。

当新消息到达时,我们只需从map上获取一个等待请求,然后尝试完成它。同样,所有同步和并发问题都由Akka和参与者模型来处理。

请测试新版本,并告知我们您的任何反馈!

亚当

本文的版权归 大脸猫爱吃鱼 所有,如需转载请联系作者。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏C/C++基础

Google C++编程风格指南(一)之头文件的相关规范

一个良好的编程规范和风格是一名程序猿成熟的标志。规范的编码可以减少代码冗余,降低出错概率,便于代码管理和代码交流等等,事实上,其作用远不止这些,我们要牢记编码规...

17710
来自专栏Python中文社区

Python云计算框架:OpenStack源码分析之RabbitMQ(二)

之前发布的文章因为在编辑后代码部分在手机上看不清已被及时删除,本文重新编辑好之后再发布一次,带来不便请谅解! 專 欄 ❈ ZZR,Python中文社区专栏作者...

32590
来自专栏程序员的知识天地

使用 JS 实现一个本地数据库

前端很多时候还是需要保存一些数据的,这里的保存指的是长久的保存。以前的思想是把数据保存在 Cookie 中,或者将 key 保存在 Cookie 中,将其他数据...

46920
来自专栏Golang语言社区

亲身经历的痛--database/sql: Stmt的使用以及坑

前言 众所周知,golang操作数据库,是通过database/sql包,以及第三方的实现了database/sql/driver接口的数据库驱动包来共同完成的...

1.1K100
来自专栏Python中文社区

OpenStack中的RESTful API是如何实现的?

OpenStack作为一个开源的IaaS平台,各个组件和服务之间的消息传递都是通过RESTfulAPI和RPC传递,这里主要讲讲它是如何实现REST的。由于大家...

35280
来自专栏玄魂工作室

Python爬虫之urllib模块1

Python爬虫之urllib模块1 本文来自网友投稿。作者PG,一个待毕业待就业二流大学生。玄魂工作室未对该文章内容做任何改变。 因为本人一直对推理悬疑比较感...

33860
来自专栏java架构师

BAT美团滴滴java面试大纲(带答案版)之三:多线程synchronized

继续面试大纲系列文章。   从这一篇开始,我们进入ava编程中的一个重要领域---多线程!多线程就像武学中对的吸星大法,理解透了用好了可以得道成仙,俯瞰芸芸众生...

314100
来自专栏Golang语言社区

Go Channel 应用模式(一)

Channel是Go中的一种类型,和goroutine一起为Go提供了并发技术, 它在开发中得到了广泛的应用。Go鼓励人们通过Channel在goroutine...

21420
来自专栏along的开发之旅

Java8移除永久代

最近看深入理解Java虚拟机, 在实战OutOfMemoryError的运行时常量池溢出时, 我的Intellij提示如下:

11810
来自专栏程序员宝库

购物网站的 redis 相关实现(Java)

本文主要内容: 登录cookie 购物车cookie 缓存数据库行 测试 必备知识点: WEB应用就是通过HTTP协议对网页浏览器发出的请求进行相应的服务器或者...

523140

扫码关注云+社区

领取腾讯云代金券