哈希(hash)可能是是计算机界应用最广泛的算法了,小到 HashMap
,大到分布式键值系统(Key-Value Store),哈希算法无处不在。其将无限的,分布不均匀的值均匀映射到有限区间的能力使其广泛运用在索引,加密算法以及负载均衡器中。
今天笔者将分享著名的分布式算法,一致性哈希(Consistent Hash)的原理与应用场景。不过首先我们先来复习一下哈希的一个简单应用,那就是面试经常会被问到的 HashMap
,或者说python中的 dict
。
写代码的过程中我们经常会遇到一个问题,如何把一个键,映射到一个值,比方说你需要在程序中维护一个电话簿,需要快速地通过人名查找到其手机号码,最简单的方式可能是把名字和电话号码的键值对(key-value pair)存在一个数组中,当要查询时,遍历一遍数组,如以下python代码所示,当查询 morty
的手机号时,我们得到了 567
,当要加入一个人的电话号码时,我们直接append进这个数组。
contact_book = [["rick", "123"], ["morty", "567"], ["summer", "789"]]
def look_up(name):
for pair in contact_book:
if pair[0] == name:
return pair[1]
return ""
def add(name, contact):
contact_book.append([name, contact])
以上代码有一个严重的问题,那就是为了找出一个人的电话号码,我们需要把整个数组遍历一遍,当数组长度特别长时,整个算法的时间复杂度会非常糟糕,也许有人会提出利用二分查找或者动态平衡二叉树来解决,当然这不失为一个解,但不是今天笔者要和大家讨论的话题。
为了解决这个问题,我们不妨设想,如果我能够维护26个电话号码簿,当插入一个人的电话号码时,我用这个人名字的首字母来决定插入哪一个号码簿,同理,当查询时,我也用这个人名字的首字母来决定查询哪一个号码簿,我岂不是可以将查询的时间复杂度降为原来的二十六分之一?
contact_books = [
# Contact book with name starting with "a"
[["anna", "1234"], ["amy", "5678"]],
# Contact book with name starting with "b"
[["bonny", "5678"], ["beth", "9876"]],
# Contact book from "c" to "y" below
# ...
# Contact book from "c" to "y" above
# Contact book with name starting with "z"
[["zeta", "4567"], ["zip", "7896"]],
]
def look_up(name):
contact_book = contact_books[ord(name[0]) - 97]
for pair in contact_book:
if pair[0] == name:
return pair[1]
return ""
def add(name, contact):
contact_book = contact_books[ord(name[0]) - 97]
contact_book.append([name, contact])
正如以上代码所示,新的电话簿从原来的单个变成了一个由26个电话簿组成的数组,在新的 look_up
和 add
函数中, ord(name[0]-97)
会将字符转化为整数也就是电话簿数组的索引,比如 a
会被转为 0
, 因为 a
的ascii值为 97
,因此以字母 a
开头的名字会被映射到第一个电话薄。当根据名字的首字母找到对应的电话簿时,我们再进行上面提到的数组遍历,因此我们遍历数组的时间大大减少了,这也是手机通讯录会用到的一个优化:首字母相同的人的联系方式会被放在一起,方便用户查询。
其实以上用到的方法就是哈希,因为不知不觉中,我们已经将无限范围的值(名字可以有无数种)映射到了有限范围(26个)的电话簿。但是取首字母的方法并不是特别的方便,如果为了加快查询速度,我们将电话簿的数目拓展到27个怎么办?英文字母只有26个啊。
为了解决将字符串(文本)转化为整数的问题,许许多多的哈希函数被提出,他们的具体实现笔者就不赘述了,我们就来看看python内置的 hash
函数如何做到:
print(hash("anna"))
# console output: -2835734963014601026
从以上代码可以看到,字符串 anna
被 hash
函数转换成了一个非常大的数字。而回到刚刚我们讨论的电话簿问题,我们只有26个电话簿,这么大的数字(且是个负数)是不能当电话簿数组的索引的。聪明的读者肯定想到了取模(mod)这个操作:
print(hash("anna") % 26)
# console output: 24
在以上代码中,我们成功地将 anna
映射到了 24
这个数字上!于是我们再来重构一下之前的查询与插入操作:
def look_up(name):
contact_book = contact_books[hash(name) % 26]
for pair in contact_book:
if pair[0] == name:
return pair[1]
return ""
def add(name, contact):
contact_book = contact_books[hash(name) % 26]
contact_book.append([name, contact])
此时我们已经成功地实现了一套可拓展的插入与查找函数,因为假如说电话簿个数从26变成了27,我们只需将取模的值改为27就好。
然而问题还是没有这么简单,假如说一开始我们维护着一个由10个电话簿组成的电话簿数组,哈希函数为 hash(name)%10
,一段时间过后,每个电话簿已经有很大的存储量,此时查询速度开始变得非常之慢,于是决定:将电话簿数目拓展到20个。此时,一个新的电话簿数组将建立,而我们如何将老的电话簿数组中的电话号码迁移到新的中呢?此时会需要进行重哈希(rehash)。也就是说我们需要将原电话簿数组的键值对(名字-电话号码)一个个拿出来,用新的哈希函数 hash(name)%20
来决定它们进入新电话簿数组中的哪一个电话簿。
这就是Java的 HashMap
,C++的 unordered_map
,python的 dict
以及go的 map
的底层算法机制。他们的实现都会维护一个bucket数组(一个bucket相当于上文中的一个电话簿),以及一个叫做负载因子(load factor)的参数,类似于总键值对个数与电话簿个数的比值,当这个参数大于一个阀值时,将会触发重哈希。
键值系统是数据存储系统中一个应用非常广泛的类别,像基于内存缓存的Redis和Memcached,以及许多的NoSQL数据库,比如LevelDB, RocksDB, DynamoDB以及HBase,都属于键值系统。比起SQL数据库,键值系统有着对高并发读写更好的支持能力,因此广泛运用在互联网大厂的业务系统中。
一个单机的键值系统与一个 HashMap
十分类似,不管其底层索引是基于哈希还是Sorted String Table(NoSQL存储引擎核心数据结构,将在之后的文章中详细分析),单机键值系统对外的接口都是通过键来获取值或者通过键来更改值。
假如说我们维护一个基于内存的单机键值系统(假设4G内存)来存储电话簿,当键值对不多时,系统并没有压力,可是当总键值对的大小超过4G时,系统内存将会溢出,导致机器直接宕机。为了解决这个问题,有人提出将内存一直堆高(纵向拓展,vertical scale),可是单机的内存是无法无限增长的,第一堆内存成本很高,第二最大内存被64位寻址能力限制住了。因此,最为实用与高效的做法是将这个电话簿分布在多台机器上,也就是一个集群中,通过对键取哈希来决定键值对存在哪台机器上,这就是所谓的横向拓展(horizontal scale),其有着无限的拓展能力。
上图展示了一个分布式键值系统的基本架构,每台 KVStore
机器拥有一个唯一的id,并且负责一个范围内键值对的查询,插入,修改与删除。最上层有一个 routing tier
或者说负载均衡器( load balancer
)来基于哈希算法并根据键计算出相对应的 KVStore
id,并将请求转发到对应的机器上。其哈希函数可以是上一部分中我们用到的python内置hash函数,比如 id=hash(key)%num_machines
可以计算出一个键值对对应的机器id。
到目前为止好像看上去都没有任何问题。然而,在分布式系统中,机器的增加和减少是非常频繁的,比如说我们想对系统扩容,于是从原本的10台机器增加到了12台机器,或者说突然有一台缓存机器宕机了,可用机器从10台变成了9台,按照上一部分中我们提到的重哈希机制,整个集群的键值对都将被重新分配!每次集群大小的变化导致所有数据的重哈希是非常耗时的操作,有没有一种办法,在集群大小动态变化的情况下,每次只重新分配一小部分键值对呢?
这就引出了著名的一致性哈希算法(consistent hashing)。一致性哈希被广泛应用在各种分布式键值系统中,比如DynamoDB,和各种缓存中间件(Redis本身并不是分布式的,需要类似于上文提到的routing tier一类的第三方的中间件来实现跨机器分片)。
上文中我们提到的分片算法将一个个键值对直接地映射到了明确的机器id上,而一致性哈希并不会将键值对直接与机器绑定,而是先将每台机器的id哈希到一个值,再将每个键值对哈希到一个值,在两个哈希值相邻的机器之间的所有键值对,都将被分配至哈希值大的那一台机器上。这样带来的一个好处就是,如果像下图一样,多出一台机器 machine4
,其哈希值在 machine2
和 machine3
之间,此时只有原本被分配到 machine3
的键值对被重分配到了 machine4
上。同理,如果 machine4
加入之后又宕机了,分配给其的所有键值对都将被重分配到 machine3
上。
可是这仍然带来一个问题,假如在上图中,有一个键值对被哈希到了 machine3
的右边怎么办?已经没有比 machine3
的哈希值大的机器了。此时,聪明的读者可能已经想到了,那就是通过限定哈希区间的最大值,比如设为1024,这样大于1024的值都将overflow,于是单调递增的哈希区间折叠成了一个哈希环(hash ring),这样每个键值对都可以被分配到一台机器上,amazing!
我们已经完全解决了分布式键值系统的负载均衡问题了吗?其实还没有,如果如下图所示,我们集群中机器数量较少,导致机器在哈希环上的分布不均匀,那么大部分的键值对都将被分配到 machine2
上,导致负载不均匀,可能 machine1
还没有什么负载, machine2
已经被压到宕机了。
DynamoDB使用了一个叫做virtual node的概念,那就是同一台机器,可以有多个virtual node(vnode),也就是说可以被哈希到哈希环上的多个位置,如下图所示, machine1
同时拥有 1-1
和 1-2
两个vnode, machine2
同时拥有 2-1
和 2-2
两个vnode,因为有了更多的哈希值,他们被均匀分配在哈希环上的概率增加了。更进一步的话,如果集群中的机器有着不一样的读写性能以及存储空间,可以给他们分配不一样数目的vnode,vnode数量越多的机器在哈希环上所处的位置更多,也就有更大的几率被分配到键值对。
在上文中我们提到了routing tier这一分布式中间件,并给出了整个键值系统的架构图。可以发现系统的所有流量都经过了routing tier这一台机器,这在分布式系统中是非常危险的,因为routing tier的宕机将直接导致整个系统的不可用。那么我们是不是可以设置多个routing tier呢?
上图中三个routing tier有着相同的DNS,所以这个分布式键值系统客户端的请求会均匀抵达这三台机器,如果说有一台或者两台机器宕机了,整个系统仍然是可用的!
我们还剩下最后一个问题,因为动态扩容/缩容,机器在哈希环上的位置是动态变化的,三个routing tier都维护自己的哈希环,如何让它们达成共识?这就涉及到了分布式系统中最为棘手的问题了,那就是Consensus Problem。1989年,Leslie Lamport发表了分布式系统届开山立派的论文 ThePart-TimeParliament
,其中提出了经典共识算法 Paxos
(笔者之后将会撰文详细分析共识算法,这里就不赘述了)。在本文中,我们只需知道我们需要一个共识协议来维护多个routing tier之间的共识与一致性。在大部分分布式系统中,这种问题一般都会交给一个高可用,强一致的分布式协调者(coordinator)来解决,比如ZooKeeper或者Etcd。
通过一致性哈希和分布式中间件,我们实现了一个动态扩容/缩容的,高可用分布式键值系统。虽然写到这里已经非常详细了,但是因为笔者很硬核,最后还是会给出笔者自己实现的完整python版一致性哈希:
import uuid
class KVStore:
def __init__(self):
self.hash_ring_size = 1048576
self.num_machines = 10
# Each physical machine has 3 virtual nodes
self.num_replications_per_machine = 3
self.machines = dict()
for _ in range(0, self.num_machines):
machine_id = str(uuid.uuid1())
assert machine_id not in self.machines
# Each machine is a dictionary
self.machines[machine_id] = dict()
# Implement hash ring with an array, for the convenience of binary search
self.hash_ring = list()
self.existing_vnode_hashes = set()
for machine_id in self.machines:
for replication in range(0, self.num_replications_per_machine):
vnode_id = self.__generate_vnode_id(machine_id, replication)
vnode_hash = self.__generate_hash(vnode_id)
# Assert no collision
assert vnode_hash not in self.existing_vnode_hashes
self.hash_ring.append([vnode_hash, machine_id])
self.existing_vnode_hashes.add(vnode_hash)
# Sort the hash ring according to vnode hash
self.hash_ring.sort(key=lambda x: x[0], reverse=False)
def insert(self, key, value):
machine_id = self.__look_up_machine_id(key=key)
machine = self.machines[machine_id]
machine[key] = value
def look_up(self, key):
machine_id = self.__look_up_machine_id(key=key)
machine = self.machines[machine_id]
return machine.get(key)
def list_machine_id(self):
machine_id_list = list()
for machine_id in self.machines:
machine_id_list.append(machine_id)
return machine_id_list
def add_machines(self, num_machines):
self.num_machines += num_machines
for _ in range(0, num_machines):
machine_id = str(uuid.uuid1())
machine = dict()
assert machine_id not in self.machines
self.machines[machine_id] = machine
for replication in range(0, self.num_replications_per_machine):
vnode_id = self.__generate_vnode_id(machine_id, replication)
vnode_hash = self.__generate_hash(vnode_id)
assert vnode_hash not in self.existing_vnode_hashes
# find the next vnode machine
next_vnode_machine_id = self.__look_up_machine_id(vnode_id)
# add the new vnode to hash ring
self.hash_ring.append([vnode_hash, machine_id])
self.existing_vnode_hashes.add(vnode_hash)
# resort the hash ring
self.hash_ring.sort(key=lambda x: x[0], reverse=False)
# rehash the elements in the next vnode machine
if next_vnode_machine_id == machine_id:
continue
next_vnode_machine = self.machines[next_vnode_machine_id]
rehashed_keys = list()
for key in next_vnode_machine:
if self.__look_up_machine_id(key) == machine_id:
rehashed_keys.append(key)
value = next_vnode_machine[key]
print("%s, %s rehashed" % (key, value))
machine[key] = value
for key in rehashed_keys:
next_vnode_machine.pop(key)
def kill_machine(self, machine_id):
if self.machines.get(machine_id) is None:
return
machine = self.machines.pop(machine_id)
# remove vnodes
for replication in range(0, self.num_replications_per_machine):
vnode_id = self.__generate_vnode_id(machine_id, replication)
vnode_hash = self.__generate_hash(vnode_id)
self.existing_vnode_hashes.remove(vnode_hash)
vnode_idx = self.__look_up_vnode_idx(vnode_id)
self.hash_ring.pop(vnode_idx)
for replication in range(0, self.num_replications_per_machine):
vnode_id = self.__generate_vnode_id(machine_id, replication)
next_vnode_machine_id = self.__look_up_machine_id(vnode_id)
next_vnode_machine = self.machines[next_vnode_machine_id]
rehashed_keys = list()
for key in machine:
if self.__look_up_machine_id(key) == next_vnode_machine_id:
rehashed_keys.append(key)
print("%s, %s rehashed" % (key, machine[key]))
next_vnode_machine[key] = machine[key]
for key in rehashed_keys:
machine.pop(key)
def __generate_vnode_id(self, machine_id, replication):
return "%s-%d" % (machine_id, replication)
def __generate_hash(self, key):
return hash(key) % self.hash_ring_size
# look up machine id by binary search
def __look_up_machine_id(self, key):
vnode_idx = self.__look_up_vnode_idx(key)
return self.hash_ring[vnode_idx][1]
def __look_up_vnode_idx(self, key):
key_hash = self.__generate_hash(key)
start = 0
end = len(self.hash_ring) - 1
while start <= end:
mid = (start + end) // 2
if self.hash_ring[mid][0] >= key_hash:
if mid == 0 or self.hash_ring[mid - 1][0] < key_hash:
return mid
else:
end = mid - 1
continue
else:
# Hash ring overflow
if mid == len(self.hash_ring) - 1:
return 0
else:
start = mid + 1
continue