现在的web架构大都是分布式架构,分布式架构通常都是将应用部署在若干台机器上,使用nginx实现反向代理,负载均衡的作用,但是数据的一致性是分布式应用中比较突出的一个问题。
实际业务开发中遇到一个问题: 系统需要在凌晨执行定时任务,但是由于应用是分布式的,所以多台机器会同时执行逻辑代码,同时对数据库的数据进行修改,从而产生数据库数据的错误。
1
解决思路
查阅了众多资料,思路大致分为三种:
数据库限制访问,即数据一次只能被一个应用修改
redis设置锁,利用redis的key只能存在一个达到限制效果
zookeeper有序节点,通过zookeeper有序节点实现节点逻辑执行的先后顺序
2
方案帅选
数据库: 由于存在多个定时任务,定时任务涉及到多个业务逻辑,且数据库本身是主备部署的,实现表锁不确定性较多,排除。
redis锁: 简单方便,也是系统最终采用的方案,但是存在隐患,redis锁使用了setNX的方法,setNX的参数timeout,超过超时时间会自动释放锁,所以可能存在定时任务执行时间过长,导致节点提前释放锁。
zookeeper有序节点: 比较稳定,靠谱,但是由于部署维护成本,所以放弃了,后面如果redis无法胜任了,可能会考虑使用。
3
实现思路
redis锁
原理:
如下图所示,原理是利用键值对类型数据key值不能相同,一个节点占用一个key一直到它超时释放掉,如果节点在释放前异常退出,另一个节点会在当前时间大于超时时间后,获取占用key
核心代码实现
// 业务代码
// doTask是由spring-task创建的定时任务
public void distributedWork(){
String LOCK_KEY = "LOCK_KEY";
long lockTime;
if((lockTime = redisService.getLock(lockKey)) != null){
doTask();
redisService.unLock(lockKey, lockTime);
}
}
// redisService实现接口
// 获取锁
publicLong getLock(StringlockKey) {
// 循环获取锁
while(true) {
Long lockTimeout = currentTimeFromRedis() + LOCK_TIMEOUT +1;// 锁时间
if(redisTemplate.execute(newRedisCallback() {
@Override
publicBoolean doInRedis(RedisConnection connection)throwsDataAccessException {
byte[] value = lockTimeout.toString().getBytes();
returnconnection.setNX(lockKey.getBytes(), value);
}
})) {
// 加锁成功,设置超时时间
redisTemplate.expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
returnlockTimeout;
}else{
// setNX已经存在key
// 表示当前线程拿到的value值
Long currentLockTimeoutStr = Long.valueOf(redisTemplate.opsForValue().get(lockKey));
// value不为空而且value值早于当前redis时间,说明锁已经失效,所有线程可以重新获取锁
if(currentLockTimeoutStr !=null&& currentLockTimeoutStr < currentTimeFromRedis()) {
// getAndSet设置新值同时返回旧值。这里再重新拿value值,是为了防止可能有其他线程设置了value值,此时无法执行下面的条件判断
Long oldLockTimeoutStr = Long.valueOf(redisTemplate.opsForValue().getAndSet(lockKey, lockTimeout.toString()));
// 获取上一个锁到期时间,并设置现在的锁到期时间
if(oldLockTimeoutStr !=null&& oldLockTimeoutStr.equals(currentLockTimeoutStr)) {
// 如果此时多个线程都到达这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
redisTemplate.expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
returnlockTimeout;
}
}
}
try{
// 没获取到锁,睡眠100毫秒后再获取
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 释放锁
publicvoidunLock(StringlockKey,longlockValue) {
Long currentLockTimeoutStr = Long.valueOf(redisTemplate.opsForValue().get(lockKey));
// 如果是加锁者 则删除锁 如果不是则等待自动过期 重新竞争加锁
if(currentLockTimeoutStr !=null&& currentLockTimeoutStr == lockValue) {
// 删除键
redisTemplate.delete(lockKey);
}
}
zookeeper分布式锁
原理
1.创建根节点(persistent)
2.创建每当有使用资源的请求到来时, 按序创建一个临时文件节点
3.某进程需要锁,那么按序给锁即可, 不是最小节点需要等待之前的节点释放
代码实现
// 定义对象
importorg.apache.zookeeper.*;
importorg.apache.zookeeper.data.Stat;
importjava.io.IOException;
importjava.util.ArrayList;
importjava.util.Collections;
importjava.util.List;
importjava.util.concurrent.CountDownLatch;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.locks.Condition;
importjava.util.concurrent.locks.Lock;
publicclassDistributedLockimplementsLock, Watcher{
// 定义若干参数
privateZooKeeper zk =null;
// 根节点
privateStringROOT_LOCK ="/locks";
// 竞争的资源, 唯一的一个名字
privateStringlockName;
// 等待的前一个锁
privateStringWAIT_LOCK;
// 当前锁
privateStringCURRENT_LOCK;
// 计数器, 用于实现一个线程等待其他若干线程执行完毕再执行
privateCountDownLatch countDownLatch;
// 在sessionTimeout时间内,如果连接断开,zk客户端会主动和服务器建立连接
privateintsessionTimeout =30000;// 会话失效实现30s
privateList exceptionList =newArrayList();
}
// 构造方法
publicDistributedLock(Stringaddress,StringlockName){
this.lockName = lockName;
try{
// address是链接的地址, this是Watcher对象(watcher实现process)
zk =newZooKeeper(address, sessionTimeout,this);
// 参数: path, watcher
Stat stat = zk.exists(ROOT_LOCK,false);
if(stat ==null){
// 如果节点不存在, 创建根节点
// path, data,
zk.create(ROOT_LOCK,newbyte[], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
}catch (IOException e){
e.printStackTrace();
}catch (InterruptedException e){
e.printStackTrace();
}catch (KeeperException e){
e.printStackTrace();
}
}
}
// 实现Lock和Watcher需要实现的接口
// Watcher实现的接口
publicvoidprocess(WatchedEvent watchedEvent){
if(this.countDownLatch !=null){
// 计数器计数值减一,
this.countDownLatch.countDown();
}
}
// Lock方法
publicvoidlock(){
// 如果存在异常, 抛出异常
if(execeptionList.size() >){
thrownewLockException(exceptionList.get());
}
try{
if(this.tryLock()){
// 尝试获取锁, 获取成功
return;
}else{
// 获取失败, 等待获取锁
waitForLock(WAIT_LOCK, sessionTimeout);
}
}catch(InterruptedException e){
e.printStackTrace();
}catch(KeeperException e){
e.printStackTrace();
}
}
publicvoidlockInterruptibly()throwsInterruptedException{
// 中断情况
this.lock();
}
publicbooleantryLock(){
// 创建锁
try{
StringsplitStr ="_lock_";
if(lockName.contains(splitStr)){
thrownewLockException("锁名有误");
}
// 创建临时有序节点
CURRENT_LOCK = zk.create(ROOT_LOCK +"/"+ lockName + splitStr,newbyte[], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 取出所有lockName的锁, 当前锁是不是最小的, 是的话返回true, 否则返回false
// 参数: path(根路径), Watcher
List subNodes = zk.getChildren(ROOT_LOCK,false);
// 取出所有lockName的锁
List lockObjects =newArrayList();
for(Stringnode : subNodes){
String_node = node.split(splitStr)[];
if(_node.equals(lockName)){
lockObjects.add(node);
}
}
// 对列表进行排序(有序号顺序)
Collections.sort(lockObjects);
// 若当前节点是最小节点, 则获取锁成功
if(CURRENT_LOCK.equals(ROOT_LOCK +"/"+ lockObjects.get())){
returntrue;
}
// 如果当前节点不是最小节点,找到自己前一个节点
StringprevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") +1);
WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) -1);
}catch(InterruptedException e){
e.printStackTrace();
}catch(KeeperException e){
e.printStackTrace();
}
returnfalse;
}
publicbooleantryLock(longtime, TimeUnit unit)throwsInterruptedException{
// 尝试获取锁, 获取不到等待一段时间获取锁
try{
if(this.tryLock()){
returntrue;
}
returnwaitForLock(WAIT_LOCK, timeout);
}catch(Exception e){
e.printStackTrace();
}
}
publicvoidunlock(){
try{
System.out.println("释放锁"+ CURRENT_LOCK);
//path, version
zk.delete(CURRENT_LOCK, -1);
CURRENT_LOCK =null;
zk.close();
}catch(InterruptedException e){
e.printStackTrace();
}catch(KeeperException e){
e.printStackTrace();
}
}
publicCondition newCondition(){
returnnull;
}
// 其他函数
// 等待锁
privatebooleanwaitForLock(Stringprev,longwaitTime)throwsKeeperException, InterruptedException{
// path, watch
Stat stat = zk.exists(ROOT_LOCK +"/"+ prev,true);
if(stat !=null){
System.out.println(Thread.currentThread().getName() +"等待锁"+ ROOT_LOCK +"/"+ prev);
this.countDownLatch =newCountDownLatch(1);
// 计数等待, 若前一个节点消失, 则process中进行countDown, 获取锁
this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
this.countDownLatch =null;
}
returntrue;
}
// 锁异常
publicclassLockExceptionextendsRuntimeException{
privatestaticfinallongserialVersionUID =1L;
publicLockException(Stringe){
super(e);
}
publicLockException(Exception e){
super(e);
}
}
业务逻辑函数
publicvoiddistributedWork(){
DistributedLock lock =null;
try{
// 网址是分布式zookeeper集群中其中一台
lock =newDistributedLock("192.168.179.129:2181","LOCK");
lock.lock();
doTask();
}finally{
if(lock !=null){
lock.unlock();
}
}
}
注:本文为物流CTO原创文章,转载请直接在文章下面留言,物流君会最快处理。
领取专属 10元无门槛券
私享最新 技术干货