前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spillable StateBackend 之 HeapStatusMonitor 解析

Spillable StateBackend 之 HeapStatusMonitor 解析

原创
作者头像
KyleMeow
修改2021-09-29 20:46:50
1.2K1
修改2021-09-29 20:46:50
举报

背景介绍

Flink 社区的 Spillable Backend 特性,社区经过了大半年的开发,目前已经放出了预览版可供体验。

简而言之,这个 StateBackend 它可以实现将 Flink 作业运行时的状态主要还是存留在内存里(如果不超过内存本身的限制的话),此时行为和 Flink 目前的 HeapStateBackend 类似,可以获取最佳的性能;系统会定期自动检测当前的指标值(内存负载、GC 时间等),发现内存已经大概率承受不住之时,启动 spill 过程,将不常访问的大对象从内存中移到磁盘中。而当负载下降时,可以把这些状态 load 回内存。

总体来看,它的实现思路类似于 swap(虚拟内存),但是针对 Flink 本身的特点进行了特别设计。本专栏之前有一篇文章《Spill-able Heap Keyed State Backend 设计概览》介绍过这个特性,这次我们根据最新的代码,对它的实现进行更细致的分析和评价。

HeapStatusMonitor 的配置项

正如前文所述,这个模块主要承担着运行时监测堆内存状态的职责,它定期根据策略检测系统资源情况,然后将当前资源用量写入自己的变量中,等待别人读取,因而它是生产者。

它的检测周期由配置参数state.backend.spillable.heap-status.check-interval来决定,默认是 1 分钟。如果从经验上看,作业内存负载变化较为剧烈,推荐将这个值设置短一些,避免突发的用量增加而导致整个进程 OOM;反之,如果内存负载变化缓慢而均匀,则可以增大设置,避免频繁检测的开销。

还有一个参数 state.backend.spillable.gc-time.threshold,决定了 GC 时间超过多长时,发送 spill 信号(状态移到磁盘),默认值是 2s,即发现 GC 时间超过 2s 时,触发 spill。

另外,还有一个参数 state.backend.spillable.resource-check.interval 比较令人迷惑,它实际上指的是状态表(StateTable)在存取的数据时候(put、get 操作),从 HeapStatusMonitor 读取资源检测结果的最小周期,因此这是消费者的消费周期,与 HeapStatusMonitor 自身无关。由于本文重点介绍的是决策的生产者 HeapStatusMonitor,这些参数暂时略过。

构造方法解析

HeapStatusMonitor 类的构造方法里有十几个参数,分别作用如下:

- checkIntervalInMs:堆内存检测周期,即我们之前介绍的 state.backend.spillable.heap-status.check-interval 参数。

- memoryMxBean:JVM 自带的 MemoryMXBean 对象,可以从中获取当前的堆内存用量、堆内存最大值等。

- maxMemory:从上述 Bean 中获取的堆内存最大值,用来作为一个常量。

- garbageCollectorMXBeans:JVM 自带的描述 GC 统计信息的 MXBean 对象列表(不止一个)。

- resultIdGenerator:为每次检测的结果生成一个自增的 ID。因为可能并发访问,这里需要使用 AtomicLong 对象。

- monitorResult:描述单次的检测结果。MonitorResult 对象包含了当前时间戳、ID、堆内存用量、GC 时间等关键信息。当然,目前这些指标还是太少,生产环境还是需要更多决策项。

- lastGcTime:上次 GC 的时间值

- lastGcCount:上次 GC 的统计数

- shutdownHook:Flink 里常见的用法,注册一个 JVM 的 shutdown hook,这样在进程关闭时,可以打印相关的日志,并设置 isShutdown 环境变量。

- checkExecutor:创建一个周期执行器,并设置相关的取消策略。

- checkFuture:向上述周期执行器提交一个周期为 checkIntervalInMs,会定时运行本类的 runCheck()方法,来检查堆内存的情况,并生成对应的 monitorResult。

runCheck 方法

这个方法是周期检查的核心,也非常简单,我们来看一下:

private void runCheck() {
    long timestamp = System.currentTimeMillis();
    long id = resultIdGenerator.getAndIncrement();
    this.monitorResult = new MonitorResult(timestamp, id, memoryMXBean.getHeapMemoryUsage(), getGarbageCollectionTime());
    if (LOG.isDebugEnabled()) {
        LOG.debug("Check memory status, {}", monitorResult.toString());
    }
}

可以看到,这个方法就是定期将当前的堆内存用量,以及最近一次 GC 的平均时间保存在本实例的 monitorResult 对象中,以备决策者读取。堆内存用量的获取非常直白,即直接获取 memoryMXBean 的 getHeapMemoryUsage() 方法即可,但是 GC 时间的获取还是要费一番功夫。

getGarbageCollectionTime 方法

这个方法用来获取最近 GC 的平均时间,代码如下:

private long getGarbageCollectionTime() {
    long count = 0;
    long timeMillis = 0;
    for (GarbageCollectorMXBean gcBean : garbageCollectorMXBeans) {
        long c = gcBean.getCollectionCount();
        long t = gcBean.getCollectionTime();
        count += c;
        timeMillis += t;
    }

    if (count == lastGcCount) {
        return 0;
    }

    long gcCountIncrement = count - lastGcCount;
    long averageGcTime = (timeMillis - lastGcTime) / gcCountIncrement;

    lastGcCount = count;
    lastGcTime = timeMillis;

    return averageGcTime;
}

从代码里可以看到,这里是通过一个 for 循环,遍历 garbageCollectorMXBeans 列表里的所有 GC 的 MXBean,然后逐个读取当前的 GC 次数和时间,加到变量里。然后将得到的总次数和总时间,分别减去上次记录的值(lastGcCount 和 lastGcTime),然后进行相除,就可以得到本次检测时的 GC 平均时间了。

总结

在 Spillable StateBackend 的设计中,HeapStatusMonitor 属于非常简单但是也非常核心的功能类,相当于整个系统决策的信号标。

但时,我们也注意到,目前它的作用只是定时的检测堆内存用量、平均 GC 时间,并没有涉及到更多的指标,例如方差、增量比率、复合指标等等。同时,由于 GC 算法的不同,这里得到的平均 GC 时间很可能没有意义,例如最新的 ZGC 宣称可以将停顿时间降低到 2ms 以下。此时,单纯检测 GC 时间,并不能很好地得出系统繁忙与否的结论。

因此,目前这个预览版仍然不适合作为线上生产环境使用。我们近期会持续追踪,并补充生产环境的一些实践经验和改进项,这些都会在接下来的系列文章中得到阐述。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景介绍
  • HeapStatusMonitor 的配置项
  • 构造方法解析
  • runCheck 方法
  • getGarbageCollectionTime 方法
  • 总结
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档