前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何优雅的实现分布式锁?(文末赠书)

如何优雅的实现分布式锁?(文末赠书)

作者头像
范蠡
发布2020-06-15 10:42:03
4600
发布2020-06-15 10:42:03
举报

今天来介绍下的是分布式锁的实现。

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的JavaAPI并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

在一个应用中,通常存在着不同的模式,这几种分布式锁需要结合使用,将一种分布式锁不加区别的应用到所有场景将导致较差的效果。因此,我们分别讨论这三种方式的具体实现,以及容易出现的误用。

Oracle乐观锁

在oracle中实现乐观锁很简单,只要在相应的数据上增加一个版本控制,例如version,每次读出来的时候,把该字段也读出来,当写回去时,把该字段加1,提交之前跟数据库的该字段比较一次,如果比数据库的值大的话,就允许保存,否则不允许保存。如下所示:

会话1操作如下:

代码语言:javascript
复制
select id, balance, version from account where id="1";
查询结果:id=1, balance=1000, version=1
update account
set balance=balance+100, version=version+1
where id="1" and version=1;
select id, balance, version from account where id="1";
查询结果:id=1, balance=1100, version=2

会话2操作如下:

代码语言:javascript
复制
select id, balance, version from account where id="1";
查询结果:id=1, balance=1000, version=1
update account
set balance=balance-50, version=version+1
where id="1" and version=1;
select id, balance, version from account where id="1";
查询结果:id=1, balance=1100, version=2

会话1已修改成功,实际account.balance=1100、account.version=2,会话2也将版本号加1(version=2)试图向数据库提交数据(balance=950),但此时比对数据库记录版本时发现,操作员B提交的数据版本号为2,数据库记录当前版本也为2,不满足提交版本必须大于记录当前版本才能执行更新的乐观锁策略,因此会话2的提交不会生效。

Redis分布式锁

使用Redis使用分布锁时,主要有两种方式,一种是基于原生的Redis客户端,使用setnx、getset以及expire等实现;另一种方式使用Redisson,它们就像Zookeeper的原生API与ApacheCurator。早期开发人员一般使用Redis客户端自己实现,这两年很多用户采用了Redission库。

1.基于Jedis的分布式锁

首先通过Maven引入Jedis开源组件,在pom.xml文件加入如下依赖:

代码语言:javascript
复制
<dependency>
   <groupId>redis.clients</groupId>
   <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

实现如下:

代码语言:javascript
复制
public class RedisUtil {
    private static finalString LOCK_SUCCESS = "OK";
    private static finalString SET_IF_NOT_EXIST = "NX";
    private static finalString SET_WITH_EXPIRE_TIME = "PX";
    /**
     * 尝试获取分布式锁。加锁代码满足我们可靠性里描述的三个条件。首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public booleantryGetDistributedLock(Jedis jedis, String lockKey, String requestId, intexpireTime) {
        try (Jedis jedis = jedisPool.getResource()){
                        String result = jedis.set(namespace +lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
              if (LOCK_SUCCESS.equals(result)) {
                  return true;
              }
              return false;
        }
}
private staticfinal Long RELEASE_SUCCESS = 1L;
    /**
     * 释放分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @return 是否释放成功
     */
    public static booleanreleaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
        String script ="if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
        try (Jedis jedis =jedisPool.getResource()) {
              Object result = jedis.eval(script,Collections.singletonList(namespace + lockKey),Collections.singletonList(requestId));
              if (RELEASE_SUCCESS.equals(result)) {
                  return true;
              }
              return false;
        }
    }
}

2.基于Redisson的分布式锁

Redisson(https://redisson.org/)是Redis官网推荐的Java语言实现分布式锁的项目,支持各种部署架构的redis,包括单机、集群(Cluster)、哨兵(Sentinel)以及主从(Master/Slave),广泛被各大厂使用。当然Redisson的功能远不止分布式锁,还包括其他一些分布式结构,但是本节主要讨论其分布式锁实现。Redisson实现了JDK中的Lock接口,用法和JDK的锁很类似,只不过Redssion的锁是分布式实现。其伪代码如下:

代码语言:javascript
复制
RLock lock = redisson.getLock("MyLockName");
lock.lock();
try {
    // do sth.
} finally {
    lock.unlock();
}

首先引入Redisson库依赖:

代码语言:javascript
复制
<dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson</artifactId>
      <version>2.2.3</version>
</dependency>

连接管理器实现如下:

代码语言:javascript
复制
// Redission连接管理器
public class RedissonManager {
    private static finalString RAtomicName = "genId_";
    private static Configconfig = new Config();
    private static Redissonredisson = null;
    public static void init(){
        try {
           config.useClusterServers() //这是用的集群server
                   .setScanInterval(2000) //设置集群状态扫描时间
                   .setMasterConnectionPoolSize(10000) //设置连接数
                    .setSlaveConnectionPoolSize(10000)
                   .addNodeAddress("127.0.0.1:6379","127.0.0.1:6380");
            redisson =Redisson.create(config);
        }catch (Exception e){
           e.printStackTrace();
        }
    }
    public static RedissongetRedisson(){
        return redisson;
    }
}

基于Redisson的分布式锁工具类实现:

代码语言:javascript
复制
public class DistributedRedisLock {
    private static Redissonredisson = RedissonManager.getRedisson();
    private static finalString LOCK_TITLE = "redisLock_";
    public static voidacquire(String lockName){
        String key =LOCK_TITLE + lockName;
        RLock mylock =redisson.getLock(key);
        mylock.lock(2,TimeUnit.MINUTES); //lock提供带timeout参数,timeout结束强制解锁,防止死锁
       System.err.println("======lock======"+Thread.currentThread().getName());
    }
    public static voidrelease(String lockName){
        String key =LOCK_TITLE + lockName;
        RLock mylock =redisson.getLock(key);
        mylock.unlock();
       System.err.println("======unlock======"+Thread.currentThread().getName());
    }
// 测试
private static void redisLock(){
       RedissonManager.init(); //初始化
        for (int i = 0; i <100; i++) {
            Thread t = newThread(new Runnable() {
                @Override
                public voidrun() {
                    try {
                        Stringkey = "test123";
                       DistributedRedisLock.acquire(key);
                       Thread.sleep(1000); //获得锁之后可以进行相应的处理
                        System.err.println("======获得锁后进行相应的操作======");
                       DistributedRedisLock.release(key);
                       System.err.println("=============================");
                    } catch(Exception e) {
                       e.printStackTrace();
                    }
                }
            });
            t.start();
        }
    }
}

Zookeeper分布式锁

有序性是Zookeeper中非常重要的一个特性,所有的更新都是全局有序,每个更新都有唯一的时间戳,该时间戳称为zxid(Zookeeper Transaction Id)。读请求只会相对于更新有序,也就是读请求的返回结果中会带有这个最新的zxid。在正式讨论如何使用Zookeeper实现分布式锁前,我们需要先了解下Zookeeper中关于节点的几个特性:

l 有序节点:假如当前有一个父节点为/hs/lock,我们可以在这个父节点下面创建子节点;Zookeeper提供了一个可选的有序特性,例如我们可以创建子节点/hs/lock/node-并且指明有序,那么Zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,也就是说如果是第一个创建的子节点,那么生成的子节点为/hs/lock/node-0000000000,下一个节点则为/hs/lock/node-0000000001,依次类推。

l 临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,Zookeeper会自动删除该节点。

l 事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,Zookeeper会通知客户端。当前Zookeeper有如下四种事件:节点创建、节点删除、节点数据修改、子节点变更。

这几个特性是Zookeeper的关键特性,也是实现分布式锁的关键,读者应确保好好领会。下面描述使用Zookeeper实现分布式锁的算法流程,假设锁空间的根节点为/hs/lock:

(1)客户端连接Zookeeper,并在/hs/lock下创建临时的且有序的子节点,第一个客户端对应的子节点为/hs/lock-0000000000,第二个为/hs/lock-0000000001,以此类推。

(2)客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁。

(3)执行业务代码。

(4)完成业务流程后,删除对应的子节点释放锁。

虽然Zookeeper原生客户端暴露的API已经比较简洁,但是友好性并不好,实现各种实际功能比较繁琐,分布式锁也不例外,所以实际中一般直接使用curator开源项目提供的Zookeeper分布式锁实现。如下所示,首先引入依赖:

代码语言:javascript
复制
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.12</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
             <!--4.0.0原生不兼容zk 3.4, 必须进行兼容性处理,否则会报KeeperErrorCode = Unimplemented异常-->
                <exclusions>
                    <exclusion>
                         <groupId>org.apache.Zookeeper</groupId>
                         <artifactId>Zookeeper</artifactId>
                    </exclusion>
                </exclusions>
      </dependency>                 

下面实现基于Curator的分布式锁工具类:

代码语言:javascript
复制
package chapter6.sec63;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorDLock {
      public static voidmain(String[] args) throws Exception {
                     // 创建Zookeeper的客户端
                     RetryPolicyretryPolicy = new ExponentialBackoffRetry(1000, 3);
                     CuratorFrameworkclient = CuratorFrameworkFactory.newClient("9.20.38.223:2181",retryPolicy);
                     client.start();
                     // 创建分布式锁, 锁空间的根节点路径为/hs/lock
                     InterProcessMutexmutex = new InterProcessMutex(client, "/hs/lock");
                     mutex.acquire();
                     // 获得了锁, 进行业务流程
                     System.out.println("Entermutex");
                     // 完成业务流程, 释放锁
                     mutex.release();
                     // 关闭客户端
                     client.close();
      }
} 

注:本文选自北京大学出版社出版的《Oracle高性能系统架构实战大全》一书,略有改动,经出版社授权刊登于此。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 高性能服务器开发 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 注:本文选自北京大学出版社出版的《Oracle高性能系统架构实战大全》一书,略有改动,经出版社授权刊登于此。
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档