分布式应用中定时任务冲突的解决思路

现在的web架构大都是分布式架构,分布式架构通常都是将应用部署在若干台机器上,使用nginx实现反向代理,负载均衡的作用,但是数据的一致性是分布式应用中比较突出的一个问题。

实际业务开发中遇到一个问题: 系统需要在凌晨执行定时任务,但是由于应用是分布式的,所以多台机器会同时执行逻辑代码,同时对数据库的数据进行修改,从而产生数据库数据的错误。

1

解决思路

查阅了众多资料,思路大致分为三种:

数据库限制访问,即数据一次只能被一个应用修改

redis设置锁,利用redis的key只能存在一个达到限制效果

zookeeper有序节点,通过zookeeper有序节点实现节点逻辑执行的先后顺序

2

方案帅选

数据库: 由于存在多个定时任务,定时任务涉及到多个业务逻辑,且数据库本身是主备部署的,实现表锁不确定性较多,排除。

redis锁: 简单方便,也是系统最终采用的方案,但是存在隐患,redis锁使用了setNX的方法,setNX的参数timeout,超过超时时间会自动释放锁,所以可能存在定时任务执行时间过长,导致节点提前释放锁。

zookeeper有序节点: 比较稳定,靠谱,但是由于部署维护成本,所以放弃了,后面如果redis无法胜任了,可能会考虑使用。

3

实现思路

redis锁

原理:

如下图所示,原理是利用键值对类型数据key值不能相同,一个节点占用一个key一直到它超时释放掉,如果节点在释放前异常退出,另一个节点会在当前时间大于超时时间后,获取占用key

核心代码实现

// 业务代码

// doTask是由spring-task创建的定时任务

public void distributedWork(){

String LOCK_KEY = "LOCK_KEY";

long lockTime;

if((lockTime = redisService.getLock(lockKey)) != null){

doTask();

redisService.unLock(lockKey, lockTime);

}

}

// redisService实现接口

// 获取锁

publicLong getLock(StringlockKey) {

// 循环获取锁

while(true) {

Long lockTimeout = currentTimeFromRedis() + LOCK_TIMEOUT +1;// 锁时间

if(redisTemplate.execute(newRedisCallback() {

@Override

publicBoolean doInRedis(RedisConnection connection)throwsDataAccessException {

byte[] value = lockTimeout.toString().getBytes();

returnconnection.setNX(lockKey.getBytes(), value);

}

})) {

// 加锁成功,设置超时时间

redisTemplate.expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS);

returnlockTimeout;

}else{

// setNX已经存在key

// 表示当前线程拿到的value值

Long currentLockTimeoutStr = Long.valueOf(redisTemplate.opsForValue().get(lockKey));

// value不为空而且value值早于当前redis时间,说明锁已经失效,所有线程可以重新获取锁

if(currentLockTimeoutStr !=null&& currentLockTimeoutStr < currentTimeFromRedis()) {

// getAndSet设置新值同时返回旧值。这里再重新拿value值,是为了防止可能有其他线程设置了value值,此时无法执行下面的条件判断

Long oldLockTimeoutStr = Long.valueOf(redisTemplate.opsForValue().getAndSet(lockKey, lockTimeout.toString()));

// 获取上一个锁到期时间,并设置现在的锁到期时间

if(oldLockTimeoutStr !=null&& oldLockTimeoutStr.equals(currentLockTimeoutStr)) {

// 如果此时多个线程都到达这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁

redisTemplate.expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS);

returnlockTimeout;

}

}

}

try{

// 没获取到锁,睡眠100毫秒后再获取

TimeUnit.MILLISECONDS.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

// 释放锁

publicvoidunLock(StringlockKey,longlockValue) {

Long currentLockTimeoutStr = Long.valueOf(redisTemplate.opsForValue().get(lockKey));

// 如果是加锁者 则删除锁 如果不是则等待自动过期 重新竞争加锁

if(currentLockTimeoutStr !=null&& currentLockTimeoutStr == lockValue) {

// 删除键

redisTemplate.delete(lockKey);

}

}

zookeeper分布式锁

原理

1.创建根节点(persistent)

2.创建每当有使用资源的请求到来时, 按序创建一个临时文件节点

3.某进程需要锁,那么按序给锁即可, 不是最小节点需要等待之前的节点释放

代码实现

// 定义对象

importorg.apache.zookeeper.*;

importorg.apache.zookeeper.data.Stat;

importjava.io.IOException;

importjava.util.ArrayList;

importjava.util.Collections;

importjava.util.List;

importjava.util.concurrent.CountDownLatch;

importjava.util.concurrent.TimeUnit;

importjava.util.concurrent.locks.Condition;

importjava.util.concurrent.locks.Lock;

publicclassDistributedLockimplementsLock, Watcher{

// 定义若干参数

privateZooKeeper zk =null;

// 根节点

privateStringROOT_LOCK ="/locks";

// 竞争的资源, 唯一的一个名字

privateStringlockName;

// 等待的前一个锁

privateStringWAIT_LOCK;

// 当前锁

privateStringCURRENT_LOCK;

// 计数器, 用于实现一个线程等待其他若干线程执行完毕再执行

privateCountDownLatch countDownLatch;

// 在sessionTimeout时间内,如果连接断开,zk客户端会主动和服务器建立连接

privateintsessionTimeout =30000;// 会话失效实现30s

privateList exceptionList =newArrayList();

}

// 构造方法

publicDistributedLock(Stringaddress,StringlockName){

this.lockName = lockName;

try{

// address是链接的地址, this是Watcher对象(watcher实现process)

zk =newZooKeeper(address, sessionTimeout,this);

// 参数: path, watcher

Stat stat = zk.exists(ROOT_LOCK,false);

if(stat ==null){

// 如果节点不存在, 创建根节点

// path, data,

zk.create(ROOT_LOCK,newbyte[], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)

}catch (IOException e){

e.printStackTrace();

}catch (InterruptedException e){

e.printStackTrace();

}catch (KeeperException e){

e.printStackTrace();

}

}

}

// 实现Lock和Watcher需要实现的接口

// Watcher实现的接口

publicvoidprocess(WatchedEvent watchedEvent){

if(this.countDownLatch !=null){

// 计数器计数值减一,

this.countDownLatch.countDown();

}

}

// Lock方法

publicvoidlock(){

// 如果存在异常, 抛出异常

if(execeptionList.size() >){

thrownewLockException(exceptionList.get());

}

try{

if(this.tryLock()){

// 尝试获取锁, 获取成功

return;

}else{

// 获取失败, 等待获取锁

waitForLock(WAIT_LOCK, sessionTimeout);

}

}catch(InterruptedException e){

e.printStackTrace();

}catch(KeeperException e){

e.printStackTrace();

}

}

publicvoidlockInterruptibly()throwsInterruptedException{

// 中断情况

this.lock();

}

publicbooleantryLock(){

// 创建锁

try{

StringsplitStr ="_lock_";

if(lockName.contains(splitStr)){

thrownewLockException("锁名有误");

}

// 创建临时有序节点

CURRENT_LOCK = zk.create(ROOT_LOCK +"/"+ lockName + splitStr,newbyte[], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// 取出所有lockName的锁, 当前锁是不是最小的, 是的话返回true, 否则返回false

// 参数: path(根路径), Watcher

List subNodes = zk.getChildren(ROOT_LOCK,false);

// 取出所有lockName的锁

List lockObjects =newArrayList();

for(Stringnode : subNodes){

String_node = node.split(splitStr)[];

if(_node.equals(lockName)){

lockObjects.add(node);

}

}

// 对列表进行排序(有序号顺序)

Collections.sort(lockObjects);

// 若当前节点是最小节点, 则获取锁成功

if(CURRENT_LOCK.equals(ROOT_LOCK +"/"+ lockObjects.get())){

returntrue;

}

// 如果当前节点不是最小节点,找到自己前一个节点

StringprevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") +1);

WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) -1);

}catch(InterruptedException e){

e.printStackTrace();

}catch(KeeperException e){

e.printStackTrace();

}

returnfalse;

}

publicbooleantryLock(longtime, TimeUnit unit)throwsInterruptedException{

// 尝试获取锁, 获取不到等待一段时间获取锁

try{

if(this.tryLock()){

returntrue;

}

returnwaitForLock(WAIT_LOCK, timeout);

}catch(Exception e){

e.printStackTrace();

}

}

publicvoidunlock(){

try{

System.out.println("释放锁"+ CURRENT_LOCK);

//path, version

zk.delete(CURRENT_LOCK, -1);

CURRENT_LOCK =null;

zk.close();

}catch(InterruptedException e){

e.printStackTrace();

}catch(KeeperException e){

e.printStackTrace();

}

}

publicCondition newCondition(){

returnnull;

}

// 其他函数

// 等待锁

privatebooleanwaitForLock(Stringprev,longwaitTime)throwsKeeperException, InterruptedException{

// path, watch

Stat stat = zk.exists(ROOT_LOCK +"/"+ prev,true);

if(stat !=null){

System.out.println(Thread.currentThread().getName() +"等待锁"+ ROOT_LOCK +"/"+ prev);

this.countDownLatch =newCountDownLatch(1);

// 计数等待, 若前一个节点消失, 则process中进行countDown, 获取锁

this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);

this.countDownLatch =null;

}

returntrue;

}

// 锁异常

publicclassLockExceptionextendsRuntimeException{

privatestaticfinallongserialVersionUID =1L;

publicLockException(Stringe){

super(e);

}

publicLockException(Exception e){

super(e);

}

}

业务逻辑函数

publicvoiddistributedWork(){

DistributedLock lock =null;

try{

// 网址是分布式zookeeper集群中其中一台

lock =newDistributedLock("192.168.179.129:2181","LOCK");

lock.lock();

doTask();

}finally{

if(lock !=null){

lock.unlock();

}

}

}

注:本文为物流CTO原创文章,转载请直接在文章下面留言,物流君会最快处理。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180202G1EQY500?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券