本文涉及内容:
1、锁的应用场景: 在单体应用中,我们会使用ReentrantLock或Synchronized来应对并发场景。 比如最常见的卖票场景,假如总共有100张票,线程A和线程B同时操作,如下图:
这时有一个共享变量100,线程A和B将100拷贝到自己的工作内存中,当线程A抢到执行权的时候,此时A工作内存中的值是100,然后售票,进行自减操作,将自己工作内存中的值变成了99。当A还没来得及将99刷回到主内存的时候,线程B进来了,此时B拿到的主内存的值还是100,然后售票,进行自减,也是99。这就出现了同一张票出售了两次的情况。所以我们会加锁加volatile保证原子性保证可见性。
2、分布式锁是什么? 上面的场景中,我们可以通过ReentrantLock或者Synchronized搞定,因为你的项目只运行在一台服务器上,只有一个JVM,所有的共享变量都加载到同一个主内存中。而分布式应用中,一个项目部署在多台服务器上,最基本的架构如下图:
比如现在server1、server2和server3读取到数据库的票数都是100,在每一个server中,我们可以用JDK的锁来保证多个用户同时访问我这台server时不会出问题。但问题是,如果client1访问到的是server1,票数是100,然后购票,还没来得及将数据库票数改为99,client2也开始访问系统购票了,client2如果访问的是server1,自然不会出问题,如果访问的是server2,这时server2读取到数据库的票数还是100,那么就出问题了,又出现了同一张票卖了两次的情况。在分布式应用中,JDK的锁机制就无法满足需求了,所以就出现了分布式锁。
3、分布式锁应该满足的条件:
4、分布式锁的实现方式:
set key value NX EX 30000
;也可以用redis的第三方库比如Redisson1、建表:
CREATE TABLE `tb_distributed_lock` (
`dl_id` INT NOT NULL auto_increment COMMENT '主键,自增',
`dl_method_name` VARCHAR (64) NOT NULL DEFAULT '' COMMENT '方法名',
`dl_device_info` VARCHAR (100) NOT NULL DEFAULT '' COMMENT 'ip+线程id',
`dl_operate_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据被操作的时间',
PRIMARY KEY (`dl_id`),
UNIQUE KEY `uq_method_name` (`dl_method_name`) USING BTREE
) ENGINE = INNODB DEFAULT charset = utf8 COMMENT = '分布式锁表';
2、思路:
当执行一个方法的时候,我们首先尝试往表中插入一条数据。如果插入成功,则占锁成功,继续往下执行,执行完删除该记录。如果插入失败,我们再以当前方法名、当前机器ip+线程id、数据被操作时间为5分钟内(5分钟表示锁失效的时间)
为条件去查询,如果有记录,表示该机器的该线程在5分钟内占有过锁了,直接往下执行最后删除记录;如果没有记录,占有锁失败。
一个用户就是一个线程,所以我们可以把机器ip和用户id组合一起当成dl_device_info
。
3、占有锁和释放锁:
INSERT INTO tb_distributed_lock (
dl_method_name,
dl_device_info
)
VALUES
('方法名', 'ip&用户id');
如果insert失败,则:
SELECT
count(*)
FROM
tb_distributed_lock
WHERE
dl_method_name = '方法名'
AND dl_device_info = 'ip&用户id'
AND dl_operate_time < SYSDATE() - 5;
DELETE
FROM
tb_distributed_lock
WHERE
dl_method_name = '方法名'
AND dl_device_info = 'ip&用户id';
4、小总结: 以上表结构可能并不是很好,只是提供了这么一个思路。下面说它的优缺点:
1、原理:
基于redis的set key value nx ex 30
,这条语句的意思就是如果key不存在就设置,并且过期时间为30s,如果key已经存在就会返回false。如果要以毫秒为单位,把ex
换成px
就好了。我们执行方法前,先将方法名当成key,执行这条语句,如果执行成功就是获取锁成功,执行失败就是获取锁失败。
2、代码实现:
/**
* key不存在时就设置,返回true,key已存在就返回false
* @param key
* @param value
* @param timeout
* @return
*/
public static boolean setIfAbsent(String key, String value, Long timeout) {
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
}
/**
* 获取key-value
* @param key
* @return
*/
public static String getString(String key) {
return (String) redisTemplate.opsForValue().get(key);
}
/**
* 删除key
* @param key
* @return
*/
public static boolean delKey(String key) {
return redisTemplate.delete(key);
}
public String hello() {
// 方法名当作key
String key = "hello";
String value = "hellolock";
if (RedisUtil.setIfAbsent(key, value, 60 * 2L)) {
System.out.println("成功获取到锁,开始执行业务逻辑……");
// 假如执行业务逻辑需要1分钟
try {TimeUnit.MINUTES.sleep(1L); } catch (Exception e) { e.printStackTrace();};
// 释放锁先校验value,避免释放错
if (value.equals(RedisUtil.getString(key))) {
RedisUtil.delKey(key);
System.out.println("执行完业务逻辑,释放锁成功");
}
return "success";
} else {
System.out.println("锁被别的线程占有,获取锁失败");
return "acquire lock failed";
}
}
3、小总结:
n个redis
,我们先从这n个redis中尝试获取锁(锁的过期时间为x
),并记录获取锁的消耗的总时间t
,获取锁成功数量为s
,当且仅当t < x 并且 s >= (n/2 + 1)
时,认为获取锁成功。