需求场景
在分布式系统中,通常会有多个子系统需要操作同一资源,例如修改数据存储中的某一数据 这些子系统各自独立,操作共享资源时没有逻辑顺序,有可能会出现同时操作,发生冲突
这时就需要通过分布式锁来保护共享资源
分布式锁是在分布式环境下,保护跨进程、跨主机、跨网络的共享资源,实现互斥访问,保证一致性
ZooKeeper 解决思路
基本思路就是当系统A B C 一起来申请锁时,根据到达顺序给他们排个队,排在第一的就可以去操作共享资源,操作完成后就出队,再让新的排在第一的去操作共享资源,从而实现共享资源的互斥访问 ZooKeeper 的存储结构就像文件系统一样,是有层级的树形结构,可以让我们创建节点及子节点,而且节点可以是有序的 例如 ZooKeeper 根节点下有一个 Lock 节点,系统A、系统B、系统C 这时都想获取锁,那么他们就在 Lock 节点下新建一个有序型的子节点 data_A 系统A 执行创建节点的命令
create -s -e /Lock/data_A test
系统B 执行创建节点的命令
create -s -e /Lock/data_A test
系统C 执行创建节点的命令
create -s -e /Lock/data_A test
虽然他们都执行同样的命令,但因为要求有序,所以实际上会创建3个节点
创建节点后,会有一个返回值,相当于他们各自领到了一个排号儿的牌子,然后他们去获取 Lock 子节点列表,并按从小到大排序,看排在第一位的是不是自己的号码,如果是,就说明自己拿到了锁,可以去操作那个共享资源了 例如 Lock 子节点的排序结果为 data_A0000000000 data_A0000000001 data_A0000000002 系统A一对比自己的牌子,发现就是自己,可以去操作资源了 系统B和系统C发现自己不是第一位的,那就等等吧 系统A操作完资源后,回来释放锁,实际就是删掉自己的节点,执行删除命令 delete /Lock/data_A0000000000
这时 /Lock 的子节点变成了两个,状态发生了变化,ZooKeeper 会自动发出变动通知,系统B和系统C发现 /Lock 变了,马上再次获取其子节点,这时排序后的结果为 data_A0000000001 data_A0000000002 系统B一对比自己的牌子,就是自己,可以去操作资源了 系统C发现还不是自己,继续等吧 等系统B回来删除自己的节点后,才会轮到系统C 这样,通过创建有序节点、删除节点、自动监听机制,就实现了分布式锁 新建节点时的 create 命令中使用了两个选项,-s 和 -e
-s 表示要创建有序节点
-e 表示要创建临时节点 这个临时节点的特性也很有用,当创建这个节点的进程死掉了,不能回来删除节点时,ZooKeeper会自动把这个节点删掉,所以不用担心锁不被释放
示例代码
上面是用 ZooKeeper 客户端命令模拟的实现过程,下面是Java实现的示例代码,供参考
/**
* author: dzone.com
*/
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class DistributedLock {
private final ZooKeeper zk;
private final String lockBasePath;
private final String lockName;
private String lockPath;
public DistributedLock(ZooKeeper zk, String lockBasePath, String lockName) {
this.zk = zk;
this.lockBasePath = lockBasePath;
this.lockName = lockName;
}
public void lock() throws IOException {
try {
lockPath = zk.create(lockBasePath + "/" + lockName, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
final Object lock = new Object();
synchronized (lock) {
while (true) {
List<String> nodes = zk.getChildren(lockBasePath, new Watcher() {
@Override
public void process(WatchedEvent event) {
synchronized (lock) {
lock.notifyAll();
}
}
});
Collections.sort(nodes);
if (lockPath.endsWith(nodes.get(0))) {
return;
} else {
lock.wait();
}
}
}
} catch (KeeperException e) {
throw new IOException(e);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
public void unlock() throws IOException {
try {
zk.delete(lockPath, -1);
lockPath = null;
} catch (KeeperException e) {
throw new IOException(e);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}