通常我们在每台机器部署并启动一个ES进程,怎么让多台机器上的多个ES进程,互相发现对方,然后完美的组成一个ES集群呢?
默认情况下,ES进程会绑定在自己的回环地址上,也就是127.0.0.1,然后扫描本机上的9300~9305端口号,尝试跟这些端口上启动的其他ES进程进行通信,然后组成一个集群。这对于在本机上搭建ES集群的开发环境是很方便的。但是对于生产环境下的集群是不行的,需要将每台ES进程绑定在一个非回环的IP地址上,才能跟其他节点进行通信,同时需要使用集群发现机制(discovery)来跟其他节点上的ES node进行通信,同时discovery机制也负责ES集群的Master选举。
ES node中有Master Node和Data Node两种角色。
ES 是一种p2p,也就是点对点(Peer to Peer)的分布式系统架构,不是Hadoop生态普遍采用的那种Master-Slave主从架构的分布式系统。集群中的每个node是直接跟其他节点进行通信的,几乎所有的API操作,比如index,delete,search等都不是Client跟Master通信,而是Client跟任何一个node进行通信,那个node再将请求转发给对应的node来进行执行。
两个角色,Master Node,Data Node。正常情况下,就只有一个Master Node。master node的责任就是负责维护整个集群的状态信息,也就是一些集群元数据信息,同时在新node加入集群或者从集群中下线时,或者是创建或删除了一个索引后,重新分配shard。包括每次集群状态如果有改变的化,那么master都会负责将集群状态同步给所有的node。
Master Node负责接收所有的集群状态变化相关的信息,然后将改变后的最新集群状态推动给集群中所有的Data Node,集群中所有的node都有一份完整的集群状态。只不过Master Node负责维护而已。其他的node,除了master之外的Data Node,就是负责数据的读写。
如果要让多个Node组成一个es集群,首先第一个要设置的参数,就是cluster.name
,多个node的cluster.name
一样,才满足组成一个集群的基本条件。cluster.name
的默认值是my-application
,在生产环境中,一定要修改这个值,否则可能会导致未知的node无端加入集群,造成集群运行异常。
而ES中默认的discovery机制,就是zen discovery机制,zen discovery机制提供了unicast discovery集群发现机制,集群发现时的节点间通信是依赖的Transport Module,也就是ES底层的网络通信模块和协议。
ES默认配置是使用unicast集群发现机制,从而让经过特殊配置的节点可以组成一个集群,而不是随便哪个节点都可以组成一个集群。但是默认配置下,unicast是本机,也就是localhost,因此只能在一台机器上启动多个node来组成一个集群。虽然ES还是会提供multicast plugin作为一个发现机制,但是已经不建议在生产环境中使用了。虽然我们可能想要multicast的简单性,就是所有的node可以再接收到一条multicast ping之后就立即自动加入集群。但是multicast机制有很多的问题,而且很脆弱,比如网络有轻微的调整,就可能导致节点无法发现对方。因此现在建议在生产环境中用unicast机制,提供一个ES种子节点作为中转路由节点就可以了。
另外还需要在集群中规划出专门的Master Eligible Node和Data node,一个节点只要它是Master Eligible Node,才有可能被选举为真正的Master Node,选举出真正的Master Node之后,其他的Master Eligible Node,将在Master Node故障之后,通过选举,从中再产生一个新的Master Node,同时,所有的非Master Node,都是Data Node,也就是说,Master Eligible Node只是有机会成为Master Node,只要你不是Master Node,你就是Data Node,而不是Master Eligible Node的Data Node是没有升级成为Master Node的资格的。
如果是一个小集群,10个以内的节点,那就所有节点都可以作为Master Eligible Node以及Data node即可,超过10个node的集群再单独拆分Master Eligible Node和Data Node。
# 设置为Master Eligible Node
node.master: true
# 设置为Data Node
node.data: true
默认情况下,ES会将自己绑定到127.0.0.1上,对于运行一个单节点的开发模式下的ES是ok的。但是为了让节点间可以互相通信以组成一个集群,需要让节点绑定到一个IP地址上
network.host: 192.168.0.1
只要不是本地回环地址,ES就会认为我们从开发模式迁移到生产模式,同时会启用一系列的集群检测
discovery.zen.ping.unicast.hosts: ["host1", "host2"]
简单来说,如果要让多个节点发现对方并且组成一个集群,那么就得有一个中间的公共节点,当不同的节点发送请求到这些公共节点,通过这些公共节点交换各自的信息,进而让所有的node感知到其他的node存在,并且进行通信,最后组成一个集群。这就是基于gossip流言式通信协议的unicast集群发现机制。
当一个node与discovery.zen.ping.unicast.hosts
中的一个成员通信之后,就会接收到一份完整的集群状态,接着这个node再跟master通信,并且加入集群中。这就意味着,discovery.zen.ping.unicast.hosts
是不需要列出集群中的所有节点的,只要提供少数几个node,比如3个,让新的node可以连接上即可,如果我们给集群中分配了几个节点作为专门的master节点,那么这里配置那些master节点即可,这个配置中也可以指定端口:
discovery.zen.ping.unicast.hosts: ["host1", "host2:9201"]
为集群选举出一个master是很重要的,ES集群会自动完成这个操作,node.master
设置为false的节点是无法称为Master的,discovery.zen.minimum_master_nodes
参数用于设置对于一个ES集群来说,必须拥有的最少的正常在线的Master Eligible Node的个数,否则会发生"集群脑裂"现象,假如在集群中设置了3个Master Eligible Node,那么这个值应该为(master_eligible_nodes / 2) + 1
,即2
discovery.zen.minimum_master_nodes: 2
discovery.zen.minimum_master_nodes
参数对于集群的可靠性来说,是非常重要的。这个设置可以预防脑裂问题,也就是预防一个集群中存在两个master。
这个参数的作用,就是告诉ES直到有足够的master候选节点时,才可以选举出一个master,否则就不要选举出一个master。这个参数必须被设置为集群中master候选节点的quorum数量,也就是大多数。quorum的算法,就是:master候选节点数量 / 2 + 1,比如我们有10个节点,都能维护数据,也可以是master候选节点,那么quorum就是10 / 2 + 1 = 6,如果我们有三个master候选节点,还有100个数据节点,那么quorum就是3 / 2 + 1 = 2,Elasticsearch要求最少有3个节点,如果我们有2个节点,都可以是master候选节点,那么quorum是2 / 2 + 1 = 2。此时就有问题了,因为如果一个Master挂掉了,那么剩下一个master候选节点,是无法满足quorum数量的,也就无法选举出新的master,集群就彻底挂掉了。此时就只能将这个参数设置为1,如果发生了网络分区,那么两个分区中都会有一个Master,还是无法避免集群脑裂
那么这个是参数是如何避免脑裂问题的产生的呢?比如我们有3个节点,quorum是2,现在网络故障,1个节点在一个网络区域,另外2个节点在另外一个网络区域,不同的网络区域内无法通信。这个时候有两种情况:
在ES集群是可以动态增加和下线节点的,所以可能随时会改变quorum。所以这个参数也是可以通过API随时修改的,特别是在节点上线和下线的时候,都需要作出对应的修改。而且一旦修改过后,这个配置就会持久化保存下来。
PUT /_cluster/settings
{
"persistent" : {
"discovery.zen.minimum_master_nodes": 2
}
}
此外还有一些其他的关于集群发现机制相关的配置,以下将列出上述讨论的参数以及一些其他的参数做总结:
#设置集群为single-node模式,这样的话,ES将不会再从外部去发现其他节点,默认不配资,代表可以发现其他节点
discovery.type: single-node
discovery.zen.ping.unicast.hosts: ["host1", "host2"]
#主机名被DNS解析为IP地址的超时时间
discovery.zen.ping.unicast.resolve_timeout: 5s
#在决定开始选举或加入现有集群之前节点将等待多长时间
discovery.zen.ping_timeout: 3s
#节点决定加入现有集群后,向Master发送加入请求,超时时间默认discovery.zen.ping_timeout时间的20倍
discovery.zen.join_timeout: 20
#Master发送修改集群状态的消息给其他节点,如果在此时间内还没有至少discovery.zen.minimum_master_nodes个备用Master节点回复,则此次修改集群状态的动作不会发生
discovery.zen.commit_timeout: 30s
#满足上面的条件后,Master发送修改集群状态的消息给全部Node,然后全部Node开始修改自身的集群状态信息,Master只有会等待所有的Node响应,最多等待此处的时间,然后才会开始下一次状态修改的流程
discovery.zen.publish_timeout: 30s
#设置集群中备用master的quorum,如果集群中备用的Master的个数少于此配置,将引发集群脑裂的现象
discovery.zen.minimum_master_nodes: 2
#选举Master的时候,是否忽略node.master=false的节点发送的ping消息,默认false
discovery.zen.master_election.ignore_non_master_pings: false
#当集群中没有存活的Master后,禁用外部请求允许的操作,write:外部请求只能读,不能写,all:外部请求不能读写ES集群
discovery.zen.no_master_block: write
#Master与Data Node互相发送ping消息的时间间隔
ping_interval: 1s
#Master与Data Node互相发送ping消息超时时间
ping_timeout: 30s
#Master与Data Node互相发送ping消息失败后的重试次数
ping_retries: 3
GET /index_name/_settings
{
"index_name" : {
"settings" : {
"index" : {
"creation_date" : "1593417909285",
"number_of_shards" : "5",
"number_of_replicas" : "1",
"uuid" : "-nelhb1LRX6z7tXk647yGw",
"version" : {
"created" : "6060099"
},
"provided_name" : "shop"
}
}
}
}
PUT index_name
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1
}
}
}
PUT /index_name/_settings
{
"number_of_replicas": 2
}
GET /index_name/_search_shards
POST /_cluster/reroute
{
"commands": [
{
"move": {
"index": "index_name",
"shard": 0,
"from_node": "node_name1",
"to_node": "node_name2"
}
}
]
}
POST /index/type/id?routing=your_routing_number
手动设置routing number的值这个功能很重要,假如想让业务上某一类数据都保存在同一个主分片上以提高批量读取的性能,那么将这些数据设置为相同的routing_number即可
GET /index/_search_shards?routing=your_routing_number
写请求:
读请求:
在进行增删改操作的时候,可以手动指定写一致性策略:
PUT /index_name/type_name/id?consistency=quorum
写一致性策略包括:
# 默认单位为ms,其他单位需要手动指定,例如timeout=30s
PUT /index_name/type_name/id?timeout=30
PUT /user/_doc/3
{
"uid": "1003",
"uname": "Jerry"
}
{
"_index" : "user",
"_type" : "_doc",
"_id" : "3",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 5,
"_primary_term" : 1
}
之后每次对这个文档好进行修改包括删除操作,它的"_version"都会加1:
PUT /user/_doc/3
{
"uid": "1003",
"uname": "Tony"
}
{
"_index" : "user",
"_type" : "_doc",
"_id" : "3",
"_version" : 2,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 6,
"_primary_term" : 1
}
DELETE /user/_doc/3
{
"_index" : "user",
"_type" : "_doc",
"_id" : "3",
"_version" : 3,
"result" : "deleted",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 7,
"_primary_term" : 1
}
假设有两个客户端同时修改这条数据,A客户端读取到此数据时,version=1,然后A客户端开始修改此数据,同一时间,B客户端也修改此数据,修改成功之后,verison变为了2,那么A客户端再提交自己的修改就会失败,ES的机制是,修改前后的版本号要相同才可以提交,A客户端再次提交修改请求,拿到verison=2的数据并修改,才会成功,以下是一个模拟使用版本号进行并发控制的实验:
# 插入一条数据,其初始版本为1
PUT /user/_doc/4
{
"uid": "1004",
"uname": "Bob"
}
{
"_index" : "user",
"_type" : "_doc",
"_id" : "4",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 8,
"_primary_term" : 1
}
# 客户端A修改了数据,提交的时候指定当前verison=1,与服务端版本号一致
# 所以可以修改成功,然后version变为2
PUT /user/_doc/4?version=1
{
"uid": "1004",
"uname": "Bob Bob"
}
{
"_index" : "user",
"_type" : "_doc",
"_id" : "4",
"_version" : 2,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 9,
"_primary_term" : 1
}
# 客户端B修改数据,提交的时候指定当前verison=1,与服务端版本号不一致
# 修改失败
PUT /user/_doc/4?version=1
{
"uid": "1004",
"uname": "Bob Bob Bob"
}
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[_doc][4]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "PIpA38NgRfyomgHHdAmHnw",
"shard": "0",
"index": "user"
}
],
"type": "version_conflict_engine_exception",
"reason": "[_doc][4]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "PIpA38NgRfyomgHHdAmHnw",
"shard": "0",
"index": "user"
},
"status": 409
}
# 客户端B再次修改数据,提交的时候指定当前verison=2,与服务端版本号一致
# 修改成功,version+1
{
"_index" : "user",
"_type" : "_doc",
"_id" : "4",
"_version" : 3,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 10,
"_primary_term" : 1
}
# 插入一条数据,其初始版本为1
PUT /user/_doc/5
{
"uid": "1005",
"uname": "Ella"
}
{
"_index" : "user",
"_type" : "_doc",
"_id" : "5",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 11,
"_primary_term" : 1
}
# 客户端A修改了数据,提交的时候指定外部verison=5,大于服务器版本
# 所以可以修改成功,然后version变为5
PUT /user/_doc/5?version=5&version_type=external
{
"uid": "1005",
"uname": "Ella Ella"
}
{
"_index" : "user",
"_type" : "_doc",
"_id" : "5",
"_version" : 5,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 12,
"_primary_term" : 1
}
# 客户端B修改数据,提交的时候指定当前verison=4,小于服务器版本
# 修改失败
PUT /user/_doc/5?version=4&version_type=external
{
"uid": "1005",
"uname": "Ella Ella Ella"
}
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[_doc][5]: version conflict, current version [5] is higher or equal to the one provided [4]",
"index_uuid": "PIpA38NgRfyomgHHdAmHnw",
"shard": "0",
"index": "user"
}
],
"type": "version_conflict_engine_exception",
"reason": "[_doc][5]: version conflict, current version [5] is higher or equal to the one provided [4]",
"index_uuid": "PIpA38NgRfyomgHHdAmHnw",
"shard": "0",
"index": "user"
},
"status": 409
}
# 客户端B再次修改数据,提交的时候指定当前verison=7,大于服务器版本
# 修改成功,version变为7
PUT /user/_doc/5?version=7&version_type=external
{
"uid": "1005",
"uname": "Ella Ella Ella"
}
{
"_index" : "user",
"_type" : "_doc",
"_id" : "5",
"_version" : 7,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 13,
"_primary_term" : 1
}
另外,retry_on_conflict参数可以设置重试次数:
POST /user/_doc/5?retry_on_conflict=5&version=7
当更新失败后,再次获取document数据和最新版本号,基于最新的版本号再次尝试更新,这样的重复会自动执行retry_on_conflict设置的次数,知道更新成功或者重复这么多次后还没有更新成功则返回Error
上面说到,删除一条文档的时候,将其version+1,实际上,删除文档并不会直接进行物理删除,而是标记为delete状态,随着数据的增加,ES会选择一个合适的时机批量删除标记为delete状态这批数据,所以在物理删除该数据之前,客户端再插入一条id相同的数据,只是将原来的数据的状态又改为update,然后version+1,其实还是修改了原数据,而非插入一条新数据