首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Akka调度程序:生产中的奇怪行为(消息不触发)

Akka调度程序:生产中的奇怪行为(消息不触发)
EN

Stack Overflow用户
提问于 2017-12-13 15:57:54
回答 2查看 374关注 0票数 0

我正在开发scala + akka应用程序,作为一个更大的应用程序的一部分。该应用程序的目的是调用外部服务和SQL数据库(使用JDBC),进行一些处理,并在反复的基础上返回解析的结果。该应用程序使用akka集群,这样就可以进行水平缩放。

它应该如何工作

我正在集群上创建一个**单例参与者,负责向指令处理程序参与者池发送指令。我正在接收来自Redis发布/子频道的事件,该频道声明哪些数据源应该刷新,以及刷新的频率。此SourceScheduler参与者将指令与间隔一起存储在内部数组中。

然后,我使用akka Scheduler,每秒钟执行一个滴答函数。此函数过滤数组以确定需要执行哪些指令,并向指令处理程序池发送消息。池中的路由器执行指令并通过Redis Pub/Sub发出结果。

The issue

在我的机器上(Ryzen 7+16 my + ArchLinux),一切运行良好,我们可以轻松地处理每秒2500个数据库调用。但一旦投入生产,我无法让它处理400多个请求/秒。

SourceScheduler并不是每秒钟都在滴答,消息就会卡在邮箱里。此外,该应用程序使用更多的CPU资源,使用更多的RAM (在生产中使用1.3GB,在我的机器上使用~350 my )

该生产应用程序运行在一个基于JRE-8的高山码头容器上的牧场,在MS Azure服务器上。

我理解集群中的单例参与者可能是一个瓶颈,但是由于它只向其他参与者转发消息,所以我不知道它如何阻止。

我试过什么

  • 我使用Tomcat作为SQL查询的连接池管理器。我确信我不会泄漏任何连接,因为我记录从池中借用的每个连接以及返回到它的每个连接
  • 阻塞操作(如JDBC查询)都是在单独的dispatcher上执行的,dispatcher是一个固定的线程池执行器,有500个线程,因此所有其他参与者都应该正常运行。
  • 我还为SourceScheduler演员提供了一个专用的固定调度程序,因此它应该在自己的线程上运行
  • 我尝试过在集群中运行这个应用程序,它有3个节点,没有任何性能改进。由于SourceScheduler是一个单例,运行多个节点并不能解决这个问题。
  • 我已经在我同事的机器上试用了这个应用程序。就像一种魅力。我只是遇到了生产服务器的问题
  • 我尝试将生产服务器升级到Azure上最强大的可用服务器(16核,2.3ghz),没有明显的变化。

和其他人一样,他们的本地机器和生产服务器?之间也有这样的区别。

编辑 SourceScheduler.scala

代码语言:javascript
运行
复制
class SourceScheduler extends Actor with ActorLogging with Timers {
  case object Tick
  case object SchedulerReport
  import context.dispatcher

  val instructionHandlerPool = context.actorOf(
    ClusterRouterGroup(
      RoundRobinGroup(Nil),
      ClusterRouterGroupSettings(
        totalInstances = 10,
        routeesPaths = List("/user/instructionHandler"),
        allowLocalRoutees = true
      )
    ).props(),
    name = "instructionHandlerRouter")

  var ticks: Int = 0
  var refreshedSources: Int = 0
  val maxTicks: Int = Int.MaxValue - 1

  var scheduledSources = Array[(String, Int, String)]()

  override def preStart(): Unit = {
    log.info("Starting Scheduler")
  }

  def refreshSource(hash: String) = {
    instructionHandlerPool ! Instruction(hash)
    refreshedSources += 1
  }

  // Get sources that neeed to be refreshed
  def getEligibleSources(sources: Seq[(String, Int, String)], tick: Int) = {
    sources.groupBy(_._1).mapValues(_.toList.minBy(_._2)).values.filter(tick * 1000 % _._2 == 0).map(_._1)
  }

  def tick(): Unit = {
    ticks += 1
    log.debug("Scheduler TICK {}", ticks)
    val eligibleSources = getEligibleSources(scheduledSources, ticks)
    val chunks = eligibleSources.grouped(ConnectionPoolManager.connectionPoolSize).zipWithIndex.toList
    log.debug("Scheduling {} sources in {} chunks", eligibleSources.size, chunks.size)
    chunks.foreach({
      case(sources, index) =>
        after((index * 25 + 5) milliseconds, context.system.scheduler)(Future.successful {
          sources.foreach(refreshSource)
        })
    })
    if(ticks >= maxTicks) ticks = 0
  }
  timers.startPeriodicTimer("schedulerTickTimer", Tick, 990 milliseconds)
  timers.startPeriodicTimer("schedulerReportTimer", SchedulerReport, 10 seconds)

  def receive: Receive = {
    case AttachSource(hash, interval, socketId) =>
      scheduledSources.synchronized {
        scheduledSources = scheduledSources :+ ((hash, interval, socketId))
      }
    case DetachSource(socketId) =>
      scheduledSources.synchronized {
        scheduledSources = scheduledSources.filterNot(_._3 == socketId)
      }
    case SchedulerReport =>
      log.info("{} sources were scheduled since last report", refreshedSources)
      refreshedSources = 0
    case Tick => tick()
    case _ =>
  }
}

每个源都由一个包含执行所需的所有数据的散列(例如数据库主机)、刷新间隔以及请求它的客户端的唯一id来确定,这样我们就可以在客户端断开连接时停止刷新。每秒钟,我们都会检查源是否需要通过应用带有滴答计数器当前值的模块来刷新。为了避免连接池饥饿,我们以较小的块刷新源,问题是在小负载(~300 rq/s)下,刻度函数不再每秒钟执行一次。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-12-15 13:54:07

结果发现问题出在兰彻身上。我们做了几次测试,应用程序在机器上运行良好,在对接上运行良好,但在使用Rancher作为策划者时却不是这样。我不知道为什么,但因为它与Akka无关,所以我要结束这个问题。谢谢大家的帮助。

票数 1
EN

Stack Overflow用户

发布于 2017-12-13 16:26:32

也许瓶颈是在网络延迟上?在您的计算机中,所有组件都是并行运行的,通信应该没有延迟,但是在集群中,如果您从一台计算机到另一台计算机进行大量数据库调用,则网络延迟可能是显而易见的。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47797315

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档