首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

编程修炼 | Scala亮瞎Java的眼(二)

通常而言,OOFP会显得相对困难,这是两种根本不同的思维范式。张无忌学太极剑,学会的是忘记,只取其神,我们学FP,还得尝试忘记OO。自然,学到后来,其实还是万法归一。...-> 12, java -> 4, python -> 10) 之后,Map转换为Seq,然后按照统计的数值降序排列,接着反转顺序即可。...由于Scala在2.10版本中将原有的Actor取消,转而使用AKKA,所以我在演讲中并没有提及Actor。这是另外一个大的话题。...我们还可以利用for表达式组合多个futureAKKA中的ask模式也经常采用这种方式: object Cloud { def runAlgorithm(times: Int): Future...JVM的编译与纯粹的静态编译不同,Java和Scala编译器都是源代码转换为JVM字节码,而在运行时,JVM会根据当前运行机器的硬件架构,JVM字节码转换为机器码。

1.4K50

ElasticMQ 0.7.0:长轮询,使用Akka和Spray的非阻塞实现

还有一个类似的项目,使用宏,Scala Async,但这个仍处于早期开发阶段。 使用Akka Dataflow,您可以编写使用Future们的代码,就好像编写正常的序列化代码一样。...CPS插件会将其转换为在需要使用回调。...请注意,从一个队列接收消息,我们得到一个Future[List[MessageData]]。为了发出响应已完成这个future,HTTP请求也将会以适当的响应来完成。...当接收到消息的请求到达,队列中没有任何内容产生,而是立即回复(即向发送者actor发送空列表),我们储存原始请求的引用和发送方actor在map中。...使用Akka调度程序,我们还计划在指定的时间超过之后发回空列表并删除条目。 当新消息到达,我们只需从map上等待一个请求,然后尝试去完成它。

1.5K60
您找到你想要的搜索结果了吗?
是的
没有找到

ElasticMQ 0.7.0:使用Akka和Spray的长轮询,非阻塞实现

如何使用路由中的队列Actor来完成HTTP请求? 关于Spray的好处是,它只是一个RequestContext实例传递给你的路由,并不期待任何返回。这取决于路由是完全放弃请求还是使用一个值完成。...还有一个类似的早期的项目,使用宏,Scala async。 使用Akka数据流,您可以像正常的顺序代码一样编写使用Future的代码。CPS插件会将其转换为在需要使用回调。...请注意,在从队列接收消息,我们得到一个Future[List[MessageData]]。为了响应完成这个Future,HTTP请求也被完成并具有适当的响应。...当接收消息的请求到达,并且队列中没有任何内容,我们不是立即回复(即向发送者Actor发送空列表),而是原始请求的引用和发送方actor存储在一个map中。...使用Akka调度程序,我们还计划在指定的超时之后发回空列表并删除条目。 当新消息到达,我们只需从map上获取一个等待请求,然后尝试完成它。同样,所有同步和并发问题都由Akka和参与者模型来处理。

1.5K90

Windows环境下Flink消费Kafka实现热词统计

默认的Flink的Slots配置是1,当出现任务插槽不够用时,上图圈圈一会就会失败,然后打开job manager 点击log就可以看到job因为没有可用的任务插槽而失败了。...(Future.scala:258) at akka.dispatch.OnComplete.internal(Future.scala:256) at akka.dispatch.japi$CallbackBridge.apply...(Future.scala:186) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) at scala.concurrent.impl.CallbackRunnable.run...如果此值大于1,则单个TaskManager获取函数或运算符的多个实例。这样,TaskManager可以使用多个CPU内核,但同时,可用内存在不同的操作员或功能实例之间划分。...后面生产环境也打算使用kafka来传递从mysql binlog中心解析到的消息,算是一个生产实例的敲门砖吧。

21440

Scala Actors迁移指南

早在Scala2.10.0的时候,默认的actor库即是Akka。 为了方便的Scala Actors迁移到Akka,我们提供了Actor迁移工具包(AMK)。...这降低了在同一刻引入多个bug的可能性,同样降低了bug的复杂程度。 在Scala方面迁移完成后,用户应该改变import语句并变成使用Akka库。...在Akka中没有相应的功能。 所有其他Actor方法需要转换为两个ActorRef中的方法。转换是通过下面描述的规则。...替换都将显式的阻塞在future对象 这里没有提到的公共方法是为了actors DSL被申明为公共的。他们只能在定义actor使用,所以他们的这一步迁移是不相关的。...远程控制器在ActWithStash 下无法直接使用,register(‘name, this)方法需要被替换为: registerActorRef('name, self) 在后面的步骤中, registerActorRef

97320

restapi(2)- generic restful CRUD:通用的restful风格数据库表维护工具

前面谈过身份验证和使用权限、文件的上传下载,这次来到具体的数据库表维护。我们在这篇示范里设计一套通用的对平台每一个数据表的标准维护方式。...akka-http提供了丰富的Marshaller来实现自动的数据转换,但在编译要提供Marshaller的隐式实例implicit instance,所以用类参数是无法通过编译的。...: String)(implicit m: Manifest[E]): E = { Serialization.read[E](json) } } 当然对于一些特别的数据库表,我们还是希望使用...AddressRepo.deleteById(id)) { addr => complete(s"address deleted: $addr") } } } 这样做可以灵活的使用..." %% "akka-http" % "10.1.8", "com.typesafe.akka" %% "akka-stream" % "2.5.23", "com.pauldijou" %

71120

Akka(42): Http:身份验证 - authentication, authorization and use of raw headers

当我们把Akka-http作为数据库数据交换工具,数据是以Source[ROW,_]形式存放在Entity里的。很多时候除数据之外我们可能需要进行一些附加的信息传递如对数据的具体处理方式等。...我们可以通过Akka-http的raw-header来实现附加自定义消息的传递,这项功能可以通过Akka-http提供的raw-header筛选功能来实现。...") Future.successful(validUsers.contains(user.name)) } 下面是Credential-Directive的使用方法: authenticateBasicAsync...import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import scala.util._...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka._ import

84350

restapi(9)- caching, akka-http 缓存

userid=1234 这样的请求需要从数据库里读取用户信息数据及进行一些转换处理。这个请求调用得频率较高、数据库读取也比较耗时,是个实在的例子。...我们来看看如何实现缓存管理: 在akka-http里可以用两种方式来实现缓存管理:1、直接用cache工具,2、用akka-http提供的Directive: cache, alwaysCache 我们先看看如何直接使用...* function producing a `Future[V]`. */ def apply(key: K, genValue: () => Future[V]): Future[...V] Cache[K,V]是以K为键,一个()=> Future[V]为值的结构,也就是说我们需要把一个获取Future值的函数存在缓存里: pathPrefix("getuserinfo"...V]`. */ def getOrLoad(key: K, loadValue: K => Future[V]): Future[V] 跟着我们再试试用akka-http的Directive,

57210

Akka 指南 之「调度器」

PinnedDispatcher:这个调度器为每个使用它的 Actor 指定唯一的线程;即每个 Actor 拥有自己的线程池,池中只有一个线程。...这是因为每个 Actor 在使用PinnedDispatcher都有自己的线程池,而该池只有一个线程。...尝试寻找或构建Reactive API,以便阻塞最小化,或者将其转移到专用的调度器。通常在与现有库或系统集成,不可能避免阻塞 API,下面的解决方案解释了如何正确处理阻塞操作。...在my-blocking-dispatcher上运行阻塞操作,它使用线程(达到配置的限制)来处理这些操作。...在Future上执行阻塞调用,确保在任何时间点对此类调用的数量上限,提交无限数量的此类任务耗尽内存或线程限制。

1.8K21

Akka 指南 之「Akka 和 Java 内存模型」

Akka 和 Java 内存模型 使用 LightBend 平台(包括 Scala 和 Akka)的一个主要好处是简化了并发软件的编写过程。...如果 Actor 在处理消息更改其内部状态,并在稍后处理另一条消息访问该状态。重要的是要认识到,对于 Actor 模型,你不能保证同一线程将对不同的消息执行相同的 Actor。...这两个规则仅适用于同一个 Actor 实例,如果使用不同的 Actor,则这两个规则无效。 Futures 和 Java 存储模型 Future的“先于发生”调用任何注册到它的回调被执行之前。...我们强烈建议远离使用锁定的对象,因为它可能会导致性能问题,在最坏的情况下还会导致死锁。这就是同步的危险。 Actors 和共享可变状态 由于 Akka 在 JVM 上运行,所以仍然需要遵循一些规则。...关闭内部 Actor 状态并将其暴露给其他线程 import akka.actor.{ Actor, ActorRef } import akka.pattern.ask import akka.util.Timeout

95520

Akka(17): Stream:数据流基础组件-Source,Flow,Sink简介

而push模式则会把数据推到输入端口后直接进入程序,但如果数据源头动作太快程序无法及时处理所有推送的数据就会造成所谓的数据溢出问题,遗失数据。...scalaz-stream的运算器是自备的函数式程序,特点是能很好的控制线程使用和进行并行运算。akka-stream的运算器是materializer。...属于数据元素的使用方,主要作用是消耗数据流中的元素。SinkShape是有一个输入端的数据流形状。...SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = toMat(sink)(Keep.right).run() 实际上是使用了...下面是本次的示范源代码: import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka._ import

1.6K60

Akka 指南 之「Actors」

, recipient)(配置项akka.actor.debug.unhandled设置为on,以便将其转换为实际的Debug消息)。...Ask: Send-And-Receive-Future ask模式涉及 Actor 和Future,因此它是作为一种使用模式而不是ActorRef上的一种方法提供的: import static akka.pattern.Patterns.ask...警告:当使用Future的回调,内部 Actor 需要小心避免关闭包含 Actor 的引用,即不要从回调中调用方法或访问封闭 Actor 的可变状态。...定时器和调度消息 通过直接使用「Scheduler」,可以消息安排在以后的时间点发送,但是在 Actor 中的定期或单个消息安排到自身使用对命名定时器(named timers)的支持更为方便和安全...要启用硬System.exit作为最终操作,可以配置: akka.coordinated-shutdown.exit-jvm = on 当使用Akka 集群」,当集群节点将自己视为Exiting

4K30

使用Akka HTTP构建微服务:CDC方法

生产者特定的依赖关系仅用于数据库支持,如您所见,我使用H2(在内存数据库中),但您可以轻松地将其替换为其他数据库支持。...),它将验证消费者(Consumer)是否按照协议中的规定进行要求。...com.fm.mylibrary.consumer.app.MyLibraryAppClient._ 当然,您可以使用其他方法,但请在选择保持一致,并避免在相同或类似项目中使用不同的方法/结构。...在主类中使用它非常容易; 只需将其添加为类特征,并将静态值替换为相应的常量即可: MyLibraryAppServer.scala package com.fm.mylibrary.producer.app...所以我们可以实现一个数据库迁移,它能够在启动应用任何必要的数据库更改来执行应用程序。

7.4K50

Akka-Cluster(5)- load-balancing with backoff-supervised stateless computation - 无状态任务集群节点均衡分配

细分的任务相互之间可以有关联或者各自为独立运算,使用akka-cluster可以把任务按照各节点运算资源的负载情况进行均匀的分配,从而达到资源的合理充分利用以实现运算效率最大化的目的。...decider.orElse(SupervisorStrategy.defaultDecider) ) ) ) 在这里要特别注明一下Backoff.OnFailure和Backoff.OnStop的使用场景和作用...发生异常重新构建新的实例并启动。 很明显,通常我们需要在运算发生异常重新启动运算,所以用OnFailure才是正确的选择。...下面是我之前介绍关于BackoffSupervisor用的一个例子的代码示范: package backoffSupervisorDemo import akka.actor._ import akka.pattern...这样Backend在实例化或者因为某种原因重启的话,特别是换了另一个JVM可以正确的构建MongoClient。

1.5K20
领券