
在分布式系统中,多个节点可能同时操作同一资源,此时需要使用分布式锁来保护共享资源的访问。分布式锁要求在多个节点上都能起到保护作用,并且能够保证在高并发情况下的正确性和效率。
ZooKeeper 是一个开源的分布式协调服务框架,其提供了分布式锁的实现。ZooKeeper 的锁机制主要通过节点创建和Watch机制实现。
public class ZookeeperLock {
private static final String ZK_ADDRESS = "localhost:2181";
private static final int SESSION_TIMEOUT = 3000;
private String lockPath;
private ZooKeeper zooKeeper;
public ZookeeperLock(String lockPath) {
this.lockPath = "/locks/" + lockPath;
try {
zooKeeper = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, null);
Stat stat = zooKeeper.exists("/locks", false);
if (stat == null) {
zooKeeper.create("/locks", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
stat = zooKeeper.exists(lockPath, false);
if (stat == null) {
zooKeeper.create(this.lockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void lock() throws Exception {
// 创建临时节点
String seqNodeName = zooKeeper.create(lockPath + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 尝试获取锁
attemptLock(seqNodeName);
}
private boolean attemptLock(String seqNodeName) throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren("/locks", false);
Collections.sort(children);
int index = children.indexOf(seqNodeName.substring("/locks/".length()));
switch (index) {
case -1:
throw new KeeperException.NoNodeException("path not found");
case 0:
return true;
default:
// 获取前一个节点的路径和 watcher
String preWatcherPath = "/locks/" + children.get(index - 1);
final CountDownLatch latch = new CountDownLatch(1);
Stat stat = zooKeeper.exists(preWatcherPath, event -> {
if (event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}
});
// 前一个锁已经释放
if (stat == null) {
return attemptLock(seqNodeName);
}
// 等待前一个节点的删除事件
latch.await();
return true;
}
}
public void unlock() {
try {
zooKeeper.delete(lockPath, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
} finally {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}Redis 是一种基于内存的高性能键值数据库,其提供了一种实现分布式锁的机制。
public class RedisLock {
private static final int RETRY_TIMES = 10;
private static final long DEFAULT_EXPIRE_TIME = 3000; // 默认过期时间 3s
private static final String LOCK_PREFIX = "lock:";
private RedisTemplate<String, String> redisTemplate;
public RedisLock(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public boolean lock(String lockKey) {
return lock(lockKey, DEFAULT_EXPIRE_TIME);
}
public boolean lock(String lockKey, long expireTime) {
String key = LOCK_PREFIX + lockKey;
for (int i = 0; i < RETRY_TIMES; i++) {
// SETNX 命令尝试获取锁
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, "");
if (result != null && result) {
// 设置锁过期时间
redisTemplate.expire(key, expireTime, TimeUnit.MILLISECONDS);
return true;
}
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
return false;
}
}
return false;
}
public void unlock(String lockKey) {
String key = LOCK_PREFIX + lockKey;
redisTemplate.delete(key);
}
}分布式锁可以应用于需要保证数据一致性、防止重复操作或单一任务等场景,如限流、秒杀、抢购等高并发场景。