分布式锁是为了保证分布式各系统对于资源的强占,独占。分布式锁的设计与多线程锁设计一样,都是通过一个信号量,对它进行CAS(compare and set)原子操作来实现乐观锁,或通过一个独占锁实现悲观锁,悲观锁不推荐。
乐观锁的核心是通过信号量代表资源,通过CAS的操作去标志改信号被占用。CAS成功,代表资源没有被占用,执行任务;CAS失败,代表资源被占用或处理过,不执行改资源。
失败后循环CAS的操作就叫做无锁自旋
。JUC源码中,锁的实现,就是通过safe进行无锁自旋。
分布式锁在定时任务时会被使用到。分布式服务上,每个服务都有定时任务,如何保证定时任务执行的资源只执行一次,可以用分布式锁来锁住资源实现。也可以使用hash资源定位服务来实现。
定时任务分布式锁按锁的粒度,有两种思路实现。一种是定时任务加锁,即一个定时任务,只可在一个服务上执行;另一种是定时任务的每个资源加锁,即定时任务在所有服务上都执行,但是每个资源自会在一个服务节点执行。
给定时任务加上一个信号量,定时任务执行时,CAS一下,如果信号加上去,就代表没有其他节点执行定时任务,就执行;如果CAS失败,就代表已经执行了,就不要再执行这次任务了。
下面一种通过数据库来实现 ,我们加上一个定时任务表,字段有执行时间,version字段,每个定时任务对应表中的一条记录,通过update ... where version = and update_date=
做CAS操作。分布式系统CAS操作失败,代表该定时任务已被其他节点执行,它就不用执行了。
CREATE TABLE `job_mutex` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`job_type` tinyint(3) NOT NULL,
`update_time` datetime DEFAULT NULL,
`version` int(12) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
TaskJobSchedu taskJobSchedu = taskJobScheduMapper.queryTaskJobScheduWait(jobType);
//update job_mutex set version=#{new_veriosn}, update_time=#{now} where version=#{verison} and TO_DAYS(update_time) = TO_DAYS(#{now})
//定时任务一天执行一次
if(taskJobScheduMapper.updateVersonToday(taskJobSchedu, taskJobSchedu.getVersion()+1, new Date())) {
execute();
}
当当的开源项目Elastic-job就有如此实现
如果想将分布式锁的粒度放在每个资源上,即定时任务在每个节点服务上都执行,但是它们执行的资源不会重复。这个可以通过双写来解决。执行前先通过CAS写资源到一个中间状态,执行成功结束后,将资源写到完成状态。加上一个过期设置,如中间状态15分钟后,就认为资源执行失败,回滚重新执行。
这里是一种通过redis和DB双写来实现资源的,这里通过redisTemplate的setIsAbsent来做原子操作,db中资源那张表,加个字段表示是否执行定时任务。
如果执行任务失败,它就不会写到DB,在redis中的key超时后的定时任务会再次执行这个任务。
List<Task> tasks = getNotExecuteTaskInDB();
for(Task task: tasks) {
ValueOperations<String, String> tasksRedis = redisTemplate.opsForValue();
if (tasksRedis.setIfAbsent(keyPrev+task.id, "1", aliveTime, TimeUnit.MINUTES) {
execute();
setTaskToExecuted(task);
}
}
悲观锁亦可实现,但不推荐,悲观锁使用数据库可以用select for update
来独占资源。