前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ElasticMQ 0.7.0:使用Akka和Spray的长轮询,非阻塞实现

ElasticMQ 0.7.0:使用Akka和Spray的长轮询,非阻塞实现

作者头像
大脸猫爱吃鱼
修改2018-01-15 20:40:14
1.5K0
修改2018-01-15 20:40:14
举报

原文作者:Adam Warski

原文地址:https://dzone.com/articles/elasticmq-070-long-polling-non

(地址elasticmq.org已经过期,翻译时将其删除。译者在GitHub上找到了目录:https://github.com/adamw/elasticmq/,同时由于腾讯云+总是识别非法链接,删除文中超链接。)

(译者修改并重新添加了部分超链接。)

一个基于Actor的兼容Scala和Amazon SQS接口的消息队列系统,ElasticMQ 0.7.0,刚刚发布。

这是一次重要的重写,核心部分是使用Akka Actor和REST层则采用Spray。目前为止,只有核心部分和SQS模块被重写;SQL后端和复制(Replication)尚在进行中。

客户端的主要改进是:

  • 近期加入SQS的长轮询(long polling)支持
  • 更简单的独立服务器 - 只需下载一个jar

通过长轮询,您可以在收到消息时指定一个附加MessageWaitTime属性。如果队列中没有消息,而不是正在完成空响应的请求,ElasticMQ将等待MessageWaitTime秒钟,直到消息到达。这有助于减少使用的带宽(不需要非常频繁的请求),提高系统整体性能(发送后立即收到消息)并降低SQS消耗。

现在,独立服务器是一个单一的jar文件。要在本地内存运行一个SQS实现(例如,测试一个使用SQS的应用程序),只需要下载jar文件)并运行:

代码语言:javascript
复制
java -jar elasticmq-server-0.7.0.jar

这将启动一个地址为http://localhost:9324的服务器。当然,接口和端口是可配置的,详情请参阅自述文件。像以前一样,您也可以使用任何基于JVM的语言来运行嵌入式服务器。

实现说明

出于好奇,下面简单描述下ElasticMQ是如何实现的,包括核心系统,REST层,Akka数据流的使用和长轮询的实现。所有的代码都可以在GitHub上找到。

如前所述,ElasticMQ现在使用Akka和Spray实现,并且不包含任何阻塞调用。一切都是异步的。

核心

核心系统是基于Actor的。有一个主Actor(QueueManagerActor),它知道系统中当前创建了哪些队列,并且可以创建和删除队列。

为了与Actor交互,使用了类型化的问答模式(Typed ask pattern)。例如,要查找一个队列(一个队列也是一个Actor),就会定义一个消息:

代码语言:javascript
复制
case class LookupQueue(queueName: String)extends Replyable[Option[ActorRef]]

用法如下所示:

代码语言:javascript
复制
import org.elasticmq.actor.reply._
val lookupFuture: Future[Option[ActorRef]] = queueManagerActor?LookupQueue("Q2")

如前所述,每个队列都是一个Actor,并封装队列状态。我们可以使用简单的可变数据结构,而不需要任何线程同步,因为参与者模型(Actor Model)为我们处理了这个问题。有一些消息可以发送给队列Actor,例如:

代码语言:javascript
复制
case class SendMessage(message: NewMessageData)   extends Replyable[MessageData]
case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, 
           waitForMessages: Option[Duration])     extends Replyable[List[MessageData]]
case class GetQueueStatistics(deliveryTime: Long) extends Replyable[QueueStatistics]
Rest层

SQS查询/ REST层是使用Spray实现的,这是一个基于Akka的轻量级REST/HTTP工具包。

除了基于Actor的非阻塞IO实现外,Spray还提供了强大的路由库spray-routing。它包含一些内置的指令,用于在请求方法(get/post等),提取表单参数的查询或请求路径上的匹配。但它也可以让你使用简单的指令组合来定义你自己的指令。一个典型的ElasticMQ路由如下所示:

代码语言:javascript
复制
val listQueuesDirective = 
  action("ListQueues"){
    rootPath {
      anyParam("QueueNamePrefix"?){prefixOption =>
        //逻辑
      }
    }
  }

上述action与在body参数中的"Action"URL中指定的Action 名字相匹配,并选择接受或拒绝请求,rootPath匹配空路径等等。Spray有一个很好的教程,如果您有兴趣,我鼓励您看看。

如何使用路由中的队列Actor来完成HTTP请求?

关于Spray的好处是,它只是将一个RequestContext实例传递给你的路由,并不期待任何返回。这取决于路由是完全放弃请求还是使用一个值完成。该请求也可以在另一个线程中完成; 或者,例如,在某个未来完成。这恰好是ElasticMQ所采用的。在这里mapflatMapfor-comprehensions(更好的语法是map/ flatMap)非常方便,例如(简化):

代码语言:javascript
复制
//异步的按顺序调用查找队列并删除它,
//因为?返回未来的值
for {
   queueActor <- queueManagerActor ? LookupQueue(queueName)
   _ <- queueActor ? DeleteMessage(DeliveryReceipt(receipt))
} {
   requestContext.complete(200, "message deleted")
}

有时,如果流程更复杂,ElasticMQ使用Akka 数据流,当然这需要启用continuations插件。还有一个类似的早期的项目,使用宏,Scala async

使用Akka数据流,您可以像正常的顺序代码一样编写使用Future的代码。CPS插件会将其转换为在需要时使用回调。以下是一个来自CreateQueueDirectives的例子:

代码语言:javascript
复制
flow {
  val queueActorOption = (queueManagerActor ? LookupQueue(newQueueData.name)).apply()
  queueActorOption match {
    case None => {
      val createResult = (queueManagerActor ? CreateQueue(newQueueData)).apply()
      createResult match {
        case Left(e) => throw new SQSException("Queue already created: " + e.message)
        case Right(_) => newQueueData
      }
    }
    case Some(queueActor) => {
      (queueActor ? GetQueueData()).apply()
    }
  }
}

这里的重要部分是flow块,它界定转换范围,以及用于提取未来内容的Future上的apply()调用。这看起来像完全正常的顺序代码,但是在执行时,从第一次使用Future开始将会异步运行。

长轮询

因为所有的代码都是异步和非阻塞的,实现长轮询非常容易。请注意,在从队列接收消息时,我们得到一个Future[List[MessageData]]。为了响应完成这个Future,HTTP请求也被完成并具有适当的响应。然而,这个Future几乎可以立即完成(例如正常情况下),或者在10秒(或者其他时间)之后 ,支持这些所需要的代码没有变化。所以唯一要做的就是延迟完成Future,直到指定的时间过去或新的消息到达。

实现位于于QueueActorWaitForMessagesOps。当接收消息的请求到达,并且队列中没有任何内容时,我们不是立即回复(即向发送者Actor发送空列表),而是将原始请求的引用和发送方actor存储在一个map中。使用Akka调度程序,我们还计划在指定的超时之后发回空列表并删除条目。

当新消息到达时,我们只需从map上获取一个等待请求,然后尝试完成它。同样,所有同步和并发问题都由Akka和参与者模型来处理。

请测试新版本,并告知我们您的任何反馈!

亚当

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实现说明
    • 核心
      • Rest层
        • 长轮询
        相关产品与服务
        消息队列 CMQ
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档