首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Akka流图-如何从akka.stream.contrib测试PartitionWith

Akka流图是一种基于Akka框架的流处理模型,用于构建高可伸缩、高并发的数据流处理应用程序。它提供了一种简洁而强大的方式来处理数据流,并且能够自动处理背压(backpressure)问题,确保数据流的稳定和高效。

PartitionWith是Akka流图中的一个操作符,用于根据指定的条件将数据流分割成多个子流。它接收一个函数作为参数,该函数根据数据的特征将数据分配到不同的子流中。这样可以实现对数据流的分区处理,每个子流可以独立进行处理,提高并发性能。

PartitionWith操作符的使用可以通过akka.stream.contrib库来实现。akka.stream.contrib是一个Akka社区提供的扩展库,提供了一些额外的操作符和工具类,用于增强Akka流图的功能。

在使用PartitionWith操作符时,可以根据具体的业务需求来定义分区函数,将数据流按照不同的条件进行分割。例如,可以根据数据的类型、数值范围、关键字等进行分区。分区后的子流可以分别进行不同的处理操作,例如过滤、转换、聚合等。

对于Akka流图的测试,可以使用akka-stream-testkit库来进行单元测试。akka-stream-testkit是Akka提供的测试工具包,用于对Akka流图进行测试和验证。它提供了一些用于构建测试用例的工具类和方法,可以模拟数据流的输入和输出,验证流图的行为和结果。

在测试PartitionWith操作时,可以使用akka-stream-testkit提供的TestSink和TestSource来模拟输入和输出数据流,然后使用PartitionWith操作符对数据流进行分区处理,并使用TestSink来验证分区后的子流的输出结果是否符合预期。

总结起来,Akka流图是一种用于构建高可伸缩、高并发的数据流处理应用程序的模型,PartitionWith是其中的一个操作符,用于将数据流根据指定的条件分割成多个子流。在测试PartitionWith操作时,可以使用akka-stream-testkit库进行单元测试。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Akka(19): Stream:组合数据,组合共用-Graph modular composition

akka-stream的Graph是一种运算方案,它可能代表某种简单的线性数据如:Source/Flow/Sink,也可能是由更基础的组合而成相对复杂点的某种复合流,而这个复合流本身又可以被当作组件来组合更大的...下面是akka-stream预设的一些基础数据: ? 上面Source,Sink,Flow代表具备线性步骤linear-stage的,属于最基础的组件,可以用来构建数据处理链条。...然后我们再使用这个自定义模块组建一个完整的闭合流: import akka.actor._ import akka.stream._ import akka.stream.scaladsl._...的GraphDSL,对复合型Graph的构建可以实现形象化,大部分工作都在如何对组件之间的端口进行连接。...但用akka GraphDSL可以很形象的组合这个数据; import GraphDSL.Implicits._ RunnableGraph.fromGraph(GraphDSL.create

1K100

分布式系统中的必备良药 —— RPC

二、成熟的解决方案   1.Google.gRpc(https://github.com/grpc/grpc)     大名鼎鼎的Google出品的RPC框架,基于Http2设计,支持双向、消息头压缩...Thrift的缺点是无法生成async,await,Task之类的泛型代码,这个对于当下大背景来说有一定的局限性(如果有小伙伴知道如何解决此问题,感谢赐教)。...一般用Akka(有.net版本 Akka.net)和它对标,都是基于Actor模型设计的分布式框架,顺手附上一篇经典的对比文章:https://github.com/akka/akka-meta/blob...序列化方式:   序列化一般3个维度去考虑,数据大小、可读性、传输效率(序列化反序列所消耗的时间)。...由于数据比较多,直接付上2个动,想进一步分析的可以在文末下载excel自行解决~。见图2,3: ?                   【2】 ?

70310

ElasticMQ 0.7.0:使用Akka和Spray的长轮询,非阻塞实现

实现说明 出于好奇,下面简单描述下ElasticMQ是如何实现的,包括核心系统,REST层,Akka数据的使用和长轮询的实现。所有的代码都可以在GitHub上找到。...数据,当然这需要启用continuations插件。...使用Akka数据,您可以像正常的顺序代码一样编写使用Future的代码。CPS插件会将其转换为在需要时使用回调。...使用Akka调度程序,我们还计划在指定的超时之后发回空列表并删除条目。 当新消息到达时,我们只需map上获取一个等待请求,然后尝试完成它。同样,所有同步和并发问题都由Akka和参与者模型来处理。...请测试新版本,并告知我们您的任何反馈! 亚当

1.5K90

Akka(17): Stream:数据基础组件-Source,Flow,Sink简介

2、scalaz-sstream和akka-stream的数据都是一种申明式的数据处理流程描述,属于一种运算方案,最终都需要某种运算器来对数据按运算方案进行具体的运算,得出运算结果和产生副作用。...Source可以单值、集合、某种Publisher或另一个数据流产生数据的元素(stream-element),包括: /** * Helper to create [[Source]]...对通过输入端口输入数据的元素进行转变处理(transform)后经过输出端口输出。FlowShape有一个输入端和一个输出端。 在akka-stream里数据组件一般被称为数据(graph)。...我们可以用许多数据组成更大的stream-graph。 akka-stream最简单的完整(或者闭合)线性数据(linear-stream)就是直接把一个Source和一个Sink相接。...意思是选择左边数据的运算结果。我们上面提过akka-stream是在actor系统里处理数据元素的。在这个过程中同时可以用actor内部状态来产生运算结果。

1.6K60

Akka(23): Stream:自定义构件功能-Custom defined stream processing stages

从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据终点Sink三个框架性的构件(stream components)组成的。...一个完整的数据(可运行数据)必须是一个闭合的数据,即:外表上看,数据两头必须连接一个Source和一个Sink。...我们可以直接把一个Sink连接到一个Source来获取一个最简单的可运行数据,如下: Source(1 to 10).runWith(Sink.foreach(println)) 另一个角度说明...:akka-stream又包括数据Graph及运算器Materializer两个部分。...一个可运行数据必须由一个闭合的数据(closed graph)来代表,而这个ClosedGraph又是由代表不同数据转化处理功能的子(sub-graph)组成。

1.7K80

异步编程 - 14 异步、分布式、基于消息驱动的框架 Akka

---- Akka概述 Akka 是一个开源的并发、分布式、基于消息驱动的框架,用于构建高可伸缩性、可靠性和并发性强的应用程序。...插件和扩展:Akka 提供了丰富的插件和扩展机制,可以轻松集成其他库和框架,如 Akka HTTP、Akka Streams 等,以构建全栈应用程序。...反应数据 具有回压的异步非阻塞处理。完全异步和基于的HTTP服务器和客户端为构建微服务提供了一个很好的平台。...异步任务执行失败时,任务状态可能丢失,需要引入新的错误信令机制以及故障中恢复的方法。...【Actor系统】 使用消息传递避免锁和阻塞 Actor之间通信通过消息传递而不是方法调用,不会导致发送消息的调用线程被阻塞。

83040

响应式编程的实践

理解Source的本质 Akka Stream将数据源定义为Source,RxJava则定义为Observable或Flowable。这些响应式编程框架都为Source提供了丰富的operator。...如果我们创建的A与B并不包含uri到user的转换,就可以通过merge等合并操作将A与B合并,然后再共同重用uri到user的转换。...Akka Stream的拓扑 Akka Stream对流处理的抽象被建模为。这一设计思想使得的处理变得更加直观,的处理变成了“搭积木”游戏。...如果这个拓扑过于复杂,我们还可以利用基础Shape组合形成一个个更粗粒度Partial Shap。...Akka Stream之所以将Graph的运行器称之为materializer,大约也是源于这样的隐喻吧。 使用Akka Stream进行响应式处理,我建议参考这样的思维。

1.3K80

ElasticMQ 0.7.0:长轮询,使用Akka和Spray的非阻塞实现

:https://dzone.com/articles/elasticmq-070-long-polling-non 译者微博:@流域到海域 译者博客:blog.csdn.net/solo95 ElasticMQ...实现说明 出于好奇,下面是对ElasticMQ如何实现的简短描述,包括核心系统,REST层,Akka数据使用和长轮询实现。所有的代码都可以在GitHub上找到。...如何使用路由中的队列角色(queue actors)来完成HTTP请求? 关于Spray的RequestContext好处是,它所做的只是将一个实例传递给你的路由,不需要任何回复。...使用Akka调度程序,我们还计划在指定的时间超过之后发回空列表并删除条目。 当新消息到达时,我们只需map上等待一个请求,然后尝试去完成它。...同样,所有同步和并发问题都由Akka和actor模型来处理。 请测试新版本,如果您有任何反馈,请让我们知晓! Adam

1.6K60

为什么用 Java:一个 Python 程序员告诉你

用最新的Java来编程 现在你的心情可能已经恶心变成好奇了,那么我们在2015年该如何写Java呢?哪儿开始呢?首先,让我们回顾一些在Java 7和Java 8涌现的核心语言概念。...Guava为如何设计好的的Java API提供了绝佳的案例分析,提供最有效的Java中推荐的最佳实践的具体例子一个很好的案例, Effective Java中推荐的最佳实践大部分都在Guava中得以体现...分布式系统 Akka 提供类似Erlang型的Actor模型的抽象层来编写分布式系统。Akka可以从容应对许多种不同的故障,为编写可靠的分布式系统提供了更高层次的抽象。...Play基于Akka的非阻塞I/O,提供了编写Web应用程序的可扩展的异步框架。如果想使用不那么前沿但是被广泛应用于产品的框架,请尝试Jetty。 单元测试 JUnit 仍为编写单元测试的标准。...模拟(Mocking) Mockito解决了测试Java代码中的很多痛点,但是像Python语言的灵活转换到Java语言的严格,你需要更谨慎地来设计你的类用于模拟。

1.1K90

Akka 指南 之「消息传递可靠性」

如何收到死信?...本地消息发送的可靠性 Akka 测试套件依赖于在本地上下文中不丢失消息(对于非错误条件测试也适用于远程部署),这意味着我们确实尽了最大努力保持测试的稳定性。...对于给定的一对 Actor,直接第一个 Actor 发送到第二个 Actor 的消息将不会被无序接收,这一规则适用于使用基于 TCP 的 Akka 远程传输协议通过网络发送的消息。...如果组件的状态由于机器故障或被推出缓存而丢失,则可以通过重放事件(通常使用快照来加快进程)来重建。Akka Persistence 支持「事件源」。...如何收到死信? Actor 可以订阅事件流上的类akka.actor.DeadLetter,请参阅「事件」了解如何执行该操作。然后,订阅的 Actor 将收到(本地)系统中从那时起发布的所有死信。

1.7K10

更改许可后,Akka 分支 Pekko 进入 Apache 孵化器

事实上,Pekko 是 Akka 项目的一个分支。不久前, Akka 的许可证 Apache 2 更改为 Business Source License 1.1,Pekko 作为新的分支从中拉出。...目前还有一些现有的 Apache 项目,例如 Flink,它们在不同程度上使用了 Akka,因此让 Pekko 成为 Apache 的一部分给了这些其他 Apache 项目的信心。...在此之上,Pekko 提供了一套丰富的构建在 Actors 之上的库来解决现代问题,包括: :遵循响应式标准的完全双向背压 HTTP:建立在之上的全流式 HTTP 客户端 / 服务器,还提供高可用性...Web 服务所需的预期工具(例如连接池) 连接器:一组丰富的连接器集,用于构建在之上的各种数据库、消息传递、持久性服务 grpc:一个 gRPC 服务器 / 客户端 投影(projection):提供...此外,我们需要配置 Apache 构建系统以正确构建一个相当复杂的项目(即 akka 核心有需要多节点机器的测试)。” 点击底部阅读原文访问 InfoQ 官网,获取更多精彩内容!

1.1K20

Akka 指南 之「什么是 Actor?」

Actor 引用 如下面详细介绍的,为了 Actor 模型中获益,需要将 Actor 对象外部屏蔽。...好消息是,概念上讲,Akka 的每个 Actor 都有自己的轻量级线程,这完全与系统的其他部分隔离开来。这意味着,不必使用锁来同步访问,你可以编写 Actor 代码,而不必担心并发性。...邮箱 Actor 的目的是处理消息,这些消息是其他 Actor(或 Actor 系统外部)发送给Actor 的。...由于该策略是如何构建 Actor 系统的基础,因此一旦创建了 Actor,就不能更改它。...我们的测试启发了我们不只是静默地转储消息的原因:我们在发送死信的事件总线上注册TestEventListener,它将记录收到的每个死信的警告,这对于更快地破译测试失败非常有帮助。

88820

为什么用 Java:一个 Python 程序员告诉你

用最新的Java来编程 现在你的心情可能已经恶心变成好奇了,那么我们在2015年该如何写Java呢?哪儿开始呢?首先,让我们回顾一些在Java 7和Java 8涌现的核心语言概念。...Guava为如何设计好的的Java API提供了绝佳的案例分析,提供最有效的Java中推荐的最佳实践的具体例子一个很好的案例, Effective Java中推荐的最佳实践大部分都在Guava中得以体现...分布式系统 Akka 提供类似Erlang型的Actor模型的抽象层来编写分布式系统。Akka可以从容应对许多种不同的故障,为编写可靠的分布式系统提供了更高层次的抽象。...Play基于Akka的非阻塞I/O,提供了编写Web应用程序的可扩展的异步框架。如果想使用不那么前沿但是被广泛应用于产品的框架,请尝试Jetty。 单元测试 JUnit 仍为编写单元测试的标准。...模拟(Mocking) Mockito解决了测试Java代码中的很多痛点,但是像Python语言的灵活转换到Java语言的严格,你需要更谨慎地来设计你的类用于模拟。

77810

比较.NET 平台下 四种流行Actor框架

用户的角度来看,主要的区别是Akka.Net不处理单一的虚拟角色。它而是根据用户指定的分片策略将它们分组为分片,然后将这些分片分配给集群中的机器。...虽然开箱即用的1.4版本使用了Newtonsoft JSON序列化器,但我们的测试表明,使用Hyperion序列化器(目前正在测试)可以获得更好的性能。...优点 有公司支持,有商业支持计划 全面的文档和大量的例子和视频资料 基于著名的Akka框架的概念 能够将集群与本地监督层次结合起来 集群自动负载平衡和 "记忆实体 "机制 缺点 HOCON配置和其他一些...但Dapr的一个额外的部分是虚拟角色模型的实现,其中有一些奥尔良借来的概念。 Dapr以一种与技术无关的方式实现其功能。...展示的应用程序,eShopOnDapr,使用虚拟角色来实现一个持久的工作(流程管理器模式),这是一个有趣的用例。

10710

Succinctly 中文系列教程(三)20220109 更新

Succinctly Akka.NET 教程 零、简介 一、引言 二、Akka.NET 组件 三、演员介绍 四、使用演员 五、演员生命周期和状态 六、演员的可转换行为 七、演员层次结构 八、演员路径和演员选择...九、监督 十、其他组件 十一、Akka.NET 单元测试 十二、Akka.NET 路由 十三、ASP.NET 核心 的演员 十四、Akka.NET 远程处理 十五、最后的话 Succinctly AppInsight...教程 一、引言 二、我们如何开始 三、仪表盘 四、我们周围的指标 五、访问控制 六、警报规则 七、消费和导出数据 八、Visual Studio 和 SDK 九、编写你的代码 十、应用洞察分析 十一...神经网络分类 五、神经二分类 六、神经网络回归 七、LSTM 时间序列回归 八、附录 A:数据集 Succinctly Groovy 教程 零、简介 一、起步 二、语言基础 三、解决方案基础 四、数据...九、测试 HoloLens 体验 十、共享全息 十一、下一步 Succinctly 自定义语言实现教程 一、简介 二、分割和合并算法 三、基本控制语句 四、函数,函数,还是函数 五、异常和自定义函数

18.4K20
领券