我正在开发scala + akka应用程序,作为一个更大的应用程序的一部分。该应用程序的目的是调用外部服务和SQL数据库(使用JDBC),进行一些处理,并在反复的基础上返回解析的结果。该应用程序使用akka集群,这样就可以进行水平缩放。
它应该如何工作
我正在集群上创建一个**单例参与者,负责向指令处理程序参与者池发送指令。我正在接收来自Redis发布/子频道的事件,该频道声明哪些数据源应该刷新,以及刷新的频率。此SourceScheduler参与者将指令与间隔一起存储在内部数组中。
然后,我使用akka Scheduler,每秒钟执行一个滴答函数。此函数过滤数组以确定需要执行哪些指令,并向指令处理程序池发送消息。池中的路由器执行指令并通过Redis Pub/Sub发出结果。
The issue
在我的机器上(Ryzen 7+16 my + ArchLinux),一切运行良好,我们可以轻松地处理每秒2500个数据库调用。但一旦投入生产,我无法让它处理400多个请求/秒。
SourceScheduler并不是每秒钟都在滴答,消息就会卡在邮箱里。此外,该应用程序使用更多的CPU资源,使用更多的RAM (在生产中使用1.3GB,在我的机器上使用~350 my )
该生产应用程序运行在一个基于JRE-8的高山码头容器上的牧场,在MS Azure服务器上。
我理解集群中的单例参与者可能是一个瓶颈,但是由于它只向其他参与者转发消息,所以我不知道它如何阻止。
我试过什么
和其他人一样,他们的本地机器和生产服务器?之间也有这样的区别。
编辑 SourceScheduler.scala
class SourceScheduler extends Actor with ActorLogging with Timers {
case object Tick
case object SchedulerReport
import context.dispatcher
val instructionHandlerPool = context.actorOf(
ClusterRouterGroup(
RoundRobinGroup(Nil),
ClusterRouterGroupSettings(
totalInstances = 10,
routeesPaths = List("/user/instructionHandler"),
allowLocalRoutees = true
)
).props(),
name = "instructionHandlerRouter")
var ticks: Int = 0
var refreshedSources: Int = 0
val maxTicks: Int = Int.MaxValue - 1
var scheduledSources = Array[(String, Int, String)]()
override def preStart(): Unit = {
log.info("Starting Scheduler")
}
def refreshSource(hash: String) = {
instructionHandlerPool ! Instruction(hash)
refreshedSources += 1
}
// Get sources that neeed to be refreshed
def getEligibleSources(sources: Seq[(String, Int, String)], tick: Int) = {
sources.groupBy(_._1).mapValues(_.toList.minBy(_._2)).values.filter(tick * 1000 % _._2 == 0).map(_._1)
}
def tick(): Unit = {
ticks += 1
log.debug("Scheduler TICK {}", ticks)
val eligibleSources = getEligibleSources(scheduledSources, ticks)
val chunks = eligibleSources.grouped(ConnectionPoolManager.connectionPoolSize).zipWithIndex.toList
log.debug("Scheduling {} sources in {} chunks", eligibleSources.size, chunks.size)
chunks.foreach({
case(sources, index) =>
after((index * 25 + 5) milliseconds, context.system.scheduler)(Future.successful {
sources.foreach(refreshSource)
})
})
if(ticks >= maxTicks) ticks = 0
}
timers.startPeriodicTimer("schedulerTickTimer", Tick, 990 milliseconds)
timers.startPeriodicTimer("schedulerReportTimer", SchedulerReport, 10 seconds)
def receive: Receive = {
case AttachSource(hash, interval, socketId) =>
scheduledSources.synchronized {
scheduledSources = scheduledSources :+ ((hash, interval, socketId))
}
case DetachSource(socketId) =>
scheduledSources.synchronized {
scheduledSources = scheduledSources.filterNot(_._3 == socketId)
}
case SchedulerReport =>
log.info("{} sources were scheduled since last report", refreshedSources)
refreshedSources = 0
case Tick => tick()
case _ =>
}
}每个源都由一个包含执行所需的所有数据的散列(例如数据库主机)、刷新间隔以及请求它的客户端的唯一id来确定,这样我们就可以在客户端断开连接时停止刷新。每秒钟,我们都会检查源是否需要通过应用带有滴答计数器当前值的模块来刷新。为了避免连接池饥饿,我们以较小的块刷新源,问题是在小负载(~300 rq/s)下,刻度函数不再每秒钟执行一次。
发布于 2017-12-15 13:54:07
结果发现问题出在兰彻身上。我们做了几次测试,应用程序在机器上运行良好,在对接上运行良好,但在使用Rancher作为策划者时却不是这样。我不知道为什么,但因为它与Akka无关,所以我要结束这个问题。谢谢大家的帮助。
发布于 2017-12-13 16:26:32
也许瓶颈是在网络延迟上?在您的计算机中,所有组件都是并行运行的,通信应该没有延迟,但是在集群中,如果您从一台计算机到另一台计算机进行大量数据库调用,则网络延迟可能是显而易见的。
https://stackoverflow.com/questions/47797315
复制相似问题