前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink RocksDB托管内存机制的幕后—Cache & Write Buffer Manager

Flink RocksDB托管内存机制的幕后—Cache & Write Buffer Manager

作者头像
大数据真好玩
发布2022-06-17 13:59:25
1.2K0
发布2022-06-17 13:59:25
举报
文章被收录于专栏:暴走大数据暴走大数据

前言

为了解决Flink作业使用RocksDB状态后端时的内存超用问题,Flink早在1.10版本就实现了RocksDB的托管内存(managed memory)机制。用户只需启用state.backend.rocksdb.memory.managed参数(默认即为true),再设定合适的TaskManager托管内存比例taskmanager.memory.managed.fraction,即可满足多数情况的需要。

关于RocksDB使用托管内存,Flink官方文档给出了一段简短的解释:

Flink does not directly manage RocksDB’s native memory allocations, but configures RocksDB in a certain way to ensure it uses exactly as much memory as Flink has for its managed memory budget. This is done on a per-slot level (managed memory is accounted per slot). To set the total memory usage of RocksDB instance(s), Flink leverages a shared cache and write buffer manager among all instances in a single slot. The shared cache will place an upper limit on the three components that use the majority of memory in RocksDB: block cache, index and bloom filters, and MemTables.

本文先简单介绍一下RocksDB(版本5.17.2)内部的Cache和Write Buffer Manager这两个组件,然后看一眼Flink是如何借助它们来实现RocksDB内存托管的。

[LRU]Cache

Cache组件负责管理Block Cache,在RocksDB中的实现有两种,分别对应两种常用的缓存置换算法:LRUCache和ClockCache。由于ClockCache目前仍有bug,所以在生产环境总是使用默认的LRUCache。注意Cache有压缩的和非压缩的两种,这里只考虑默认的非压缩Cache。

LRUCache最核心的四个参数列举如下:

  • capacity:缓存的总大小。
  • num_shard_bits:按2num_shard_bits的规则确定整个缓存区域的分片(CacheShard)总数,也就是分片编号的比特数。每个CacheShard均分缓存容量,读写时,会根据key哈希值的高num_shard_bits位来确定路由。
  • strict_capacity_limit:是否严格控制单个缓存分片的容量限制,默认为false。RocksDB的Iterator在遍历数据时,会将它要读取的一部分块暂时固定在Cache内,称为Iterator-pinned blocks。如果Iterator-pinned blocks的大小超过了分片容量,再插入数据就有造成OOM的风险。开启这个参数后,超额的缓存写入就会直接失败。
  • high_pri_pool_ratio:高优先级缓存区域占整个Cache的比例。所谓高优先级缓存一般是指SST文件索引和布隆过滤器对应的块,通过cache_index_and_filter_blocks和cache_index_and_filter_blocks_with_high_priority参数控制。

每个缓存分片LRUCacheShard都有一套哈希表+循环双链表的结构。哈希表称为LRUHandleTable,是RocksDB自己实现的链地址法分桶,且每个分片上都有互斥锁,整体与JDK中的旧版ConcurrentHashMap非常相似。哈希桶的扩容和缩容也是按照2的幂次,并且会尽量保证扁平(即每个桶中尽量只有1个元素)。

一个低优先级指针(图中Low-Pri)用于指示低优先级区域与高优先级区域的边界。如果高优先级LRUHandle的量超过了high_pri_pool_ratio比例规定的量,就会将溢出的高优先级LRUHandle降格成低优先级。当然,淘汰LRUHandle时也是从低优先级区域开始淘汰。

LRUHandle是LRUCache的最小单元,其key是SST文件的ID加上块在SST内的偏移量,value则是缓存的块数据(代码中为void*类型),另外还有数据大小、指针域和引用计数域等。为什么要有引用计数呢?因为RocksDB的实现方法与传统结构略有不同,链表中保存的并不是全部LRUHandle,而是可以被淘汰的那些LRUHandle,“可以被淘汰”的标准就是LRUHandle的引用计数为1——只有哈希表中存在,而没有外部引用者。也就是说,如果LRUHandle在链表中,那么一定在哈希表中,反之则不成立。

Write Buffer Manager(WBM)

顾名思义,Write Buffer Manager(以下简称WBM)是用来管理写缓存的组件。除了负责MemTable分配、Flush等细节,我们所关注的另一个作用则是追踪和控制MemTable的内存用量,它可以以两种形式生效:

  • 传入一个设定的阈值,WBM将多个列族或RocksDB实例的MemTable总大小限制在阈值内;
  • 将WBM传给Cache,可以使两者共同控制RocksDB总内存占用量的上限。

Flink也正是利用了上述特性来实现RocksDB托管内存的。那么WBM与Cache如何协同工作?如下图所示。

RocksDB Wiki中用了一句不符合英语语法的话来描述,即"Cost memory used in memtable to block cache",此时Block Cache的内存配额就是RocksDB全部的内存配额。

MemTable的分配单元称为Arena Block,默认大小为8MB。每分配一个Arena Block,WBM就会将它的内存消耗向LRUCache记账——所谓“记账”就是向Cache的低优先级区域内写入Dummy LRUHandle。这些LRUHandle没有value,只有key,但携带有Arena Block的内存消耗,且每个Dummy LRUHandle代表1MB的空间。也就是说它们仅占用了逻辑配额,并未占用物理空间,并且同样受Cache的LRU规则的控制。由于MemTable本身既是读缓存也是写缓存,所以把它和Block Cache统一起来倒也合理。

WBM控制下的MemTable Flush策略也变得更加激进了一些:

  • 当可变MemTable的大小超过WBM可用内存配额的7 / 8时,会触发Flush;
  • 当所有MemTable的大小超过内存配额,且可变MemTable的大小超过配额的一半时,也会触发Flush。

下面来简单看看Flink是如何利用WBM和Cache的。

To RocksDB Backend

直接上源码,即org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils类。

代码语言:javascript
复制
public class RocksDBMemoryControllerUtils {
    public static RocksDBSharedResources allocateRocksDBSharedResources(
            long totalMemorySize, double writeBufferRatio, double highPriorityPoolRatio) {
        long calculatedCacheCapacity =
                RocksDBMemoryControllerUtils.calculateActualCacheCapacity(
                        totalMemorySize, writeBufferRatio);
        final Cache cache =
                RocksDBMemoryControllerUtils.createCache(
                        calculatedCacheCapacity, highPriorityPoolRatio);

        long writeBufferManagerCapacity =
                RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(
                        totalMemorySize, writeBufferRatio);
        final WriteBufferManager wbm =
                RocksDBMemoryControllerUtils.createWriteBufferManager(
                        writeBufferManagerCapacity, cache);

        return new RocksDBSharedResources(cache, wbm, writeBufferManagerCapacity);
    }

    @VisibleForTesting
    static long calculateActualCacheCapacity(long totalMemorySize, double writeBufferRatio) {
        return (long) ((3 - writeBufferRatio) * totalMemorySize / 3);
    }

    @VisibleForTesting
    static long calculateWriteBufferManagerCapacity(long totalMemorySize, double writeBufferRatio) {
        return (long) (2 * totalMemorySize * writeBufferRatio / 3);
    }

    @VisibleForTesting
    static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
        // TODO use strict capacity limit until FLINK-15532 resolved
        return new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
    }

    @VisibleForTesting
    static WriteBufferManager createWriteBufferManager(
            long writeBufferManagerCapacity, Cache cache) {
        return new WriteBufferManager(writeBufferManagerCapacity, cache);
    }

    static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) {
        long arenaBlockSize = writeBufferSize / 8;

        // Align up to 4k
        final long align = 4 * 1024;
        return ((arenaBlockSize + align - 1) / align) * align;
    }

    static long calculateRocksDBMutableLimit(long bufferSize) {
        return bufferSize * 7 / 8;
    }

    @VisibleForTesting
    static boolean validateArenaBlockSize(long arenaBlockSize, long mutableLimit) {
        return arenaBlockSize <= mutableLimit;
    }
}

其中的writeBufferRatio就是state.backend.rocksdb.write-buffer-ratio参数,表示MemTable占托管内存(即Block Cache)的比例,默认0.5。同理,highPriorityPoolRatio就是state.backend.memory.high-prio-pool-ratio参数,表示高优先级内存占托管内存的比例,默认0.1。

托管内存在TaskManager的Slot之间平均分配,每个Slot都会有一组Cache和WBM。需要特别注意,实际的Cache和WBM配额是:

代码语言:javascript
复制
cache_capacity =  (3 - write_buffer_ratio) * total_memory_size / 3
write_buffer_manager_capacity = 2 * total_memory_size * write_buffer_ratio / 3

也就是说,如果TM总的托管内存的大小是3GB,默认比例下的Block Cache大小其实是2.5GB,MemTable配额其实是1GB,都略偏小一些。这是因为FLINK-15532尚未解决,strict_capacity_limit在Flink的场景下暂时不能生效,所以要留出一部分缓冲。推算的依据就是上一节提到的MemTable Flush策略,具体的关系如下:

代码语言:javascript
复制
write_buffer_manager_memory = 1.5 * write_buffer_manager_capacity
write_buffer_manager_memory = total_memory_size * write_buffer_ratio
write_buffer_manager_memory + other_part = total_memory_size
write_buffer_manager_capacity + other_part = cache_capacity
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-05-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • [LRU]Cache
  • Write Buffer Manager(WBM)
  • To RocksDB Backend
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档