首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

ElasticMQ 0.7.0:使用Akka和Spray的长轮询,非阻塞实现

这是一次重要的重写,核心部分是使用Akka Actor和REST层则采用Spray。目前为止,只有核心部分和SQS模块被重写;SQL后端和复制(Replication)尚在进行中。...要在本地内存运行一个SQS实现(例如,测试一个使用SQS的应用程序),只需要下载jar文件)并运行: java -jar elasticmq-server-0.7.0.jar 这将启动一个地址为http...实现说明 出于好奇,下面简单描述下ElasticMQ是如何实现的,包括核心系统,REST层,Akka数据的使用和长轮询的实现。所有的代码都可以在GitHub上找到。...数据,当然这需要启用continuations插件。...使用Akka数据,您可以像正常的顺序代码一样编写使用Future的代码。CPS插件会将其转换为在需要时使用回调。

1.5K90
您找到你想要的搜索结果了吗?
是的
没有找到

ElasticMQ 0.7.0:长轮询,使用Akka和Spray的非阻塞实现

这是一次重大的重写(即版本升级),升级之后将在核心使用Akka actors 并在REST层使用Spray。...到目前为止,只有核心和SQS模块被重写, 日志( journaling),SQL后端和副本(replication)模块的重写尚未完成。...这有助于减少带宽的使用(不需要非常频繁地进行请求),进而提高系统整体性能(发送后立即收到消息)并降低SQS成本。 独立的服务器现在是一个单一的jar包。...要运行本地内存SQS实现(例如,测试使用SQS的应用程序),只需要下载jar文件并运行: java -jar elasticmq-server-0.7.0.jar 这将在http://localhost...实现说明 出于好奇,下面是对ElasticMQ如何实现的简短描述,包括核心系统,REST层,Akka数据使用和长轮询实现。所有的代码都可以在GitHub上找到。

1.6K60

Akka(17): Stream:数据基础组件-Source,Flow,Sink简介

2、scalaz-sstream和akka-stream的数据都是一种申明式的数据处理流程描述,属于一种运算方案,最终都需要某种运算器来对数据按运算方案进行具体的运算,得出运算结果和产生副作用。...akka-stream的数据是由三类基础组件组合而成,不同的组合方式代表不同的数据处理及表达功能。三类组件分别是: 1、Source:数据源。...对通过输入端口输入数据的元素进行转变处理(transform)后经过输出端口输出。FlowShape有一个输入端和一个输出端。 在akka-stream里数据组件一般被称为数据图(graph)。...我们可以用许多数据图组成更大的stream-graph。 akka-stream最简单的完整(或者闭合)线性数据(linear-stream)就是直接把一个Source和一个Sink相接。...意思是选择左边数据图的运算结果。我们上面提过akka-stream是在actor系统里处理数据元素的。在这个过程中同时可以用actor内部状态来产生运算结果。

1.6K60

Akka(23): Stream:自定义构件功能-Custom defined stream processing stages

从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据终点Sink三个框架性的构件(stream components)组成的。...一个完整的数据(可运行数据)必须是一个闭合的数据,即:从外表上看,数据两头必须连接一个Source和一个Sink。...:akka-stream又包括数据图Graph及运算器Materializer两个部分。...akka-stream在数据的各环节都实现了Reactive-Stream-Specification,所以对于输入端口InHandler来讲需要响应上游推送信号onPush,输出端口OutHandler...对于一对多扩散型和多对一合并型形状的数据构件akka-stream提供了UniformFanIn和UniformFanOut两种GraphStage。

1.7K80

Akka(19): Stream:组合数据,组合共用-Graph modular composition

akka-stream的Graph是一种运算方案,它可能代表某种简单的线性数据图如:Source/Flow/Sink,也可能是由更基础的图组合而成相对复杂点的某种复合流图,而这个复合流图本身又可以被当作组件来组合更大的...下面是akka-stream预设的一些基础数据图: ? 上面Source,Sink,Flow代表具备线性步骤linear-stage的图,属于最基础的组件,可以用来构建数据处理链条。...然后我们再使用这个自定义图模块组建一个完整的闭合流图: import akka.actor._ import akka.stream._ import akka.stream.scaladsl._...但用akka GraphDSL可以很形象的组合这个数据图; import GraphDSL.Implicits._ RunnableGraph.fromGraph(GraphDSL.create...的运算是在actor上进行的,除了大家都能对数据元素进行处理之外,akka-stream还可以通过actor的内部状态来维护和返回运算结果。

1K100

急需降低系统复杂性,我们从 Kafka 迁移到了 Pulsar

这有助于替换 Iterable 采用的 RabbitMQ 消息系统,并最终替换其他消息系统(如 Kafka 和 Amazon SQS)。...RabbitMQ 和 Amazon SQS 都是基于队列的消息系统。 通常情况下,消息队列系统可以简化消息级别错误的处理。...在评估了几个消息系统后,我们决定使用 Pulsar,因为 Pulsar 的可扩展性、可靠性和特性之间达到了完美的平衡,足以取代 Kafka、Amazon SQS 等消息系统。...我们还贡献了一个基于 Akka Streams 的连接器,作为 source 接收消息,还支持 ack。 例如,我们可以这样消费命名空间中的所有 topic。....*".r, subscription = Subscription("email-service"))) // Create an Akka streams Source stage for this

87810

服务编排--Conductor 文档翻译 (介绍与基本概念)

在执行时,它实例化子工作并等待它完成 EVENT 在支持的事件系统中生成事件(例如,Conductor,SQS) Conductor提供了一个API来创建在与引擎相同的JVM中执行的用户定义任务。...SQS队列 可以使用以下API检索服务器用于更新任务状态的SQS队列: GET /queue 更新任务状态时,消息需要符合以下规范: 消息必须是有效的JSON字符串。...Event (事件) 事件任务提供将事件(消息)发布到Conductor或外部事件系统(如SQS)的功能。事件任务对于为工作和任务创建基于事件的依赖项非常有用。...例如,导体或sqssqs_queue_name 例 { "sink": 'sqs:example_sqs_queue_name' } 使用Conductor作为接收器生成事件时,事件名称遵循以下结构...支持的接收器 Conductor SQS 事件任务输入 给予事件任务的输入可作为有效负载用于已发布的消息。例如,如果消息被放入SQS队列(接收器是sqs),则消息有效负载将是任务的输入。

4.8K40

Serverless 常见的应用设计模式

在状态机中可以处理嵌套的工作逻辑、错误和重试。不同版本的工作,可以很方便对生产系统进行升级或回滚,此外还可以减少自定义代码,使应用程序更易于测试和维护。...实施方面,可以使用 SQS 构建此模式。 消息队列包含多个发送方/接收方的时候,而每个 SQS 队列通常只有一个接收器。...SQS 队列可以订阅一个 SNS 主题,将消息推送到 SNS 主题,SQS 会自动将消息推送到所有订阅的队列。...这也就意味着,对于不同优先级的消息拥有完全不同的工作。优先级高的消息,会通过使用更昂贵的服务和容量更大的 API 来加快工作,而不需要尽快处理的消息则使用不同的工作。...SNS 主题支持其他订阅者,例如电子邮件和 SQS 队列。向主题添加新消息可以同时调用 Lambda 函数、发送电子邮件或将消息推送到 SQS 队列。

2.7K30

akka-streams - 从应用角度学习:basic stream parts

实际上很早就写了一系列关于akka-streams的博客。但那个时候纯粹是为了了解akka而去学习的,主要是从了解akka-streams的原理为出发点。...因为akka-streams是akka系列工具的基础,如:akka-http, persistence-query等都是基于akka-streams的,其实没有真正把akka-streams用起来。...所以处理应该是分布式数据处理的理想方式了。这是这次写akka-streams的初衷:希望能通过akka-streams来实现分布式数据处理编程。...虽然运算值不能像元素一样流动,但akka-streams提供了机制让用户选择是否返回某个节点的运算值M。...akka-streams提供了简便一点的运算方式runWith:指定runWith参数组件的M为最终运算值。

1K10

基础设施即代码的历史与未来

例如,在函数执行上下文中成功触发给定队列的情况下,需要授予 IAM 角色一组非常特定的权限(sqs:ReceiveMessage、sqs:DeleteMessage、sqs:GetQueueAttributes...Eventual 第一个是 Eventual ,它是一个 TypeScript 库,定义了现代云应用程序的几个通用构建块:Services(服务)、APIs(API接口)、Workflows(工作)、...以下是一个简单的示例,展示了一个Event(事件)、Subscription(订阅)、Task(任务)、Workflow(工作)和API(应用程序接口): import { event, subscription...helloTask = task("helloTask", async (name: string) => { return `hello ${name}`; }); // 使用上述任务的示例工作...hello 创建一个REST API export const hello = command("hello", async (name: string) => { // 触发上述工作

10810

异步编程 - 14 异步、分布式、基于消息驱动的框架 Akka

---- Akka概述 Akka 是一个开源的并发、分布式、基于消息驱动的框架,用于构建高可伸缩性、可靠性和并发性强的应用程序。...以下是 Akka 框架的关键概念和特点: Actor 模型:Akka 的核心构建块是 Actor,它是一种轻量级并发原语。...Akka 提供了透明的消息传递,使得在分布式环境中发送消息就像在本地一样简单。 容错性:Akka 强调容错性,允许开发人员构建可靠的系统。...插件和扩展:Akka 提供了丰富的插件和扩展机制,可以轻松集成其他库和框架,如 Akka HTTP、Akka Streams 等,以构建全栈应用程序。...反应数据 具有回压的异步非阻塞处理。完全异步和基于的HTTP服务器和客户端为构建微服务提供了一个很好的平台。

74240

Agari使用Airbnb的Airflow实现更智能计划任务的实践

工作调度程序是一个负责让工作流在可靠并可扩展方法中周期性执行的系统。...工作调度程序是无处不在的,例如,任何有数据仓库的公司都有一个通常用于报告的专门的数据库,该数据库使用工作调度程序夜以继日地加载到数据库。...比如像Agari这样的公司更感兴趣的是可以使用工作调度程序更可靠地执行复杂而关键的”大”数据科学工作!...当第二个Spark把他的输出写到S3,S3“对象已创建”,通知就会被发送到一个SQS队列中。...在我们的例子中,如果我们检查并发现SQS中没有数据,我们会放弃继续进行并且发送一封通知SQS中数据丢失的通知邮件!如果一切正常,那么消息将在SQS中显示,我们将继续进行我们管道中的主要工作!

2.6K90

Akka(20): Stream:异步运算,压力缓冲-Async, batching backpressure and buffering

akka-stream原则上是一种推式(push-model)的数据。...对于akka-stream这种push模式的数据,因为超速推送数据会造成数据丢失,所以必须想办法控制publisher产生数据的速度。...因为akka-stream已经在上下游环节全部实现了Reactive-Streams-Specification,所以上下游之间可以进行互动,这样就可以在akka-stream里由下游通知上游自身可接收数据的状态来控制上游数据流速...另外,如果用async进行数据的并行运算的话上游就不必理会下游反应,可以把数据推进buffer然后立即继续处理下一个数据元素。所以async运算模式的buffering就不可或缺了。...akka-stream可以通过以下几种方式来设定异步运算使用的缓冲大小: 1、在配置文件中设定默认buffer: akka.stream.materializer.max-input-buffer-size

84870
领券