我有一个方法,其中有多个对db的调用。由于我没有实现任何并发处理,第二个db调用必须等到第一个db调用完成,第三个必须等到第二个数据库调用完成,依此类推。
所有db调用都是相互独立的。我希望这样做,使所有DB调用同时运行。
我对Akka框架很陌生。
有谁能帮我拿一些小样本或参考资料就可以了。应用程序是用Scala开发的。
发布于 2018-06-13 13:38:22
对于给定的示例需要,有三种主要的方法可以实现并发。
期货
对于问题中提出的特定用例,我会在任何akka构造之前推荐期货。
假设我们被赋予数据库调用作为函数:
type Data = ???
val dbcall1 : () => Data = ???
val dbcall2 : () => Data = ???
val dbcall3 : () => Data = ???并发性可以很容易地应用,然后可以使用“期货”收集结果:
val f1 = Future { dbcall1() }
val f2 = Future { dbcall2() }
val f3 = Future { dbcall3() }
for {
v1 <- f1
v2 <- f2
v3 <- f3
} {
println(s"All data collected: ${v1}, ${v2}, ${v3}")
}Akka Streams
有类似的堆叠回答,它演示了如何使用akka-stream库进行并发数据库查询。
Akka演员
还可以编写一个Actor来执行查询:
object MakeQuery
class DBActor(dbCall : () => Data) extends Actor {
override def receive = {
case _ : MakeQuery => sender ! dbCall()
}
}
val dbcall1ActorRef = system.actorOf(Props(classOf[DBActor], dbcall1)) 但是,在这个用例中,Actors不太有用,因为您仍然需要一起收集所有数据。
您可以使用与“期货”部分相同的技术:
val f1 : Future[Data] = (dbcall1ActorRef ? MakeQuery).mapTo[Data]
for {
v1 <- f1
...或者,您必须通过构造函数手工将Actors连接到一起,并处理等待其他Actor的所有回调逻辑:
class WaitingDBActor(dbCall : () => Data, previousActor : ActorRef) {
override def receive = {
case _ : MakeQuery => previousActor forward MakeQuery
case previousData : Data => sender ! (dbCall(), previousData)
}
}发布于 2019-01-02 23:29:50
如果您想查询数据库,您应该使用类似于圆滑的东西,这是一个用于Scala的现代数据库查询和访问库。
浮华的快速例子:
case class User(id: Option[Int], first: String, last: String)
class Users(tag: Tag) extends Table[User](tag, "users") {
def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
def first = column[String]("first")
def last = column[String]("last")
def * = (id.?, first, last) <> (User.tupled, User.unapply)
}
val users = TableQuery[Users]然后,您需要为db创建配置:
mydb = {
dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
properties = {
databaseName = "mydb"
user = "myuser"
password = "secret"
}
numThreads = 10
}在代码中加载配置:
val db = Database.forConfig("mydb")然后使用db.run方法运行查询,该方法为您提供未来的结果,例如,您可以通过调用方法结果获得所有行。
val allRows: Future[Seq[User]] = db.run(users.result)此查询在不阻塞当前线程的情况下运行。
如果您的任务需要很长时间来执行或调用另一个服务,您应该使用期货。
这方面的例子是对外部服务的简单HTTP调用。您可以在这里中找到示例
如果您的任务需要很长时间才能执行,那么您就必须保持可变的状态,在这种情况下,最好的选择是使用Akka Actors将您的状态封装在一个参与者中,它解决了并发性和线程安全问题,就像possible.Example一样简单:
import akka.actor.Actor
import scala.concurrent.Future
case class RegisterEndpoint(endpoint: String)
case class NewUpdate(update: String)
class UpdateConsumer extends Actor {
val endpoints = scala.collection.mutable.Set.empty[String]
override def receive: Receive = {
case RegisterEndpoint(endpoint) =>
endpoints += endpoint
case NewUpdate(update) =>
endpoints.foreach { endpoint =>
deliverUpdate(endpoint, update)
}
}
def deliverUpdate(endpoint: String, update: String): Future[Unit] = {
Future.successful(Unit)
}
}如果您想要处理大量的实时数据,或者websocket连接,处理随着时间增长的CSV文件,.等等,最好的选择是阿克卡流。例如,使用Alpakka:Alpakka kafka连接器从kafka主题读取数据
https://stackoverflow.com/questions/50822215
复制相似问题