首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >ES 数据节点的 shard 分配问题

ES 数据节点的 shard 分配问题

原创
作者头像
陆攀
发布2025-07-19 18:13:49
发布2025-07-19 18:13:49
1760
举报

现象

正如我在 ES shard allocation bug 中提到的一样,当单个 ES 数据节点的 data.path 中的目录数超过 20 个时,会导致同一个索引分配到该节点的多个 shard 全部存储在某个目录下,造成磁盘 io 的倾斜,写入性能产生瓶颈。该问题存在于 ES 的各个版本中。本文针对 7.17 版本的 ES 代码进行分析

源码分析

ES 数据节点上 shard 对应的 path 分配算法源码如下:

代码语言:java
复制
public static ShardPath selectNewPathForShard(
    NodeEnvironment env,
    ShardId shardId,
    IndexSettings indexSettings,
    long avgShardSizeInBytes,
    Map<Path, Integer> dataPathToShardCount
) throws IOException {
    final Path dataPath;
    final Path statePath;
    if (indexSettings.hasCustomDataPath()) {
        dataPath = env.resolveCustomLocation(indexSettings.customDataPath(), shardId);
        statePath = env.dataPaths()[0].resolve(shardId);
    } else {
        // 计算所有 path 对应的剩余可用空间总和。
        BigInteger totFreeSpace = BigInteger.ZERO;
        for (NodeEnvironment.DataPath nodeDataPath : env.dataPaths()) {
            totFreeSpace = totFreeSpace.add(BigInteger.valueOf(nodeDataPath.fileStore.getUsableSpace()));
        }

        // 预估当前 shard 大小 = max(总可用磁盘空间/20,当前集群平均 shard 大小)
        BigInteger estShardSizeInBytes = BigInteger.valueOf(avgShardSizeInBytes).max(totFreeSpace.divide(BigInteger.valueOf(20)));

        final NodeEnvironment.DataPath[] paths = env.dataPaths();

        // 取剩余磁盘空间最大的 path 作为初始路径
        NodeEnvironment.DataPath bestPath = getPathWithMostFreeSpace(env);

        if (paths.length != 1) {
            // 获取该索引在每个 path 下已分配的 shard 数的对应关系
            Map<NodeEnvironment.DataPath, Long> pathToShardCount = env.shardCountPerPath(shardId.getIndex());

            // Compute how much space there is on each path
            final Map<NodeEnvironment.DataPath, BigInteger> pathsToSpace = new HashMap<>(paths.length);
            // 重新计算各 path 对应的可用空间
            for (NodeEnvironment.DataPath nodePath : paths) {
                FileStore fileStore = nodePath.fileStore;
                BigInteger usableBytes = BigInteger.valueOf(fileStore.getUsableSpace());
                pathsToSpace.put(nodePath, usableBytes);
            }
            
            bestPath = Arrays.stream(paths)
                // 注意:这里很重要,过滤去剩余磁盘空间 > estShardSizeInBytes 对应的 path。
                // 由于 estShardSizeInBytes = 总的可用磁盘空间 * %5
                .filter((path) -> pathsToSpace.get(path).subtract(estShardSizeInBytes).compareTo(BigInteger.ZERO) > 0)
                // 将上一步筛选出来的 path 再根据已分配的当前索引 shard 个数按升序排序 
                .sorted((p1, p2) -> {
                    int cmp = Long.compare(pathToShardCount.getOrDefault(p1, 0L), pathToShardCount.getOrDefault(p2, 0L));
                    if (cmp == 0) {
                        cmp = Integer.compare(
                            dataPathToShardCount.getOrDefault(p1.path, 0),
                            dataPathToShardCount.getOrDefault(p2.path, 0)
                        );
                        if (cmp == 0) {
                            cmp = pathsToSpace.get(p2).compareTo(pathsToSpace.get(p1));
                        }
                    }
                    return cmp;
                })
                // 取 排序完成后的 path 列表中的第一个。这里相当于将同一个索引的shard,按 path 均匀打散
                .findFirst()
                // 如果没有,则直接使用初始的 bestPath
                .orElse(bestPath);
        }
        statePath = bestPath.resolve(shardId);
        dataPath = statePath;
    }
    return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, shardId);
}

从上面的源码可知,es shard 在 data node 节点的 path 分配算法如下:

  1. 计算所有 path 对应的磁盘剩余可用空间总和。命名为:totFreeSpace。
代码语言:java
复制
BigInteger totFreeSpace = BigInteger.ZERO;
        for (NodeEnvironment.DataPath nodeDataPath : env.dataPaths()) {
            totFreeSpace = totFreeSpace.add(BigInteger.valueOf(nodeDataPath.fileStore.getUsableSpace()));
        }
  1. 在集群中平均shard大小(avgShardSizeInBytes)和当前节点剩余磁盘空间的5%(这个比例是写死的,直接用总的磁盘空间 / 20 得出) 的之间取最大值,来预估当前shard 的大小。命名为:estShardSizeInBytes。代码如下:
代码语言:java
复制
BigInteger estShardSizeInBytes = BigInteger.valueOf(avgShardSizeInBytes).max(totFreeSpace.divide(BigInteger.valueOf(20)));
  1. 默认取该节点最大的剩余空间对应的 path 为 bestPath。代码如下:
代码语言:java
复制
NodeEnvironment.DataPath bestPath = getPathWithMostFreeSpace(env);
  1. 计算每个 path 已经分配了该索引对应的 shard 个数(用于单个索引在不同 path 上负载均衡使用)。命名为:pathToShardCount。代码如下:
代码语言:java
复制
Map<NodeEnvironment.DataPath, Long> pathToShardCount = env.shardCountPerPath(shardId.getIndex());
  1. 计算每个 path,对应的剩余可用存储空间。命名为: pathsToSpace。代码如下:
代码语言:java
复制
final Map<NodeEnvironment.DataPath, BigInteger> pathsToSpace = new HashMap<>(paths.length);
for (NodeEnvironment.DataPath nodePath : paths) {
    FileStore fileStore = nodePath.fileStore;
    BigInteger usableBytes = BigInteger.valueOf(fileStore.getUsableSpace());
    pathsToSpace.put(nodePath, usableBytes);
}
  1. 过滤中剩余可用磁盘空间 > estShardSizeInBytes (第2步中计算得出)的 path。
    1. 如果存在: 1.1. 则将筛选出来的 path 列表再根据已分配的当前索引 shard 个数按升序排序(即没有分配到当前索引对应的shard的path,放在最前面) 1.2. 取排序后的第一个
    2. 如果不存在,则直接使用步骤3中,初始化的 bestPath
代码语言:java
复制
bestPath = Arrays.stream(paths)
                // 注意:这里很重要,过滤去剩余磁盘空间 > estShardSizeInBytes 对应的 path。
                // 由于 estShardSizeInBytes = 总的可用磁盘空间 * %5
                .filter((path) -> pathsToSpace.get(path).subtract(estShardSizeInBytes).compareTo(BigInteger.ZERO) > 0)
                // 将上一步筛选出来的 path 再根据已分配的当前索引 shard 个数按升序排序 
                .sorted((p1, p2) -> {
                    int cmp = Long.compare(pathToShardCount.getOrDefault(p1, 0L), pathToShardCount.getOrDefault(p2, 0L));
                    if (cmp == 0) {
                        // if the number of shards is equal, tie-break with the number of total shards
                        cmp = Integer.compare(
                            dataPathToShardCount.getOrDefault(p1.path, 0),
                            dataPathToShardCount.getOrDefault(p2.path, 0)
                        );
                        if (cmp == 0) {
                            // if the number of shards is equal, tie-break with the usable bytes
                            cmp = pathsToSpace.get(p2).compareTo(pathsToSpace.get(p1));
                        }
                    }
                    return cmp;
                })
                // 取 排序完成后的 path 列表中的第一个。这里相当于将同一个索引的shard,按 path 均匀打散
                .findFirst()
                // 如果没有,则直接使用初始的 bestPath
                .orElse(bestPath);

总结

  1. 由于 estShardSizeInBytes = totFreeSpace / 20,totFreeSpace = es 配置磁盘个数 * 单盘可用空间。当 es 配置的磁盘空间大于 20 个时,很容易出现 estShardSizeInBytes > 单盘可用空间 的情况。从而最终导致shard分配倾斜。
  2. 当 ES 数据节点的磁盘个数超过 20 个时,可以考虑部署多个 ES 实例来平分整体的磁盘资源。将单个实例的磁盘个数降到 20 个以内。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 现象
  • 源码分析
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档