专栏首页腾讯云流计算Spillable StateBackend 之 HeapStatusMonitor 解析
原创

Spillable StateBackend 之 HeapStatusMonitor 解析

背景介绍

Flink 社区的 Spillable Backend 特性,社区经过了大半年的开发,目前已经放出了预览版可供体验。

简而言之,这个 StateBackend 它可以实现将 Flink 作业运行时的状态主要还是存留在内存里(如果不超过内存本身的限制的话),此时行为和 Flink 目前的 HeapStateBackend 类似,可以获取最佳的性能;系统会定期自动检测当前的指标值(内存负载、GC 时间等),发现内存已经大概率承受不住之时,启动 spill 过程,将不常访问的大对象从内存中移到磁盘中。而当负载下降时,可以把这些状态 load 回内存。

总体来看,它的实现思路类似于 swap(虚拟内存),但是针对 Flink 本身的特点进行了特别设计。本专栏之前有一篇文章《Spill-able Heap Keyed State Backend 设计概览》介绍过这个特性,这次我们根据最新的代码,对它的实现进行更细致的分析和评价。

HeapStatusMonitor 的配置项

正如前文所述,这个模块主要承担着运行时监测堆内存状态的职责,它定期根据策略检测系统资源情况,然后将当前资源用量写入自己的变量中,等待别人读取,因而它是生产者。

它的检测周期由配置参数state.backend.spillable.heap-status.check-interval来决定,默认是 1 分钟。如果从经验上看,作业内存负载变化较为剧烈,推荐将这个值设置短一些,避免突发的用量增加而导致整个进程 OOM;反之,如果内存负载变化缓慢而均匀,则可以增大设置,避免频繁检测的开销。

还有一个参数 state.backend.spillable.gc-time.threshold,决定了 GC 时间超过多长时,发送 spill 信号(状态移到磁盘),默认值是 2s,即发现 GC 时间超过 2s 时,触发 spill。

另外,还有一个参数 state.backend.spillable.resource-check.interval 比较令人迷惑,它实际上指的是状态表(StateTable)在存取的数据时候(put、get 操作),从 HeapStatusMonitor 读取资源检测结果的最小周期,因此这是消费者的消费周期,与 HeapStatusMonitor 自身无关。由于本文重点介绍的是决策的生产者 HeapStatusMonitor,这些参数暂时略过。

构造方法解析

HeapStatusMonitor 类的构造方法里有十几个参数,分别作用如下:

- checkIntervalInMs:堆内存检测周期,即我们之前介绍的 state.backend.spillable.heap-status.check-interval 参数。

- memoryMxBean:JVM 自带的 MemoryMXBean 对象,可以从中获取当前的堆内存用量、堆内存最大值等。

- maxMemory:从上述 Bean 中获取的堆内存最大值,用来作为一个常量。

- garbageCollectorMXBeans:JVM 自带的描述 GC 统计信息的 MXBean 对象列表(不止一个)。

- resultIdGenerator:为每次检测的结果生成一个自增的 ID。因为可能并发访问,这里需要使用 AtomicLong 对象。

- monitorResult:描述单次的检测结果。MonitorResult 对象包含了当前时间戳、ID、堆内存用量、GC 时间等关键信息。当然,目前这些指标还是太少,生产环境还是需要更多决策项。

- lastGcTime:上次 GC 的时间值

- lastGcCount:上次 GC 的统计数

- shutdownHook:Flink 里常见的用法,注册一个 JVM 的 shutdown hook,这样在进程关闭时,可以打印相关的日志,并设置 isShutdown 环境变量。

- checkExecutor:创建一个周期执行器,并设置相关的取消策略。

- checkFuture:向上述周期执行器提交一个周期为 checkIntervalInMs,会定时运行本类的 runCheck()方法,来检查堆内存的情况,并生成对应的 monitorResult。

runCheck 方法

这个方法是周期检查的核心,也非常简单,我们来看一下:

private void runCheck() {
    long timestamp = System.currentTimeMillis();
    long id = resultIdGenerator.getAndIncrement();
    this.monitorResult = new MonitorResult(timestamp, id, memoryMXBean.getHeapMemoryUsage(), getGarbageCollectionTime());
    if (LOG.isDebugEnabled()) {
        LOG.debug("Check memory status, {}", monitorResult.toString());
    }
}

可以看到,这个方法就是定期将当前的堆内存用量,以及最近一次 GC 的平均时间保存在本实例的 monitorResult 对象中,以备决策者读取。堆内存用量的获取非常直白,即直接获取 memoryMXBean 的 getHeapMemoryUsage() 方法即可,但是 GC 时间的获取还是要费一番功夫。

getGarbageCollectionTime 方法

这个方法用来获取最近 GC 的平均时间,代码如下:

private long getGarbageCollectionTime() {
    long count = 0;
    long timeMillis = 0;
    for (GarbageCollectorMXBean gcBean : garbageCollectorMXBeans) {
        long c = gcBean.getCollectionCount();
        long t = gcBean.getCollectionTime();
        count += c;
        timeMillis += t;
    }

    if (count == lastGcCount) {
        return 0;
    }

    long gcCountIncrement = count - lastGcCount;
    long averageGcTime = (timeMillis - lastGcTime) / gcCountIncrement;

    lastGcCount = count;
    lastGcTime = timeMillis;

    return averageGcTime;
}

从代码里可以看到,这里是通过一个 for 循环,遍历 garbageCollectorMXBeans 列表里的所有 GC 的 MXBean,然后逐个读取当前的 GC 次数和时间,加到变量里。然后将得到的总次数和总时间,分别减去上次记录的值(lastGcCount 和 lastGcTime),然后进行相除,就可以得到本次检测时的 GC 平均时间了。

总结

在 Spillable StateBackend 的设计中,HeapStatusMonitor 属于非常简单但是也非常核心的功能类,相当于整个系统决策的信号标。

但时,我们也注意到,目前它的作用只是定时的检测堆内存用量、平均 GC 时间,并没有涉及到更多的指标,例如方差、增量比率、复合指标等等。同时,由于 GC 算法的不同,这里得到的平均 GC 时间很可能没有意义,例如最新的 ZGC 宣称可以将停顿时间降低到 2ms 以下。此时,单纯检测 GC 时间,并不能很好地得出系统繁忙与否的结论。

因此,目前这个预览版仍然不适合作为线上生产环境使用。我们近期会持续追踪,并补充生产环境的一些实践经验和改进项,这些都会在接下来的系列文章中得到阐述。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 如何应对飞速增长的状态?Flink State TTL 概述

    在流计算作业中,经常会遇到一些状态数不断累积,导致状态量越来越大的情形。例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句...

    KyleMeow
  • 主用户为子用户添加流计算 Oceanus 授权策略指南

    流计算 Oceanus 的云 API 已经启用 CAM 服务级鉴权。默认情况下,所有主账号(也成为主用户)均有 QcloudOceanusFullAccess ...

    KyleMeow
  • Spill-able Heap Keyed State Backend 设计概览

    Flink 在流式数据处理方面的能力非常强大,尤其值得一提的是它对带状态的流计算作业的支持度。它支持 Operator 和 Keyed 两类状态存储结构,其中后...

    KyleMeow
  • 第六篇 : Epsilon 垃圾收集器

    Epsilon(A No-Op Garbage Collector)垃圾回收器控制内存分配,但是不执行任何垃圾回收工作。一旦java的堆被耗尽,jvm就直接关闭...

    程序员果果
  • 跟面试官聊.NET垃圾收集,直刺面试官G点

    装逼的面试官和装逼的程序员 我面试别人的时候,经常是按这种路子来面试: 看简历和面试题,从简历和面试题上找到一些技术点,然后跟应聘者聊。 聊某个技术点的时候,应...

    liulun
  • 读书笔记 dotnet 什么时候进行垃圾回收

    是否有小伙伴好奇如果没有在代码调用垃圾回收,那么框架会在什么时候调用垃圾回收。本文是读还没出版的伟民哥翻译的 .NET内存管理宝典 - 提高代码质量、性能和可扩...

    林德熙
  • Salesforce LWC学习(三) import & export / api & track

    我们使用vs code创建lwc 时,文件会默认生成包含 template作为头的html文件,包含了 import LightningElement的 js...

    用户1169343
  • JVM G1(Garbage-First Garbage Collector)收集器全过程剖析

    G1垃圾收集器的设计原则是“首先收集尽可能多的垃圾(Garbage First)”,目标是为了尽量缩短处理超大堆(超过4GB)产生的停顿。

    斯武丶风晴
  • Springboot之AbstractApplicationEventMulticaster

      Springboot的版本2.0.9.release,对应的SpringFramework值5.0.x.release。

    克虏伯
  • 简易数据分析 06 | 如何导入别人已经写好的 Web Scraper 爬虫

    上两期我们学习了如何通过 Web Scraper 批量抓取豆瓣电影 TOP250 的数据,内容都太干了,今天我们说些轻松的,讲讲 Web Scraper 如何导...

    卤代烃

扫码关注云+社区

领取腾讯云代金券