昨天我给大家分享了我们电商平台是怎么去做分布式事务的(不好意思,懂分布式事务的你真的很了不起,下篇,不好意思,懂分布式事务的你真的很了不起,上篇),忘记了的朋友希望能回去扫一下,这些都是你进阶路上必须要经历的。
今天,分享的专题是分布式锁。那锁是什么,我就不多说了,相信大家都知道;分布式锁又是什么呢,也不用多说,这些概念大家都能不假思索的脱口而出了,简单的理解不就是,在分布式环境中多个机器共有的一把锁么,对不对。
我们今天的重心是,我们怎么去开发一个分布式锁去满足我们业务。我今天会讲我们部门三个分布式锁的实现方式,帮大家去少走弯路,直接照着做就行啦。
01
基于数据库开发
我们组有个不大不小的项目,在前两年用的这种方式去保证分布式环境中各个应用争抢共享资源。目前,也没有出大问题,主要是并发量不是太大,然后数据库是单独一台机器,配置是8CPU 16G。是怎么做的呢?
优点:简单易实现,只要逻辑清晰,利用一张数据库表就实现了,很方便。
缺点:对于并发量大的话,数据库的IO可能就是瓶颈,会造成性能低,吞吐量低等。
开发建议:在初创公司没有引入像redis或者zookeeper组件的时候,对于并发不大的业务就用这个方案,是很不错的。
02
基于内存Redis开发
由于数据库在IO方面以及高并发下受限等一些缺点,后来Redis的出现,我们就将部分分布式业务中的锁放到了Redis中去管理,相关思路如下:
Redisson
大家也可以直接使用开源组件,这里推荐一个比较好用的一个基于redis实现的开源分布式锁,Redisson。使用起来还是很方便的。
基本结构差不多这个样子,更多的可以看下Redisson官网:
RLock lock = redissonClient.getLock(getLockKey(t));
try {
if (lock.tryLock()) {
//业务逻辑
return true;
} else {
// 获取锁失败的业务逻辑
return false;
}
} finally {
lock.unlock();
}
}
缺点:基于Redis内存开发的分布式锁,主要有个问题就是,超时时间我们不能确保合理设置,需要一直监控并且根据业务调整。
03
基于zookeeper开发
这里可以使用zookeeper的临时顺序节点来实现分布式锁。
实现步骤:
实现代码:
public class ZkLock {
public final static Joiner j = Joiner.on("|").useForNull("");
// zk客户端
private ZooKeeper zk;
// zk是一个目录结构,root为最外层目录
private String root = "/locks";
// 锁的名称
private String lockName;
// 当前创建的序列节点
private ThreadLocal<String> nodeId = new ThreadLocal<>();
// 用来同步等待zkclient链接到了服务端
private CountDownLatch connectedSignal = new CountDownLatch(1);
private final static int sessionTimeout = 3000;
private final static byte[] data = new byte[0];
public ZookeeperDistributedLock(String config, String lockName) {
this.lockName = lockName;
try {
zk = new ZooKeeper(config, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 建立连接
if (event.getState() == KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
});
connectedSignal.await();
Stat stat = zk.exists(root, false);
if (null == stat) {
// 创建根节点
zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
class LockWatcher implements Watcher {
private CountDownLatch latch = null;
public LockWatcher(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted)
latch.countDown();
}
}
public void lock() {
try {
// 创建临时子节点
String myNode = zk.create(root + "/" + lockName, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(j.join(Thread.currentThread().getName() + myNode, "created"));
// 获取所有子节点
List<String> subNodes = zk.getChildren(root, false);
TreeSet<String> sortedNodes = new TreeSet<>();
for (String node : subNodes) {
sortedNodes.add(root + "/" + node);
}
String smallNode = sortedNodes.first();
String preNode = sortedNodes.lower(myNode);
if (myNode.equals(smallNode)) {
// 是否最小节点,则表示取得锁
System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock"));
this.nodeId.set(myNode);
return;
}
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同时注册监听。
// 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
if (stat != null) {
System.out.println(j.join(Thread.currentThread().getName(), myNode,
" waiting for " + root + "/" + preNode + " released lock"));
latch.await();// 等待,这里应该一直等待其他线程释放锁
nodeId.set(myNode);
latch = null;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void unlock() {
try {
System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock "));
if (null != nodeId) {
zk.delete(nodeId.get(), -1);
}
nodeId.remove();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
04
方案比较
最后,我们再来将这三种方案进行个比较,方便大家选择:
如果对于zookeeper这种流程还是不清楚的话,后期我再整理一篇zookeeper实现流程讲解。
总结,今天我把我们自己实现分布式锁的三种方案实现细节都分享出来,也是从简单到复杂的,根据业务的不同去选择方案的,大家也可以根据自己的业务去分析,直接在我这里选用就行了。
关于架构师修炼
本号旨在分享一线互联网各种技术架构解决方案,分布式以及高并发等相关专题,同时会将作者的学习总结进行整理并分享。
更多技术专题,敬请期待