首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Akka Streams:如何按大小对源中的文件列表进行分组?

Akka Streams是一种用于构建可扩展、高吞吐量和容错的流处理应用程序的工具包。它基于Actor模型,提供了一种声明式的方式来处理数据流,并且可以轻松地与其他Akka组件集成。

要按大小对源中的文件列表进行分组,可以使用Akka Streams中的一些操作符和函数来实现。下面是一个示例代码,展示了如何使用Akka Streams按大小对文件列表进行分组:

代码语言:scala
复制
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Sink, Source}
import akka.util.ByteString
import java.nio.file.Paths

object FileGroupingExample extends App {
  implicit val system = ActorSystem("FileGroupingExample")
  implicit val materializer = ActorMaterializer()

  // 源文件列表
  val fileList = List("file1.txt", "file2.txt", "file3.txt", "file4.txt")

  // 按大小分组的阈值(字节数)
  val groupSizeThreshold = 1000000

  // 读取文件并计算文件大小
  val fileSource = Source(fileList)
    .map(file => (file, Paths.get(file)))
    .mapAsync(parallelism = 1) { case (file, path) =>
      FileIO.fromPath(path)
        .runFold(0L)((size, bytes) => size + bytes.length)
        .map(size => (file, size))
    }

  // 根据文件大小进行分组
  val groupBySizeFlow = Flow[(String, Long)]
    .groupBy(fileSize => if (fileSize._2 > groupSizeThreshold) "large" else "small")
    .fold(List.empty[(String, Long)])((acc, fileSize) => fileSize :: acc)
    .mergeSubstreams

  // 打印分组结果
  val printSink = Sink.foreach[List[(String, Long)]](group => println(s"Group: $group"))

  // 运行流处理
  fileSource.via(groupBySizeFlow).runWith(printSink)
}

在上述示例中,我们首先定义了源文件列表和按大小分组的阈值。然后,我们使用Source操作符创建一个文件源,并使用mapAsync操作符异步读取每个文件的内容并计算文件大小。接下来,我们使用groupBy操作符根据文件大小将文件分组为"large"和"small"两个组,并使用fold操作符将文件添加到相应的组中。最后,我们使用mergeSubstreams操作符合并所有分组,并使用Sink操作符打印分组结果。

这是一个简单的示例,演示了如何使用Akka Streams按大小对文件列表进行分组。在实际应用中,您可以根据需要进行更复杂的操作和处理。

腾讯云提供了一系列与流处理相关的产品和服务,例如腾讯云流计算(Tencent Cloud StreamCompute)和腾讯云消息队列(Tencent Cloud Message Queue),您可以根据具体需求选择适合的产品。您可以访问腾讯云官方网站(https://cloud.tencent.com/)了解更多关于这些产品的详细信息和文档。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Linux下如何目录文件进行统计

统计目录文件数量 统计目录中文件最简单方法是使用ls每行列出一个文件,并将输出通过管道符传递给wc计算数量: [root@localhost ~]# ls -1U /etc |wc -l 执行上面的...将显示所有文件总和,包括目录和符号链接。...-1选项表示每行列出一个文件, -U告诉ls不对输出进行排序,这使 执行速度更快。ls -1U命令不计算隐藏文件。...递归统计目录文件 如果想要统计目录文件数量,并包括子目录,可以使用 find命令: [root@localhost ~]# find /etc -type f|wc -l 用来统计文件另一个命令是...总结 在本文中,将展示几种查找Linux目录文件数量不同方法。

2.9K40

alpakka-kafka(1)-producer

alpakka项目是一个基于akka-streams流处理编程工具scala/java开源项目,通过提供connector连接各种数据并在akka-streams进行数据处理。...alpakka-kafka提供了kafka核心功能:producer、consumer,分别负责把akka-streams数据写入kafka及从kafka读出数据并输入到akka-streams...用akka-streams集成kafka应用场景通常出现在业务集成方面:在一项业务A中产生一些业务操作指令写入kafka,然后通过kafka把指令传送给另一项业务B,业务B从kafka获取操作指令并进行相应业务操作...在alpakka,实际业务操作基本就是在akka-streams数据处理(transform),其实是典型CQRS模式:读写两方互不关联,写时不管受众是谁,如何使用、读者不关心谁是写方。...alpakka提供producer也就是akka-streams一种组件,可以与其它akka-streams组件组合形成更大akka-streams个体。

93320

FunDA(0)- Functional Data Access accessible to all

一些不算FP编程专家的人来说,如何用他们习惯方式来使用现成函数式软件如Slick,Spark等可能就变得是件很迫切事情了。...FunDA包括两项重大功能: 一、提供行处理数据功能支持:FRM最强大功能之一就是能够实现Query函数组合,然后产生SQL语句来后台数据库进行操作,返回结果是一个集合。...二是每条数据行要求进行状态处理函数运算run(func)或者并行运算runPar(func)。产生数据同样可以实现并行运算,比如通过构建一个多任务计算对象后进行运算。...(updateRow) //源头产生数据行进行并行处理 数据流动管理和运算管理功能可以通过某种流库(stream library)如scalar-streams-fs2...三、freemonad stream+FRM DSL:用freemonad来抽象FunDA全部操作,全面实现与下层软件工具库松散耦合,同时提供scalaz-streams-fs2、akka-stream

1K100

Java8Streams分组操作讲解

Streams 流随着 JDK 1.8 发布而出现,是集合(Collection)对象功能增强,它专注于集合对象进行各种聚合或者分组操作。...本文我会给大家详细讲解下 Streams 流相关分组操作。 假设我们有一组学生,需要按年龄他们进行分组。按照 Java 得传统方式,我们可能需要好几个步骤。...我们需要按年龄这些员工对象进行分组如何实现这一目标?...自定义对象进行分组 举例 一 假设我们有一个项目列表。我们 pojo 是具有名称、价格和数量 Item。...Collectors.mapping(Item::getName, Collectors.toSet()) — 将分组后得商品列表转化为名称列表 如果我们需要对分组商品名称价格进行排序?

31710

异步编程 - 14 异步、分布式、基于消息驱动框架 Akka

插件和扩展:Akka 提供了丰富插件和扩展机制,可以轻松集成其他库和框架,如 Akka HTTP、Akka Streams 等,以构建全栈应用程序。...下面看下Akka特性: 可以更简单地构建并发和分布式系统 Akka基于Actor模型和Streams,让我们可以构建可伸缩,并且可以高效使用服务器资源,使用多个服务器进行扩展系统。...---- 传统编程模型存在问题 封装特性挑战 面向对象编程封装要求数据只能通过对象提供方法间接访问,但多线程下多个线程同时修改对象内部数据会导致线程安全问题。...共享内存在现代计算机架构上误解 在多核CPU架构,多线程之间不再有真正共享内存,而是通过Cache行传递数据,使得共享变量内存可见性成为问题。...Actor模型采用树状层次结构监督机制,父Actor可以对子Actor故障进行监控和处理。 监督程序可以决定是否重新启动子Actor或停止子Actor,确保系统可恢复性和健壮性。

73040

Play For Scala 开发指南 - 第1章 Scala 语言简介

Martin还曾受雇于 Sun 公司,编写了 javac 参考编译器,这套系统后来演化成了 JDK 自带 Java 编译器。...这主要得益于Scala强大类型推断系统,在编译期可以进行静态类型推断,不仅可以降低代码冗余性,而且也不用担心动态类型语言重构灾难。...2.12版本 2017年发布2.13-M2版本 Scala全面拥抱现有的Java生态系统,可以和现有Java类库实现无缝连接,你可以在Scala项目直接引入现有的Java依赖,或是直接引入Java源码文件...Akka包含很多模块,Akka Actor是Akka核心模块,使用Actor模型实现并发和分布式,可以将你从Java多线程痛苦解救出来;Akka Streams可以让你以异步非阻塞方式处理流数据...;Distributed Data可以帮助你在集群之间分享数据;Alpakka可以帮你为Akka Streams集成不同数据Akka Persistence可以帮你处理Actor消息持久化存储,

1.3K60

反应式架构(1):基本概念介绍 顶

淘宝从2018年开始整体架构进行反应式升级, 取得了非常好成绩。...系统应该用户请求即时做出响应。即时响应是可用性和实用性基石, 而更加重要是,即时响应意味着可以快速地检测到问题并且有效地进行处理。 回弹性(Resilient)。...有一点需要提醒是,虽然Java 9已经实现了Reactive Streams,但这并不意味着像RxJava、Reactor、Akka Streams这些流处理框架就没有意义了,事实上恰恰相反。...别急, 在下一篇文章,我们将会看到如何利用反应式编程简化异步调用问题。 3 总结        本文通过两部分内容为大家介绍了反应式基本概念。..., Scala, Kafka and Akka Streams

1.6K10

akka-streams - 从应用角度学习:basic stream parts

实际上很早就写了一系列关于akka-streams博客。但那个时候纯粹是为了了解akka而去学习,主要是从了解akka-streams原理为出发点。...因为akka-streamsakka系列工具基础,如:akka-http, persistence-query等都是基于akka-streams,其实没有真正把akka-streams用起来。...这段时间所遇到一些需求也是通过集合来解决。不过,现在所处环境还是逼迫着去真正了解akka-streams应用场景。...所以流处理应该是分布式数据处理理想方式了。这是这次写akka-streams初衷:希望能通过akka-streams来实现分布式数据处理编程。...由于运算值是无法当作流元素传递,Flow只能是用来Source传下来元素进行转换后再传递给Sink,也就是说Flow是由一个或多个处理环节构成

1K10

Akka 指南 之「消息传递可靠性」

高级抽象 消息模式 事件 带明确确认邮箱 死信 应该用死信做什么? 如何收到死信?...在远程消息发送情况下,涉及到更多步骤,这意味着更多步骤可能出错。另一个方面是本地发送将在同一个 JVM 传递对消息引用,而对发送底层对象没有任何限制,而远程传输将限制消息大小。...下文将进一步讨论这种权衡(trade-off)细节。 作为补充部分,我们如何在内置基础上构建更强可靠性给出了一些建议。...事件 事件(和分片)是大型网站扩展到数十亿用户原因,其思想非常简单:当一个组件(思考 Actor)处理一个命令时,它将生成一个表示命令效果事件列表。除了应用于组件状态之外,还存储这些事件。...Actor 可以订阅事件流上akka.actor.DeadLetter,请参阅「事件流」了解如何执行该操作。然后,订阅 Actor 将收到(本地)系统从那时起发布所有死信。

1.7K10

akka-grpc - 基于akka-http和akka-streamsscala gRPC开发工具

在http/1应用二进制文件传输交换有诸多限制和不便,特别是效率方面的问题。在protobuf这种序列化模式任何类型数据格式都一视同仁,可以很方便实现图片等文件上传下载。...至于akka-grpc基于akka-streams特性,我并没有感到太大兴趣。如上所述,我们目标是实现一种开放数据平台终端接入接口。...akka-streams恰恰把总体方案限制在了内部系统集成模式,因为服务端客户端就是akka-streams两端,是内部系统集成场景。...也许,在开发一套内部IT系统过程akka-grpc可以很趁手。...所以,akka-grpc并没有提供OAuth2规范身份验证支持。在这个例子里我们就只能进行基本身份证明(如店号、机器号等),但身份验证过程安全性就不做任何加密操作了。

1.9K20

Kafka Streams - 抑制

◆聚合概念 Kafka Streams Aggregation概念与其他函数式编程(如Scala/Java Spark Streaming、Akka Streams)相当相似。...我们1天Tumbling时间窗口感兴趣。 注意:所有的聚合操作都会忽略空键记录,这是显而易见,因为这些函数集目标就是特定键记录进行操作。...根据上述文件定义,我们希望每天在宽限期过后产生一个汇总统计信息(与UTC一致)。但是,有一个注意点。在遇到相同group-by key之前,suppress不会刷新聚合记录!!。...为了在所有事件中使用相同group-by key,我不得不在创建统计信息时在转换步骤key进行硬编码,如 "KeyValue.pair("store-key", statistic)"。...然后,groupByKey()将正确地将所有的统计信息分组。 在CDC架构,我们不能期望在宽限期后就有DB操作发生。在非高峰期/周末,可能没有数据库操作。但我们仍然需要生成聚合消息。

1.5K10

PlayScala 开发技巧 - 实时同步 MongoDB 高可用方案

MongoDB 从 3.6 开始为开发者提供了 Change Streams 功能,利用 Change Streams 功能可以非常方便地监听指定 Collection 上数据变化。...利用 Play Mongo 可以方便地实现数据监听功能,并且我们可以将 Change Stream 转换成 Akka Stream,然后以流方式处理指定 Collection 上数据变化, mongo...,以方便批处理,当满足下面任意一个条件时便结束缓冲向后传递: 缓冲满10个元素 缓冲时间超过了1000毫秒 缓冲后元素进行流控,每秒只允许通过1个元素 3 如何实现高可用?...上面的代码并没有考虑可用性,如果在监听过程中发生了网络错误,如何从错误恢复呢?...文档中提及程序可以自动从可恢复错误恢复。

64130

kakafka - 为CQRS而生

我想作为一种消息驱动系统,如何保证akka消息正确产生和安全使用应该是最基本要求。而恰恰akka是没有提供对消息遗漏和重复消息保障机制。我想这也是造成akka用户担心主要原因。...不过akka在alpakka社区提供了alpakka-kafka:这个东西是个基于akka-streamskafka scala终端编程工具,稍微过了一下,感觉功能比较全面,那就是它了。...要注意是创建topic和partition都是严格管理工作admin,不是在某些程序任意进行增减。一般来讲,在创建一个新topic时就要确定它下面的partition数量了。...至于goup内reader是如何分配partition完全由kafka内部解决。如果发现新partition或者组内reader有增减变化,kafka会自动进行再分配rebalance。...kafka最重要特点就是可以容许不同应用通过不同reader-group同一个partition上事件进行任意读取,本意应该是不同应用可以利用同一个业务事件序列进行不同业务处理。

57220

Akka-CQRS(6)- read-side

前面我们全面介绍了在akka-cluster环境下实现CQRS写端write-side。简单来说就是把发生事件描述作为对象严格发生时间顺序写入数据库。...也就是说在另一个线程里有个程序也按时间顺序把这些二进制格式对象读出来、恢复成某种结构如ActionGo类型、然后按照结构内操作指令业务数据进行实际操作处理,这时才会产生业务数据影响。...,但同时也存在订阅方sub即reader十分难以控制问题,而且可以肯定是订阅到达消息无法保证是发出时间顺序接收,我们无法控制akka传递消息过程。...而具体pull时段间隔如何设定也是一个比较棘手问题。无论如何akka提供了Persistence-Query作为一种CQRS读端工具。...refresh-interval可以在配置文件设置,如下面的cassandra-plugin配置: cassandra-query-journal { # Implementation class

60430

Flink 最锋利武器:Flink SQL 入门和实战

虽然 Avro 类型是 Flink 1.7 唯一支持模式演变内置类型,但社区仍在继续致力于在未来 Flink 版本中进一步扩展其他类型支持。...一个完整 Flink SQL 编写程序包括如下三部分: Source Operator:Soruce operator 是对外部数据抽象, 目前 Apache Flink 内置了很多常用数据实现例如...Flink SQL 算子支持,接下来我们 Flink SQL 中最常见算子语义进行介绍。...GROUP BY GROUP BY 是对数据进行分组操作。例如我们需要计算成绩明细表,每个学生总分。...举个例子,假如我们要计算每个人每天订单量,按照 user 进行聚合分组: SELECT user, TUMBLE_START(rowtime, INTERVAL ‘1’ DAY) as wStart,

16.8K41
领券