首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何在scala中并发执行此操作

如何在scala中并发执行此操作
EN

Stack Overflow用户
提问于 2018-07-28 03:08:14
回答 2查看 121关注 0票数 0

所以我有这段代码

dbs.foreach({
  var map = scala.collection.mutable.Map[String, mutable.MutableList[String]]()
  db =>
    val resultList = getTables(hive, db)
    map+=(db -> resultList)
})

这样做是循环遍历数据库列表,为每个数据库执行show tables in db调用,然后将数据库->表添加到映射中。既然有大约5秒的等待时间来等待配置单元查询返回,那么如何同时完成这项工作呢?

更新代码--

def getAllTablesConcurrent(hive: JdbcHive, dbs: mutable.MutableList[String]): Map[String, mutable.MutableList[String]] = {
  implicit val context:ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
  val futures = dbs.map {
    db =>
        Future(db, getTables(hive, db))
    }
  val map = Await.result( Future.sequence(futures), Duration(10, TimeUnit.SECONDS) ).toMap
  map
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-07-28 04:59:23

如果你想要更多的控制(你想等待多少时间,你想使用多少线程,如果你所有的线程都很忙会发生什么,等等)你可以使用ThreadPollExecutor和Future

  implicit val context:ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val dbs = List("db1", "db2", "db3")

  val futures = dbs.map {
   name => Future(name, getables(hive, name))
  }

  val result = Await.result( Future.sequence(futures), Duration(TIMEOUT, TimeUnit.MILLISECONDS) ).toMap

请记住,不要每次需要时都创建一个新的ExecutionContext

票数 1
EN

Stack Overflow用户

发布于 2018-07-28 03:30:49

您可以在任何Scala集合上使用.par来并行执行下一次转换(使用默认的并行性,这取决于内核的数量)。

此外-更容易和更干净地map到一个(不可变的)映射中,而不是更新一个可变的映射。

val result = dbs.par.map(db => db -> getTables(hive, db)).toMap

要更好地控制所使用的并发线程数,请参见https://docs.scala-lang.org/overviews/parallel-collections/configuration.html

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51564138

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档