首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在akka中实现并发处理?

如何在akka中实现并发处理?
EN

Stack Overflow用户
提问于 2018-06-12 16:59:04
回答 2查看 312关注 0票数 0

我有一个方法,其中有多个对db的调用。由于我没有实现任何并发处理,第二个db调用必须等到第一个db调用完成,第三个必须等到第二个数据库调用完成,依此类推。

所有db调用都是相互独立的。我希望这样做,使所有DB调用同时运行。

我对Akka框架很陌生。

有谁能帮我拿一些小样本或参考资料就可以了。应用程序是用Scala开发的。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-06-13 13:38:22

对于给定的示例需要,有三种主要的方法可以实现并发。

期货

对于问题中提出的特定用例,我会在任何akka构造之前推荐期货。

假设我们被赋予数据库调用作为函数:

代码语言:javascript
运行
复制
type Data = ???

val dbcall1 : () => Data = ???

val dbcall2 : () => Data = ???

val dbcall3 : () => Data = ???

并发性可以很容易地应用,然后可以使用“期货”收集结果:

代码语言:javascript
运行
复制
val f1 = Future { dbcall1() }
val f2 = Future { dbcall2() }
val f3 = Future { dbcall3() }

for {
  v1 <- f1
  v2 <- f2
  v3 <- f3
} {
  println(s"All data collected: ${v1}, ${v2}, ${v3}")
}

Akka Streams

类似的堆叠回答,它演示了如何使用akka-stream库进行并发数据库查询。

Akka演员

还可以编写一个Actor来执行查询:

代码语言:javascript
运行
复制
object MakeQuery

class DBActor(dbCall : () => Data) extends Actor {
  override def receive = {
    case _ : MakeQuery => sender ! dbCall()
  }
}

val dbcall1ActorRef = system.actorOf(Props(classOf[DBActor], dbcall1)) 

但是,在这个用例中,Actors不太有用,因为您仍然需要一起收集所有数据。

您可以使用与“期货”部分相同的技术:

代码语言:javascript
运行
复制
val f1 : Future[Data] = (dbcall1ActorRef ? MakeQuery).mapTo[Data]

for {
  v1 <- f1
  ...

或者,您必须通过构造函数手工将Actors连接到一起,并处理等待其他Actor的所有回调逻辑:

代码语言:javascript
运行
复制
class WaitingDBActor(dbCall : () => Data, previousActor : ActorRef) {
  override def receive = {
    case _ : MakeQuery => previousActor forward MakeQuery

    case previousData : Data => sender ! (dbCall(), previousData)
  }
}
票数 1
EN

Stack Overflow用户

发布于 2019-01-02 23:29:50

如果您想查询数据库,您应该使用类似于圆滑的东西,这是一个用于Scala的现代数据库查询和访问库。

浮华的快速例子:

代码语言:javascript
运行
复制
case class User(id: Option[Int], first: String, last: String)

class Users(tag: Tag) extends Table[User](tag, "users") {
  def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
  def first = column[String]("first")
  def last = column[String]("last")
  def * = (id.?, first, last) <> (User.tupled, User.unapply)
}
val users = TableQuery[Users]

然后,您需要为db创建配置:

代码语言:javascript
运行
复制
mydb = {
  dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
  properties = {
    databaseName = "mydb"
    user = "myuser"
    password = "secret"
  }
  numThreads = 10
}

在代码中加载配置:

代码语言:javascript
运行
复制
val db = Database.forConfig("mydb")

然后使用db.run方法运行查询,该方法为您提供未来的结果,例如,您可以通过调用方法结果获得所有行。

代码语言:javascript
运行
复制
val allRows: Future[Seq[User]] = db.run(users.result)

此查询在不阻塞当前线程的情况下运行。

如果您的任务需要很长时间来执行或调用另一个服务,您应该使用期货。

这方面的例子是对外部服务的简单HTTP调用。您可以在这里中找到示例

如果您的任务需要很长时间才能执行,那么您就必须保持可变的状态,在这种情况下,最好的选择是使用Akka Actors将您的状态封装在一个参与者中,它解决了并发性和线程安全问题,就像possible.Example一样简单:

代码语言:javascript
运行
复制
import akka.actor.Actor

import scala.concurrent.Future

case class RegisterEndpoint(endpoint: String)

case class NewUpdate(update: String)

class UpdateConsumer extends Actor {
  val endpoints = scala.collection.mutable.Set.empty[String]

  override def receive: Receive = {

    case RegisterEndpoint(endpoint) =>
      endpoints += endpoint

    case NewUpdate(update) =>
      endpoints.foreach { endpoint =>
        deliverUpdate(endpoint, update)
      }
  }

  def deliverUpdate(endpoint: String, update: String): Future[Unit] = {
    Future.successful(Unit)
  }

}

如果您想要处理大量的实时数据,或者websocket连接,处理随着时间增长的CSV文件,.等等,最好的选择是阿克卡流。例如,使用Alpakka:Alpakka kafka连接器从kafka主题读取数据

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50822215

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档