Elasticsearch底层系列之Shard Allocation机制

背景

    Elasticsearch由一些Elasticsearch进程(Node)组成集群,用来存放索引(Index)。为了存放数据量很大的索引,Elasticsearch将Index切分成多个分片(Shard),在这些Shard里存放一个个的文档(document)。通过这一批shard组成一个完整的index。并且,每个Shard可以设置一定数量的副本(Replica),写入的文档同步给副本Shard,副本Shard可以提供查询功能,分摊系统的读负载。在主Shard所在Node(ES进程)挂掉后,可以提升一个副本Shard为主Shard,文档继续写在新的主Shard上,来提升系统的容灾能力。

    既然Shard和Replica有这样的好处,那么Elasticsearch是如何利用和管理这些Shard,让Shard在集群Node上合理的分配,比如,使副本Shard不和主Shard分配在一个Node上,避免容灾失效等。尽量把Shard分配给负载较轻的Node来均摊集群的压力,随着Shard分配,久而久之Shard在集群中会出现分配不均衡的情况,这又该如何才能做到均衡。这便是我们这次讨论的主题:Elasticsearch的分片分配和均衡机制。

触发条件

    先看下在什么场景下会触发Shard的Allocation:

  1. 创建/删除一个Index;
  2. 加入/离开一个Node;
  3. 手动执行了Reroute命令;
  4. 修改了Replica设置;

    当触发了Shard的Allocation,Allocation是如何决定将分片分配给哪个Node,Allocation的过程又是怎样的呢?

Decider

Elasticsearch内有个一个AllocationDecider模块,定义了四种策略决定的结果:

public static final Decision ALWAYS = new Single(Type.YES);
public static final Decision YES = new Single(Type.YES);
public static final Decision NO = new Single(Type.NO);
public static final Decision THROTTLE = new Single(Type.THROTTLE);

从字面上便可以看出策略结果的含义 每种策略是一个单独的实现,重写了如下策略方法:

/**
 * 当前shard routing是否允许rebalance
* 默认是ALWAYS始终允许的
 */
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
    return Decision.ALWAYS;
}

/**
 * 当前shard routing是否允许分配到目标Node
* 默认是ALWAYS始终允许的
*/
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    return Decision.ALWAYS;
}

/**
 * 在rebalance过程中,当前Shard是否允许留在当前Node
* 默认是ALWAYS始终允许的
*/
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    return Decision.ALWAYS;
}

AllocationDecider策略实现有以下14种:

AllocationDecider策略

AllocationDecider策略

    这些策略实现继承自”AllocationDecider”类,如果不覆盖方法的话,默认是ALWAYS允许的。

    我们依次看看这些策略的作用:

MaxRetryAllocationDecider:     定义了Shard维度的Allocation策略,防止Shard在失败次数达到上限后继续分配,当Shard分配失败一次后,失败次数会加1,当Shard分配次数超过配置的最大次数时,这个策略生效,返回Decision.NO;可以通过配置”index.allocation.max_retries”,来设置分配的最大失败重试次数,默认是5次,当然系统分配到达重试次数后,可以手动分配分片,在URL后带上“?retry_failed”请求参数,可以尝试再次分配分片。

ReplicaAfterPrimaryActiveAllocationDecider:      定义了Shard维度的Allocation策略,在分配副本分片时,检查主分片的状态,防止主分片不是Active情况下分配副本分片。

RebalanceOnlyWhenActiveAllocationDecider     定义了Rebalance策略,检查所有的主分片副本分片均是Active状态,才允许Rebalance操作。

ClusterRebalanceAllocationDecider     定义了Rebalance策略,检查系统动态配置”cluster.routing.allocation.allow_rebalance”,可以配置这些选项:

  • always - 不管如何都允许Rebalance.
  • indices_primaries_active - 集群内所有主分片都已经分配后,允许Rebalance,也就是在集群是red状态不允许Rebalance.
  • indices_all_active - (default) 所有的分片一分配才允许Rebalance,此时集群状态要是green状才行
  • 默认配置是所有分片均已分配,也就是集群是green状态的才允许Rebalance操作。

ConcurrentRebalanceAllocationDecider     定义了Rebalance策略,检查系统动态配置”cluster.routing.allocation.cluster_concurrent_rebalance”,表示集群同时允许进行rebalance操作的并发数量,默认是2。通过检查RoutingNodes类中维护的relocatingShards计数器,看是否超过系统配置的并发数,超过则不允许执行Rebalance操作。

EnableAllocationDecider     定义了Allocate策略和Rebalance策略,策略会读取系统动态配置,配置分cluster级别和index级别,如果都配置了,index级别会覆盖cluster级别。 Allocate策略会读取cluster级别”cluster.routing.allocation.enable”配置,默认为all。

  • all - (默认) 所有类型均允许allocation
  • primaries - 只允许allocation主分片.
  • new_primaries - 只允许allocation 新创建index的主分片.
  • none - 所有的分片都不允许allocation

    如果当前index配置了“index.routing.allocation.enable”配置,将覆盖cluster级别配置,内容和上面的一样,也分四种类型     Rebalance也会读取cluster级别“cluster.routing.rebalance.enable”配置,默认为all。

  • all - (默认) 所有类型均允许rebalance.
  • primaries - 只允许rebalance主分片.
  • replicas - 只允许rebalance 副本分片.
  • none - 所有的分片都不允许rebalance.

    index配置是”index.routing.rebalance.enable”,内容和上面的一样,也各分四种类型,含义一样。index级别配置后,会覆盖cluster级别配置。

NodeVersionAllocationDecider     定义了Allocate策略,检查分片所在Node的版本是否高于目标Node的ES版本,如果高于,不允许allocation,这种策略的目的是避免目标Node无法适配高版本lucencn格式的文件,一般集群ES都是一致的,当集群在进行ES版本滚动升级时,会出现版本不一致的情况。

SnapshotInProgressAllocationDecider     定义了Allocate策略,根据系统动态配置”cluster.routing.allocation.snapshot.relocation_enabled”,决定snapshot期间是否允许allocation,由于snapshot只发生在主分片,所以只会限制主分片的allocation。

FilterAllocationDecider     定义了Allocate策略,明确指定是否允许分片分配到指定Node上,分为index级别和cluster级别

  • index.routing.allocation.require.{attribute}
  • index.routing.allocation.include{attribute}
  • index.routing.allocation.exclude.{attribute}
  • cluster.routing.allocation.require.{attribute}
  • cluster.routing.allocation.include.{attribute}
  • cluster.routing.allocation.exclude.{attribute}

    require表示必须分配到指定node,include表示可以分配到指定node,exclude表示不允许分配到指定Node,cluster的配置会覆盖index级别的配置,比如index include某个node,cluster exclude某个node,最后的结果是exclude某个node,上面{attribute}表示node的匹配方式有:

  • _name 匹配node名称,多个node名称用逗号隔开
  • _ip 匹配node ip,多个ip用逗号隔开
  • _host 匹配node的host name 多个host name用逗号隔开

例如:

PUT _cluster/settings
{
  "transient" : {
    "cluster.routing.allocation.exclude._ip" : “10.0.0.1,10.0.0.2"
  }
}

SameShardAllocationDecider     定义了Allocate策略,避免将shard的不同类型(主shard,副本shard)分配到同一个node上,先检查已分配shard的NodeId是否和目标Node相同,相同肯定是不能分配。除了检查NodeId,为了避免分配到同一台机器的不同Node,会检查已分配shard的Node ip和hostname是否和目标Node相同,相同的话也是不允许分配的。

DiskThresholdDecider     定义了Allocate策略,Remind策略。策略根据Node的磁盘剩余量来决定是否分配到该Node,以及检查Shard是否可以继续停留在当前Node上,会检查系统的动态配置”cluster.routing.allocation.disk.threshold_enabled”默认“true”,如果为false,该策略允许分配分片。     策略里还会用到另外两项系统动态配置:

  1. “cluster.routing.allocation.disk.watermark.low”,默认值“85%”,达到这个值后,新索引的分片不会分配到这个Node上,也可以设置具体的byte数大小;
  2. “cluster.routing.allocation.disk.watermark.high”,默认值“90%”,达到这个值后,会触发已分配到该节点的Shardrebalance到其他Node上,配置项可以设置成具体的byte数大小。

ThrottlingAllocationDecider     定义了Allocate策略,避免过多的Recoving Allocation,结合系统的动态配置,避免过多的Recoving任务导致该Node的负载过高,相关配置有:     ”cluster.routing.allocation.node_initial_primaries_recoveries”,当前Node在进行主分片恢复的数量,默认为四个,ES内部是通过统计主Shard是否处于初始化状态,并且不是出于从其他节点reloacting过来,另外,”cluster.routing.allocation.node_concurrent_incoming_recoveries”,默认是2,通常是其他Node上的副本shard恢复到该Node上,以及”cluster.routing.allocation.node_concurrent_outgoing_recoveries”,默认为2,通常是当前节点上的主节点恢复副本Shard到其他Node上 “cluster.routing.allocation.node_concurrent_recoveries”,用来直接配置上面incoming和outgoing两个值的和。

ShardsLimitAllocationDecider     定义了Allocate策略,根据系统的动态配置,index级别的”index.routing.allocation.total_shards_per_node”,表示这个index每个node的总共允许存在多少个shard,默认值是-1表示无穷多个;和cluster级别”cluster.routing.allocation.total_shards_per_node”,表示集群范围内每个Node允许存在有多少个shard。默认值是-1表示无穷多个。如果目标Node的Shard数超过了配置的上限,则不允许分配Shard到该Node上。注意:index级别的配置会覆盖cluster级别的配置。

AwarenessAllocationDecider     定义了Shard Allocation和Remind策略,类似机架感知。为了将主shard和副本shard跨机架/地区分配。通过设置系统动态配置”cluster.routing.allocation.awareness.attributes:rack_id”,这里配置的感知类型为rack_id,相应的在Node配置上增加node.attr.rack_id:rack_one后,随后创建的index的主分片与副本分片会跨rack_id分配,避免机架网络设备故障导致整个集群不可用。相应的”cluster.routing.allocation.awareness.force.zone.values”会强制跨机架分配副本shard,如果分配完主分配,无可用其他机架分配副本分片,则副本分片不允许分配。

所有的Allocation由上面14个策略组成,通过全部的策略该Node才是一个符合策略条件的目标Node,允许进行后面的分片分配过程。

     Shard Allocation,Shard Move,Shard Rebalance会利用这些Decision,再决定是否进行分片分配,分片迁移,分片均衡等操作;下面我们看看完整的Allocation过程会经过哪些步骤。

Allocation过程

一次Allocation的执行过程如下:

Allocation执行过程

Allocation完整过程

首先看下 Allocation有哪些触发条件:

Allocation的触发条件

上图中Allocation的触发条件有以下几种:

序号

调用函数

说明

1

AllocationService.applyStartedShards

Shard启动状态修改

2

AllocationService.applyFailedShards

shard失效状态修改

3

AllocationService.deassociateDeadNodes

Node离开

4

AllocationService.reroute(AllocationCommands)

执行reloaction命令

5

TransportClusterUpdateSettingsAction.masterOperation

集群配置修改操作

6

MetaDataCreateIndexService.onlyCreateIndex

创建index请求

7

MetaDataDeleteIndexService.deleteIndices

删除索引操作

8

MetaDataIndexStateService.closeIndex

关闭index操作

9

MetaDataIndexStateService.openIndex

打开index操作

10

NodeJoinController.JoinTaskExecutor

通过zendiscovery发现的节点加入集群

11

GatewayService.GatewayRecoveryListener

通过GatewayRecovery恢复的Node加入集群

12

LocalAllocateDangledIndices.submitStateUpdateTask

恢复磁盘内存在而MateDate内不存在的index

13

RestoreService.restoreSnapshot

从Snapshot中恢复index

    额外说明一下,上表中的第3点,当节点离开后,在系统动态配置”index.unassigned.node_left.delayed_timeout”的超时时间过后,会触发”DelayedAllocationService.DelayedRerouteTask”,会延迟搬迁操作,这样设置是为了避免网络抖动导致节点短暂离开触发Shard搬迁。

    在分配分片时,会先后经过两个维度的验证:一个是Shard维度,一个是Node维度 其中Shard维度有两个Decider: MaxRetryAllocationDecider和ReplicaAfterPrimaryActiveAllocationDecider。接着挨个验证Node级别的分配策略,完成了未分配分片的分配步骤后,接下来会进行分片是否需要迁移的检查,也就是下面的:

Move Shard

    Move Shard过程会经过上面十四个策略实现的canRemain方法,判断当前Shard是否可以继续留在当前的Node上, 会经过:

  1. AwarenessAllocationDecider的canRemain方法,判断是否满足awareness配置的感知参数
  2. DiskThresholdDecider的canRemain方法,判断当前Node是否超过高水位线
  3. FilterAllocationDecider的canRemain方法,判断当前Node是否符合过滤策略
  4. ShardsLimitAllocationDecider的方法,判断当前Node是否满足index维度和cluster维度的限制条件

    只有上面策略全部通过,Shard才允许停留在当前Node上,否则会执行Relocating Shard过程     完成了分片搬迁,接下来会对集群中的分片均衡性做检查,ES内通过Balancer.balance方法实现,我们看看Rebalance过程是怎样的:

Rebalance

    Rebalance之前会经过上面十四个策略实现的canRebalance方法,全部通过才会执行后面的Rebalance过程:     Rebalance过程是通过调用balanceByWeights()方法,该方法会计算shard所在每个Node的Weight值 其中,weight的计算公式为:

weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode()
weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index)
weight = theta0 * weightShard + theta1 * weightIndex

注: numAdditionalShards,一般为0,调用weightShardAdded,weightShardRemoved,分别为1和-1 theta0=“cluster.routing.allocation.balance.shard”系统动态配置项,默认值为0.45f theta1=“cluster.routing.allocation.balance.index”,系统动态配置项,默认值为0.55f

将算出的Weight最小和最大的差值与系统配置的“threshold”比较,超过threshold值会执行rebalance的shard搬迁,来均衡集群中的shard。

总结

这便是Shard分配,搬迁和平衡的全部过程,ElasticSearch通过这三个操作,保证Shard在Node之间均衡的分配,修改动态配置后完成Shard迁移,以及在集群运行过程中的自动均衡。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏互联网杂技

SpringBoot ( 十二 ) :SpringBoot 如何测试打包部署

有很多网友会时不时的问我,spring boot项目如何测试,如何部署,在生产中有什么好的部署方案吗?这篇文章就来介绍一下spring boot 如何开发、调试...

882
来自专栏开发与安全

中断机制和中断描述符表、中断和异常的处理

注:本分类下文章大多整理自《深入分析linux内核源代码》一书,另有参考其他一些资料如《linux内核完全剖析》、《linux c 编程一站式学习》等,只是为了...

2430
来自专栏草根专栏

使用Identity Server 4建立Authorization Server (1)

本文内容基本完全来自于Identity Server 4官方文档: https://identityserver4.readthedocs.io/ 官方文档很详...

47910
来自专栏Felix的技术分享

《一个操作系统的实现》笔记(2)--保护模式

3398
来自专栏技术翻译

分析R中的Elasticsearch数据

您可以在任何可以安装R和Java的计算机上使用纯R脚本和标准SQL访问Elasticsearch数据。您可以使用适用于Elasticsearch的CData J...

1833
来自专栏静默虚空的博客

WebSocket 详解教程

概述 WebSocket 是什么? WebSocket 是一种网络通信协议。RFC6455 定义了它的通信标准。 WebSocket 是 HTML5 开始提供的...

4527
来自专栏好好学java的技术栈

SpringBoot 使用WebSocket打造在线聊天室(基于注解)

例如:webSocket.onmessage = function (event) {console.log('WebSocket收到消息:' + event....

4922
来自专栏菩提树下的杨过

ActiveMQ笔记(6):消息延时投递

在开发业务系统时,某些业务场景需要消息定时发送或延时发送(类似:飞信的短信定时发送需求),这时候就需要用到activemq的消息延时投递,详细的文档可参考官网说...

3095
来自专栏时序数据库专栏

Elasticsearch集群Shard Allocation机制

    Elasticsearch由一些Elasticsearch进程(Node)组成集群,用来存放索引(Index)。为了存放数据量很大的索引,Elastic...

1500
来自专栏大白虾谈架构

GitHub 多人协作开发 三种方式:

2634

扫码关注云+社区