接之前一篇<手写zookeeper来模拟dubbo的注册/发现>,使用一致性Hash来进行查找需要寻找的服务.
Hash处理接口
public interface HashFunc {
public Long hash(Object key);
}
一致性Hash类
public class ConsistentHash<T> {
/**
* Hash计算对象,用于自定义hash算法
*/
HashFunc hashFunc;
/**
* 复制的节点个数
*/
private int numberOfReplicas;
/**
* 一致性Hash环
*/
private SortedMap<Long, T> circle = new TreeMap();
/**
* 构造,使用Java默认的Hash算法
* @param numberOfReplicas 复制的节点个数,增加每个节点的复制节点有利于负载均衡
* @param nodes 节点对象
*/
public ConsistentHash(int numberOfReplicas, Collection<T> nodes) {
this.numberOfReplicas = numberOfReplicas;
this.hashFunc = new HashFunc() {
public Long hash(Object key) {
// return fnv1HashingAlg(key.toString());
return md5HashingAlg(key.toString());
}
};
//初始化节点
for (T node : nodes) {
add(node);
}
}
/**
* 构造
* @param hashFunc hash算法对象
* @param numberOfReplicas 复制的节点个数,增加每个节点的复制节点有利于负载均衡
* @param nodes 节点对象
*/
public ConsistentHash(HashFunc hashFunc, int numberOfReplicas, Collection<T> nodes) {
this.numberOfReplicas = numberOfReplicas;
this.hashFunc = hashFunc;
//初始化节点
for (T node : nodes) {
add(node);
}
}
/**
* 增加节点<br>
* 每增加一个节点,就会在闭环上增加给定复制节点数<br>
* 例如复制节点数是2,则每调用此方法一次,增加两个虚拟节点,这两个节点指向同一Node
* 由于hash算法会调用node的toString方法,故按照toString去重
*
* @param node 节点对象
*/
public void add(T node) {
for (int i = 0; i < numberOfReplicas; i++) {
circle.put(hashFunc.hash(node.toString() + i), node);
}
}
/**
* 移除节点的同时移除相应的虚拟节点
*
* @param node 节点对象
*/
public void remove(T node) {
for (int i = 0; i < numberOfReplicas; i++) {
circle.remove(hashFunc.hash(node.toString() + i));
}
}
/**
* 获得一个最近的顺时针节点
*
* @param key 为给定键取Hash,取得顺时针方向上最近的一个虚拟节点对应的实际节点
* @return 节点对象
*/
public T get(Object key) {
if (circle.isEmpty()) {
return null;
}
long hash = hashFunc.hash(key);
if (!circle.containsKey(hash)) {
SortedMap<Long, T> tailMap = circle.tailMap(hash); //返回此映射的部分视图,其键大于等于 hash
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
//正好命中
return circle.get(hash);
}
/**
* 使用MD5算法
* @param key
* @return
*/
private static long md5HashingAlg(String key) {
MessageDigest md5 = null;
try {
md5 = MessageDigest.getInstance("MD5");
md5.reset();
md5.update(key.getBytes());
byte[] bKey = md5.digest();
long res = ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8)| (long) (bKey[0] & 0xFF);
return res;
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
return 0l;
}
/**
* 使用FNV1hash算法
* @param key
* @return
*/
private static long fnv1HashingAlg(String key) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < key.length(); i++)
hash = (hash ^ key.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
return hash;
}
}
发现代码
@Component
public class ClientComsumer implements Watcher {
//本地缓存服务列表
private static Map<String, List<String>> servermap;
@Autowired
private ZookeeperServer zkServer ;
private ZooKeeper zk;
private ConsistentHash consistentHash;
@Autowired
Environment env;
@PostConstruct
private void init() throws IOException {
String address = env.getProperty("zookeeper.address");
this.zk = zkServer.getConnection(address,this);
}
private List<String> getNodeList(String serverName) throws KeeperException, InterruptedException, IOException {
if (servermap == null) {
servermap = new HashMap<>();
}
Stat exists = null;
try {
String s = "/guanjian/" + serverName;
exists = zk.exists(s,this);
} catch (Exception e) {
}
//判断是否存在该服务
if (exists == null) return null;
List<String> serverList = servermap.get(serverName);
if (serverList != null && serverList.size() > 0) {
//将已存在的serverList放入一致性Hash环
this.consistentHash = new ConsistentHash(serverList.size(),serverList);
return serverList;
}
List<String> children = zk.getChildren("/guanjian/" + serverName,this);
List<String> list = new ArrayList<>();
for (String s : children) {
byte[] data = zk.getData("/guanjian/" + serverName + "/" + s, this, null);
String datas = new String(data);
NodeStat nodeStat = JSONObject.parseObject(datas, NodeStat.class);
if (!Status.stop.equals(nodeStat.getStatus())) {
list.add(datas);
}
}
//将list放入一致性Hash环
this.consistentHash = new ConsistentHash(list.size(),list);
servermap.put(serverName, list);
return list;
}
public String getServerinfo(String serverName) throws KeeperException, InterruptedException, IOException {
try {
List<String> nodeList = getNodeList(serverName);
if (nodeList == null|| nodeList.size()< 1) {
return null;
}
//这里使用得随机负载策略,如需需要自己可以实现其他得负载策略
//String snode = nodeList.get((int) (Math.random() * nodeList.size()));
//从一致性Hash环的第一个服务开始查找
String snode = (String) this.consistentHash.get(nodeList.get(0));
NodeStat nodeStat = JSONObject.parseObject(snode, NodeStat.class);
List<String> children = zk.getChildren("/guanjian/" + serverName,this);
//随机负载后,将随机取得节点后的状态更新为run
for (String s : children) {
byte[] data = zk.getData("/guanjian/" + serverName + "/" + s, this, null);
String datas = new String(data);
if (snode.equals(datas)) {
nodeStat.setStatus(Status.run);
zk.setData("/guanjian/" + serverName + "/" + s,JSONObject.toJSONString(nodeStat).getBytes(),0);
break;
}
}
return JSONObject.toJSONString(nodeStat);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
public void process(WatchedEvent watchedEvent) {
//如果服务节点数据发生变化则清空本地缓存
if (watchedEvent.getType().equals(Event.EventType.NodeChildrenChanged)) {
servermap = null;
}
}
}