spark 2.3 导致driver OOM的一个SparkPlanGraphWrapper源码的bug

背景

长话短说,我们部门一个同事找到我,说他的spark 2.3 structured streaming程序频繁报OOM,从来没有坚持过超过三四天的,叫帮看一下。 这种事情一般我是不愿意看的,因为大部分情况下spark oom就那么几种可能:

  • 数据量拉太大,executor内存爆了;
  • shuffle过程中数据量太大,shuffle数太少,内存又爆了;
  • 闲着蛋疼调用collect之类的方法,把数据往dirver上一聚合,driver内存爆了
  • 闲着蛋疼又调用了一下persist还把结果存内存,还是爆了

这些问题基本都可以通过限制每次拉取的数据/加大内存/该分片分片解决。

但这次这个我看了一下,还真不是上面这些日常问题,值得记录一下。

过程

过了一遍程序和数据,肉眼感觉没毛病,这些地方都没问题,只好祭出大杀器:

-XX:+HeapDumpOnOutOfMemoryError

顺便还加上了printGC全家桶。

程序再次挂掉后,先看了一眼gc日志,发现老年代内存使用量持续增大,fgc前后几乎无变化,那么就不是数据太大装不下,应该是内存泄漏没跑了,再看dump文件。

拿MAT打开文件,很容易就定位到了内存泄漏点,如下图所示:

直奔源码:

public class InMemoryStore implements KVStore {

  private Object metadata;
  //这里就是那个5个多g大的map
  private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();
  
  ......
}

没毛病,能对上。所以问题应该比较清晰了,spark应该是每次执行batch时在什么地方往这个map里加了很多数据,但是又忘记了移除掉已经过期的部分,所以导致gc无效了。

那接下来要问的就是,什么地方做了put操作而又没有remove呢?我们再来看看下这个5个g的InmemoryStore的引用到底被谁持有:

图里很明显,接下来我们要看ElementTrackingStore的实现,我顺便把这个类的说明也放在这里:

/**
 * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering
 * actions once they reach a threshold. This allows writers, for example, to control how much data
 * is stored by potentially deleting old data as new data is added.
 *
 * This store is used when populating data either from a live UI or an event log. On top of firing
 * triggers when elements reach a certain threshold, it provides two extra bits of functionality:
 *
 * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can
 *   be configured to run on the calling thread when more determinism is desired (e.g. unit tests).
 * - a generic flush mechanism so that listeners can be notified about when they should flush
 *   internal state to the store (e.g. after the SHS finishes parsing an event log).
 *
 * The configured triggers are run on a separate thread by default; they can be forced to run on
 * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`.
 */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {

  import config._

  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
  private val flushTriggers = new ListBuffer[() => Unit]()
  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
    ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")
  } else {
    MoreExecutors.sameThreadExecutor()
  }

  @volatile private var stopped = false

  /**
   * Register a trigger that will be fired once the number of elements of a given type reaches
   * the given threshold.
   *
   * @param klass The type to monitor.
   * @param threshold The number of elements that should trigger the action.
   * @param action Action to run when the threshold is reached; takes as a parameter the number
   *               of elements of the registered type currently known to be in the store.
   */
  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
    val existing = triggers.getOrElse(klass, Seq())
    triggers(klass) = existing :+ Trigger(threshold, action)
  }
  
  ......
 }

这个类的方法里,我们需要关注的就是这个addTrigger方法,其注释也写的很明白,就是用来当保存的对象达到一定数目后触发的操作。

这时候心里就猜一下是不是什么地方的trigger写错了,所以我们再看看这个方法都在哪里使用了:

考虑到我们溢出的对象都是SparkPlanGraphNode,所以先看最下面我选中的蓝色那一行的代码:

kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get(UI_RETAINED_EXECUTIONS)) { count =>
    cleanupExecutions(count)
  }
  
private def cleanupExecutions(count: Long): Unit = {
    val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS)
    if (countToDelete <= 0) {
      return
    }

    val view = kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L)
    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
    //出错的就是这一行
    toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) }
  }

看到了吧,这里在触发trigger的时候,压根没有删除SparkPlanGraphWrapper的相关逻辑,难怪会报oom!

结果

按理说到这里就差不多了,这个OOM的锅还真不能让同事背,的确是spark的一个bug。但是我很好奇,这么大一个问题,spark社区难道就没有动静吗?所以我就去社区搜了一下,发现了这个: Memory leak of SparkPlanGraphWrapper in sparkUI 所以确认了,这个地方确实是spark2.3的一个隐藏bug,在2.3.1和2.4.0中被修复了,有兴趣的童鞋可以点进去看看。


全文完。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Golang语言社区

Golang使用pprof监控性能及GC调优

作者:峰云就她了 链接:http://xiaorui.cc/?p=3000 來源:个人博客

3683
来自专栏灯塔大数据

技术| Python的从零开始系列连载(三十)

为了解答大家学习Python时遇到各种常见问题,小灯塔特地整理了一系列从零开始的入门到熟练的系列连载,每周五准时推出,欢迎大家学积极学习转载~

912
来自专栏Play & Scala 技术分享

Scala之美 - Future & map & flatMap

3718
来自专栏java一日一条

深入理解 Java 中的 try-with-resource

众所周知,所有被打开的系统资源,比如流、文件或者Socket连接等,都需要被开发者手动关闭,否则随着程序的不断运行,资源泄露将会累积成重大的生产事故。

1562
来自专栏瓜大三哥

​UVM(九)之sequencej机制续1

UVM(九)之sequencej机制续1 当一个sequence启动起来之后,UVM会自动执行sequence的body任务,所以要产生各种和杨的激励,就要写好...

25210
来自专栏Golang语言社区

使用Go语言框架进行web开发笔记

前言 关于golang的web开发有不少框架,例如 martini, gin, revel,gorilla等。 之前玩过revel,感觉封装的太多了,作为一个小...

4167
来自专栏解决发现

CPU占用率100%的解决方法

图:优化前(我的电脑是四核cpu,所以单线程无限无阻塞循环占用率不会达到100%)

4930
来自专栏小白安全

绕过软WAF攻略

现在软waf较为多,就在今年夏天苦逼挖洞的日子里经常遇到360主机卫士,安全狗,云锁之类的软waf进行拦截,经常碰到如下拦截提示: ? ? ? 看到以上...

7225
来自专栏Java技术栈

Java 虚拟机对锁优化所做的努力

作为一款公用平台,JDK 本身也为并发程序的性能绞尽脑汁,在 JDK 内部也想尽一切办法提供并发时的系统吞吐量。这里,我将向大家简单介绍几种 JDK 内部的 "...

752
来自专栏刺客博客

PHP实现利用API获取IP所在城市

9554

扫码关注云+社区

领取腾讯云代金券