前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spillable StateBackend 之 SpillAndLoadManager 源码注解

Spillable StateBackend 之 SpillAndLoadManager 源码注解

原创
作者头像
KyleMeow
修改2021-09-29 20:46:36
8231
修改2021-09-29 20:46:36
举报

背景概要

前文中,我们介绍了 Spillable Backend 及其 HeapStatusMonitor 的工作原理和不足。今天我们来看一下 Spillable Backend 的另一个核心组件:SpillAndLoadManager。如果说 HeapStatusMonitor 是测量系统负载的信号灯,那么 SpillAndLoadManager 就是具体的决策者,根据信号来决定当前系统的行为(将状态从内存 Spill 到磁盘,还是从磁盘 Load 到内存)。

SpillAndLoadManager 接口

SpillAndLoadManager 接口的定义非常简单,只有一个 checkResource() 方法,表示检查当前的资源,然后决定下一步的行动,其代码如下:

public interface SpillAndLoadManager {

	/**
	 * Check resource to decide whether to spill or load state.
	 */
	void checkResource();
}

目前的代码里,只有 SpillAndLoadManagerImpl 这一个实现类,因此本文以这个类为基准,通过逐行注解关键方法的方式,介绍它的工作原理,并对当前的实现方式提出一些建议。

SpillAndLoadManagerImpl 实现类初探

这个类目前有接近 500 行代码,处于中低复杂度。在分析它的工作原理之前,我们来首先先看一下他的整体结构:

SpillAndLoadManagerImpl 类的整体结构
SpillAndLoadManagerImpl 类的整体结构

其中上图的 Action 是一个枚举类型,有 SPILL、LOAD、NONE 三种取值,分别表示三种动作(NONE 表示”无动作“这种动作),因此很容易猜到,它是本方法的决策信号,决定了相关状态的去向。

ActionResult 则是对 Action 及比例参数(spillOrLoadRatio)的封装类,作为行动的指南:例如传入了 SPILL Action 和 0.2 的 spillOrLoadRatio 作为 ActionResult, 表明需要把 20% 的冷状态写入磁盘。

既然它的父类只定义了 checkResource() 一个方法,那我们先从这个方法入手,逐步分析该类的运行原理(下面的代码片段,删减了一些日志代码)。

public void checkResource() {
    long currentTime = System.currentTimeMillis();  // 获取当前 Unix 时间戳

    // 如果距离上次资源检查的时间小于阈值, 就不做检查. 配置项为 state.backend.spillable.resource-check.interval, 默认值为 10s
    if (currentTime - lastResourceCheckTime < resourceCheckInterval) { return; }

    lastResourceCheckTime = currentTime;    // 重置上次资源检查时间为当前值
    // getMonitorResult 访问的正是之前介绍的 heapStatusMonitor 里保存的资源负载。由于变量定义为了 volatile, 不能缓存, 因而访问开销较大
    HeapStatusMonitor.MonitorResult monitorResult = heapStatusMonitor.getMonitorResult();

    // 如果和上次获取的 monitorResult 相同, 则不做处理, 直接返回
    if (lastMonitorResult != null && lastMonitorResult.getId() == monitorResult.getId()) { return; }
    lastMonitorResult = monitorResult;

    ActionResult checkResult = decideAction(monitorResult);   // 根据 monitorResult 的值, 决定下一步的动作是 SPILL, LOAD 还是 NONE
    if (checkResult.action == Action.NONE) { return; }  // 如果无需动作, 则直接返回

    // 如果走到这里, 说明需要进行 SPILL 或 LOAD 动作. 首先确保动作不能太频繁, 如果小于阈值则直接返回, 不做动作
    // triggerInterval 的配置项为 state.backend.spillable.trigger-interval, 默认值为 1 分钟
    if (monitorResult.getTimestamp() - lastTriggerTime < triggerInterval) { return; }

    if (checkResult.action == Action.SPILL) {
        doSpill(checkResult);   // 调用 doSpill 方法将状态从内存移动到磁盘
    } else {
        doLoad(checkResult);    // 调用 doLoad 方法将状态从磁盘载入回内存
    }

    // spill 和 load 比较耗时, 所有事项做完后再更新 lastTriggerTime 时间
    lastTriggerTime = System.currentTimeMillis();
}

上述代码经过逐行注释后,逻辑非常清晰易懂。我们下面着重以三个核心方法 decideActiondoSpilldoLoad及其附属方法为例,介绍它们的工作流程。

decideAction 方法

这个方法根据 HeapStatusMonitor 返回的资源观测结果,做出实际的决策。

但是从下面的源码注释可以看到,目前决策的依据非常单一且不可靠,例如 Spill 只判断 GC 时间(这里计算的是近几次的平均时间,经常无法应对突发情况),Load 只判断内存用量等,而且不可动态调整,局限性非常大,因此这里也是迫切需要改进的一个点。

ActionResult decideAction(HeapStatusMonitor.MonitorResult monitorResult) {
    long gcTime = monitorResult.getGarbageCollectionTime();     // 获取近期 GC 平均时间
    long usedMemory = monitorResult.getTotalUsedMemory();       // 获取堆内存总用量

    // 1. 检查是否需要触发 SPILL
    if (gcTime > gcTimeThreshold) { // TODO: 目前只有 GC 时间是否超过阈值, 非常简陋
        return ActionResult.ofSpill(spillSizeRatio);    // TODO: 这个 ratio 需要改为动态的, 目前为配置项 state.backend.spillable.spill-size.ratio 设置, 默认值为 0.2, 表示每次有 20% 的状态被移动到磁盘上
    }

    // 2. 检查是否需要触发 LOAD
    if (usedMemory < loadStartSize) {   // TODO: 目前只有一个指标, loadStartSize = maxMemory * loadStartRatio, 而 loadStartRatio 由 state.backend.spillable.load-start.ratio 配置项决定, 默认是 0.1
        float loadRatio = (float) (loadEndSize - usedMemory) / usedMemory;  // loadEndSize = maxMemory * loadEndRatio, 而 loadEndRatio 由 state.backend.spillable.load-end.ratio 配置项决定, 默认为 0.3
        return ActionResult.ofLoad(loadRatio);  // 决定载入的内存比例
    }

    // 都不需要, 那么无动作
    return ActionResult.ofNone();
}

StateMapMeta

这个方法主要描述了在状态表中的一个 KeyGroup 所保存的所有状态的元数据信息,称为一个 StateMap。 在 Flink 的状态系统中,分配的最小单元是 KeyGroup,表示哈希值一致的一组 Key。若干个连续的 KeyGroup 组成一个 KeyGroupRange,分配给相关算子的某个并行实例中。

public static class StateMapMeta {
    private final SpillableStateTable stateTable;    // KeyGroup 所属的状态表引用
    private final int keyGroupIndex;    // 该状态表中, 此 KeyGroup 的偏移量 (索引)
    private final boolean isOnHeap;     // 目前是否在堆内存里
    private final int size;             // 该 KeyGroup 的状态数
    private final long numRequests;     // 该 KeyGroup 的总请求数
    private long estimatedMemorySize;   // 估计的状态总大小, 如果是 -1 表示未初始化
}

可以看到,StateMapMeta 对象包括了这个 KeyGroup 所属对象的多种典型属性,Flink Spillable Backend 就是根据这些属性来计算权重、决定 Spill 还是 Load 等动作的影响范围。

getStateMapMetas 方法

这里我们简单介绍一下这个名为 getStateMapMetas 的辅助方法。它的作用是给定一个过滤器函数,对当前状态表里的所有 StateMap(即 KeyGroup 所属的状态集)进行筛选,然后返回符合条件的列表。

private List<SpillableStateTable.StateMapMeta> getStateMapMetas(
    Function<SpillableStateTable.StateMapMeta, Boolean> stateMapFilter) {   // 传入筛选策略函数, 按条件筛选状态表中符合条件的 KeyGroup 列表
    List<SpillableStateTable.StateMapMeta> stateMapMetas = new ArrayList<>();   // 创建一个列表, 以作为本函数返回值
    for (Tuple2<String, SpillableStateTable> tuple : stateTableContainer) {     // stateTableContainer 是即当前实例中, 所有已注册的状态名和状态表的 Tuple2 映射
        int len = stateMapMetas.size();
        SpillableStateTable spillableStateTable = tuple.f1;     // tuple.f1 是某个状态的状态表, 而 tuple.f0 是状态名, 这里用不到
        Iterator<SpillableStateTable.StateMapMeta> iterator = spillableStateTable.stateMapIterator();   // 准备按状态表中 KeyGroup 的顺序, 依次遍历检查该状态表里的所有的状态
        while (iterator.hasNext()) {    // 对该状态表中的每个 KeyGroup 下的状态映射进行遍历
            SpillableStateTable.StateMapMeta meta = iterator.next();
            if (stateMapFilter.apply(meta)) { stateMapMetas.add(meta); }    // 如果这个状态表的 KeyGroup 的 Meta 信息符合传入函数的筛选条件, 就加入返回列表
        }

        if (len < stateMapMetas.size()) {   // 如果发现上述循环中, 新增了符合条件的 KeyGroup, 需要估计表中每个 KeyGroup 的状态大小, 并写入元数据 (Meta) 中
            long estimatedSize = spillableStateTable.getStateEstimatedSize(true);   // 更新对象的平均大小, 注意是平均大小

            // 逐个更新每个新增的 KeyGroup 元数据中堆内存的估计大小, 计算公式是: 状态总数 * 估计的状态平均大小
            for (int i = len; i < stateMapMetas.size(); i++) {
                SpillableStateTable.StateMapMeta stateMapMeta = stateMapMetas.get(i);
                stateMapMeta.setEstimatedMemorySize(stateMapMeta.getSize() * estimatedSize);    // 状态总数 * 估计的状态平均大小
            }
        }
    }
    return stateMapMetas; // 返回含有最新状态估计大小的状态表元数据
}

特别需要注意的是,它的作用不仅仅在于筛选,而且还在筛选之后,对本 StateMap 里所有状态的大小进行估计,并保存在前面所述的 StateMapMeta 对象中。这样,后面的权重计算和排序,才有了数据支持。

doSpill 方法

下面我们来看一下,Spillable Backend 是如何将内存里的状态对象,Spill 到磁盘上的。这个 doSpill 方法由前述的 decideAction 方法调用,执行具体的 Spill 操作(优先选择访问不频繁、尺寸较大的 KeyGroup 进行 spill)。

它包括了筛选和大小估计、权重排序、执行 Spill 等多个步骤,最终达到阈值而完成整个流程。

void doSpill(ActionResult actionResult) {
    List<SpillableStateTable.StateMapMeta> onHeapStateMapMetas =    // 筛选当前 KeyGroup 并得到统计的元数据 (过滤条件是 onHeap 并且 size 大于 0)
        getStateMapMetas((meta) -> meta.isOnHeap() && meta.getSize() > 0);
    if (onHeapStateMapMetas.isEmpty()) { return; }  // 如果没有筛选到, 说明不用 spill, 直接返回

    sortStateMapMeta(actionResult.action, onHeapStateMapMetas);     // 根据 KeyGroup 的状态大小和访问频次进行权重排序, 权重大的放在前面

    long totalSize = onHeapStateMapMetas.stream()   // 获取所有堆内 KeyGroup 状态大小的总和
        .map(SpillableStateTable.StateMapMeta::getEstimatedMemorySize).reduce(0L, (a, b) -> a + b);     // 可以用 Long::sum 代替
    long spillSize = (long) (totalSize * actionResult.spillOrLoadRatio);    // 计算需要 spill 的比例

    if (spillSize == 0) { return; }
    if (cancelCheckpoint) { checkpointManager.cancelAllCheckpoints(); }     // 配置项 state.backend.spillable.cancel.checkpoint 可以控制是否在 spill 时取消当前的所有进行中的快照(默认为 true)。取消快照可以加快 GC 和 Spill 过程

    for (SpillableStateTable.StateMapMeta meta : onHeapStateMapMetas) {     // 根据排序得到的 KeyGroup 列表, 从权重最大(访问频次最小、大小最大)的开始, 逐个进行 Spill 操作, 直到达到阈值
        meta.getStateTable().spillState(meta.getKeyGroupIndex());
        spillSize -= meta.getEstimatedMemorySize();
        if (spillSize <= 0) { break; }  // 如果 Spill 的大小已经达到了阈值, 就不再继续, 本次 Spill 操作结束
    }
}

doLoad 方法

这个 doLoad 方法同样由 decideAction 方法调用,是 doSpill 方法的“对偶函数”,即将状态从磁盘等外存,载入到堆内存中(优先选择访问频繁、尺寸较小的 KeyGroup 进行 load)。

整体的函数逻辑与 doSpill 相同,只是更新阈值的代码放在了操作之前,以避免多载入对象到内存中,造成较大压力。

void doLoad(ActionResult actionResult) {    // 将状态从磁盘载入回堆内存
    List<SpillableStateTable.StateMapMeta> onDiskStateMapMetas =    // 筛选出所有不在堆内存且状态不为 0 的 KeyGroup 状态列表
        getStateMapMetas((meta) -> !meta.isOnHeap() && meta.getSize() > 0);
    if (onDiskStateMapMetas.isEmpty()) { return; }

    sortStateMapMeta(actionResult.action, onDiskStateMapMetas);     // 对所有的 KeyGroup 状态列表进行权重排序, 最大的 (访问次数最多、状态最小的)放在前面, 优先进行 Load

    long totalSize = onDiskStateMapMetas.stream()   // 计算符合条件的状态总大小
        .map(SpillableStateTable.StateMapMeta::getEstimatedMemorySize)
        .reduce(0L, Long::sum);
    long loadSize = (long) (totalSize * actionResult.spillOrLoadRatio); // 计算出需要载入的最大比例

    if (loadSize == 0) { return; }

    for (SpillableStateTable.StateMapMeta meta : onDiskStateMapMetas) { // 开始载入到内存, 直到满足阈值
        loadSize -= meta.getEstimatedMemorySize();
        // Load 时先减去状态大小, 避免多载入了一些状态, 导致内存压力比预期的更大
        if (loadSize < 0) { break; }

        meta.getStateTable().loadState(meta.getKeyGroupIndex());    // 按照 KeyGroup 元数据里面记录的 KeyGroupIndex 来载入状态到当前状态表
    }
}

sortStateMapMeta 方法

这个方法也属于比较关键的一个方法。它会根据传入的 StateMapMeta 列表,将其归一化以后,按照既定的规则进行权重排序。

需要注意的是,权重计算的方法在 computeWeight 方法中,根据 Spill(优先选择访问频率低、尺寸大的状态进行 Spill)还是 Load(优先选择访问频率高、尺寸小的状态进行 Load)有完全相反的权重计算方法。

private void sortStateMapMeta(Action action, List<SpillableStateTable.StateMapMeta> stateMapMetas) {
    if (stateMapMetas.isEmpty()) { return; }

    // 使用 (X - Xmin)/(Xmax - Xmin) 公式来进行归一化, 以确保归一化后的值域位于 [0,1]
    long sizeMax = 0L, sizeMin = Long.MAX_VALUE, requestMax = 0L, requestMin = Long.MAX_VALUE;
    for (SpillableStateTable.StateMapMeta meta : stateMapMetas) {   // 统计最大、最小的的 KeyGroup 状态对象总大小
        long estimatedMemorySize = meta.getEstimatedMemorySize();
        sizeMax = Math.max(sizeMax, estimatedMemorySize);
        sizeMin = Math.min(sizeMin, estimatedMemorySize);
        long numRequests = meta.getNumRequests();   // 获取该 KeyGroup 状态的总请求数
        requestMax = Math.max(requestMax, numRequests);
        requestMin = Math.min(requestMin, numRequests);
    }
    final long sizeDenominator = sizeMax - sizeMin; // 根据上述公式进行归一化
    final long requestDenominator = requestMax - requestMin;
    final long sizeMinForCompare = sizeMin;
    final long requestMinForCompare = requestMin;
    final Map<SpillableStateTable.StateMapMeta, Double> computedWeights = new IdentityHashMap<>();  // 准备一个 Map 来表示各个 KeyGroup 的相对权重
    Comparator<SpillableStateTable.StateMapMeta> comparator = (o1, o2) -> {     // 创建一个 KeyGroup 权重的 Comparator 用来排序
        if (o1 == o2) { return 0; }
        if (o1 == null) { return -1; }
        if (o2 == null) { return 1; }
        double weight1 = computedWeights.computeIfAbsent(o1,    // 计算第一个 KeyGroup 状态的权重, 并放入 computedWeights Map 中准备排序
            k -> computeWeight(k, action, sizeMinForCompare, requestMinForCompare, sizeDenominator, requestDenominator));   // computeWeight 的公式是 (weightRetainedSize * normalizedSize + weightRequestRate * normalizedRequest) / weightSum
        double weight2 = computedWeights.computeIfAbsent(o2,    // 计算第二个 KeyGroup 状态的权重, 并放入 computedWeights Map 中准备排序
            k -> computeWeight(k, action, sizeMinForCompare, requestMinForCompare, sizeDenominator, requestDenominator));
        // 对比权重, 然后对大的一方优先返回 -1, 这样在排序时会排到最前面, 优先进行 SPILL 或者 LOAD 等动作
        return (weight1 > weight2) ? -1 : 1;
    };
    stateMapMetas.sort(comparator); // 进行排序, 令大的 KeyGroup 可以出现在最前面
}

这个方法利用了 Java 自带的排序机制,通过自定义 Comparator 的方式,让权重最大的对象排在前面,这样构造了一个优先队列,前面介绍的doSpilldoLoad 方法都只需要从首部开始处理即可。

computeWeight 方法

这个 computeWeight 方法决定了排序时的先后顺序。可以看到,对于 SPILL 和 LOAD,计算的公式可以说是相反的,但是最终目的一致:根据概率原理,留在内存里的相对都是访问频繁及占空间较小的对象,有利于保持性能。

private double computeWeight(
    SpillableStateTable.StateMapMeta meta,
    Action action,
    long sizeMin, long requestMin,
    long sizeDenominator, long requestDenominator) {
    double normalizedSize = sizeDenominator == 0L ? 0.0 : (meta.getEstimatedMemorySize() - sizeMin) / (double) sizeDenominator; // 对本 KeyGroup 的状态大小进行归一化
    double normalizedRequest =  // 对本 KeyGroup 里状态的请求次数进行归一化
        requestDenominator == 0L ? 0.0 : (meta.getNumRequests() - requestMin) / (double) requestDenominator;
    double weightRetainedSize, weightRequestRate, weightSum;
    switch (action) {
        case SPILL: // 如果是 SPILL 操作, 倾向于选择访问不频繁的、较大的 Bucket 来进行
            weightRetainedSize = WEIGHT_SPILL_RETAINED_SIZE;    // 固定为 0.7, 正权重
            weightRequestRate = WEIGHT_SPILL_REQUEST_RATE;      // 固定为 -0.3, 负权重
            weightSum = WEIGHT_SPILL_SUM;   // 固定为前两者之和, 即 0.4
            break;
        case LOAD:  // 如果是 LOAD 操作, 倾向于选择请求频繁的、较小的 Bucket 来进行
            weightRetainedSize = WEIGHT_LOAD_RETAINED_SIZE; // 固定为 -0.3, 正权重
            weightRequestRate = WEIGHT_LOAD_REQUEST_RATE;   // 固定为 0.7, 负权重
            weightSum = WEIGHT_LOAD_SUM;    // 固定为前两者之和, 即 0.4
            break;
        default:
            throw new RuntimeException("Unsupported action: " + action);
    }
    // 如果是 Spill, 公式目前为 (0.7*normalizedSize-0.3*normalizedRequest)/0.4; 如果是 Load, 公式目前是 (0.7*normalizedRequest-0.3*normalizedSize)/0.4
    return (weightRetainedSize * normalizedSize + weightRequestRate * normalizedRequest) / weightSum; 
}

目前来看,公式里采用的参数都是固定的,对于调优来说并不是很灵活。

总结和展望

从目前的代码来看,SpillAndLoadManagerImpl 类的实现只能说完成了雏形,但是还有很多硬伤,例如公式参数固定不可调、决策条件过于简单、资源检测手段较为单一等等。

但需要看到,正是有了这些基石,该功能才有进一步完善的可能。因此这里非常感谢 Flink 社区的 Pengfei Li 和 Yu Li 两位作者的辛勤劳动。

我们也期待在接下来的版本中,Flink 的 Spillable Backend 可以更加成熟和完善;同时,后续我们也会将线上的一些经验和结论,融入到资源检测和动作判断逻辑中,让这个模块更加成熟、稳定,真正在生产环境下可用、可靠,最大程度上替代现有的 HeapKeyedStateBackend 和 RocksDBKeyedStateBackend。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景概要
  • SpillAndLoadManager 接口
  • SpillAndLoadManagerImpl 实现类初探
  • decideAction 方法
  • StateMapMeta
  • getStateMapMetas 方法
  • doSpill 方法
  • doLoad 方法
  • sortStateMapMeta 方法
  • computeWeight 方法
  • 总结和展望
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档