分布式改造剧集2---DIY分布式锁

前言:

​ 好了,终于又开始播放分布式改造剧集了。前面一集中(http://www.cnblogs.com/Kidezyq/p/8748961.html)我们DIY了一个Hessian转发实现,最后我们也留下了一个展望方向:可以实现一个管理界面管理节点,实现简单的服务治理的功能。这一集我们接着继续DIY分布式锁。

第二集:分布式锁DIY

探索之路

​ 由于业务互斥的需要,当前项目中实现了一个内存锁。锁的大致模型是分为锁类型和锁键值,只有当锁类型和键值都相同的时候,整个业务才互斥。但是必须提供一个方法,来判断某种类型的锁是否存在。大致代码如下:

/**
 * 内存锁
 *
 */
public class MemoryLock {
    /**
     * 同步锁
     */
    private final Object lock = new Object();
    
    /**
     * 内存锁模型
     */
    private ConcurrentHashMap<String, ConcurrentHashMap<String, String>> lockMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, String>>();

    /**
     * 尝试获取到锁
     * @param lockType 锁类型
     * @param key       锁键值
     * @return 如果当前获取到锁,则返回true。否则,返回false。
     */
    private boolean tryLock(String lockType, String key) {
        synchronized (this.lock) {
            ConcurrentHashMap<String, String> map = this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<String, String>();
                this.lockMap.put(lockType, map);
            }
            return (map.putIfAbsent(key, key) == null);
        }
    }
    
    /**
     * 判断某种类型的锁是不是空的
     * @param lockType  锁类型
     * @return true,不存在某种类型的锁;false,存在某种类型的锁。
     */
    public boolean isLockTypeEmpty(String lockType) {
        if (null != this.lockMap.get(lockType)) {
            return this.lockMap.get(lockType).size() == 0;
        }
        return true;
    }

    /**
     * 获取锁
     * @param lockType  锁类型
     * @param key       锁键值
     * @param timeout   超时时间(毫秒)
     * @throws TimeoutException 如果超时之后还没有获得到锁,则抛出超时异常
     */
    public void lock(String lockType, String key, long timeout) throws TimeoutException {
        // 是否没有超时设置,当传入的超时时间为负数或者为0时,表示没有超时时间
        boolean noTimeOutFlag = false;
        if (timeout <= 0L) {
            noTimeOutFlag = true;
        }

        long expireTime = System.currentTimeMillis() + timeout;
        do {
            if (tryLock(lockType, key))
                return;
            try {
                Thread.sleep(100L);
            } catch (InterruptedException localInterruptedException) {
            }
        } while ((noTimeOutFlag) || (System.currentTimeMillis() < expireTime));
        
        throw new TimeoutException();
    }

    /**
     * 释放锁
     * @param lockType 锁类型
     * @param key      锁键值
     */
    public void unlock(String lockType, String key) {
        synchronized (this.lock) {
            ConcurrentMap<String, String> map = this.lockMap.get(lockType);
            if (map != null)
                map.remove(key);
        }
    }
}

​ 可以看到,单机模式下的互斥锁是直接在内存中保存一个ConcurrentHashMap,然后利用putIfAbsent的原子特性。该锁的使用方式如下:

try {
    memoryLock.lock(lockType, lockKey, 0l);
} catch(TimeOutException e) {
    // TODO: Exception caught  
} finally {
    memoryLock.unlock(lockType, lockKey);
}

​ 当应用部署在分布式环境中的时候。显然,原来的内存锁已经不适用。那么在分布式情况下,如何实现锁服务呢?网上给出的分布式锁的实现方案一般有三种:

  1. 利用数据库的for update行锁
  2. 利用Redis的setnx
  3. 利用zookeeper的分布式一致性算法

​ 考虑到尽量不增加新的应用部署,那么先排除2、3,只剩下数据库的行级锁。但其实数据库的行级锁在并发量特别大的时候会对数据库性能造成较大影响,而且估计我想使用DBA都不会允许.....

​ 那么,有没有什么其他更好的办法呢?这次我们利用曲线救国的方式来实现,将分布式转变成非分布式。


实现Demo

​ 在分布式改造剧集第一集中,我们的实现方式中有一个主节点,主节点为配置文件中默认配置的Hessian服务的地址。只有加上了Distribute注解的服务,才会在客户端进行Hessian调用的时候进行路由,否则最终调用的Hessian服务地址即为配置文件中配置的主节点。依赖于这个特性,我们可以不给锁服务添加Distribute注解,使得所有分机部署的服务请求都落到主节点上。具体实现步骤如下:

定义一个内存锁Hessian服务

​ 其实简单来说我们直接将原来的MemoryLock发布成Hessian服务,并且不使用Distribute注解就可以实现将分布式锁转换成单机锁。但是还有以下两点需要特殊考虑:

  1. 分布式服务的多机特性: 内存锁的释放必须显示释放,如果一个服务调用unlock方法之前就挂掉,就可能导致某一个锁永远被锁住。所以我们还需要一个类似于Redis分布式锁实现中的锁超时移除机制。
  2. 远程RPC调用的可能超时: 最终锁的服务调用是需要通过Hessian来实现的,考虑到Hessian调用存在超时时间,如果将前面MemoryLocklock方法等待实现在Hessian服务中,那么等待时间超长的话会直接导致Hessian服务调用超时。所以改造后的MemoryLock不实现lock方法,只实现tryLock方法,调用该方法时立即返回当前是否可以获得到锁。
  3. 本地服务实现锁等待以及减少Hessian调用: 如第2点所说,我们的锁等待特性不能在内存锁的Hessian服务中实现,只能通过本地服务中实现。另外频繁的Hessian调用会影响应用程序的性能,也需要一个本地的锁服务来巧妙地减少远程服务调用

​ 改造后的MemoryLock代码如下:

@Service("moemoryLockServiceFacade")
public class MemoryLockServiceImpl implements MemoryLockService {
    
    /**
     * 自动超时时间:当前设置为10分钟 单位为纳秒
     */
    private final static long AUTO_EXPIRE_TIME = 1000000000l * 60 * 10;
    
    /**
     * 锁
     */
    private Object semaphore = new Object();

    /**
     * 内存锁结构,双层Map 首层Map的Key存锁类型,value为内层Map。内层Map额key为锁键值,value为锁的加入时间
     */
    private ConcurrentMap<String, ConcurrentMap<Object, Long>> lockMap = new ConcurrentHashMap<String, ConcurrentMap<Object, Long>>();
    
    /**
     * 守护线程: 用来清理过期内存缓存(如果加锁的客户端由于各种原因没有显示解锁,则可能出现其他服务无法获取锁的情况)
     */
    private Thread daemonThread;
    
    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLockServiceImpl.class);
    
    /**
     * 是否终止守护线程的标识
     */
    private volatile boolean stop = false;
    
    /**
     * 清理失效锁的线程
     *
     */
    private class ClearExpireLockThread extends Thread {
        
        @Override
        public void run() {
            Iterator<Entry<String, ConcurrentMap<Object, Long>>> outerIterator = null;
            Iterator<Entry<Object, Long>> innerIterator = null;
            
            // 清理超过超时时间的锁
            while (!stop) {
                synchronized (semaphore) {
                    long expireNanoTimes = System.nanoTime() - AUTO_EXPIRE_TIME;    // 算出超时时间,小于该时间的缓存都应该被移除
                    outerIterator = lockMap.entrySet().iterator();
                    while (outerIterator.hasNext()) {
                        Entry<String, ConcurrentMap<Object, Long>> entrySet = outerIterator.next();
                        innerIterator = entrySet.getValue().entrySet().iterator();
                        boolean allDeleted = true;  // 是否全部删除的标识,默认设为true
                        while (innerIterator.hasNext()) {
                            Entry<Object, Long> innerEntry = innerIterator.next();
                            if (expireNanoTimes > innerEntry.getValue()) {
                                innerIterator.remove();
                                LOGGER.info("守护线程移除类型为【{}】键值为【{}】的锁......", entrySet.getKey(), innerEntry.getKey());
                            } else {
                                allDeleted = false;
                            }
                        }
                        
                        // 如果所类型下的所有锁都被清除,则锁类型也该被移除
                        if (allDeleted) {
                            outerIterator.remove();
                            LOGGER.info("守护线程移除类型为【{}】的锁......", entrySet.getKey());
                        }
                    }
                }
                
                try {
                    // 如果超时时间为1秒,则等待千分之一秒
                    Thread.sleep(AUTO_EXPIRE_TIME / 1000000000l);
                } catch (InterruptedException e) {
                }
            }
        }
    }
    
    /**
     * 终止守护线程
     */
    @PreDestroy
    public void stopDeamonThread() {
        this.stop = true;
        this.daemonThread.interrupt();
    }
    
    /**
     * 初始化守护线程,用来扫描移除超时的内存锁
     */
    @PostConstruct
    public void initDeamonThread() {
        daemonThread = new ClearExpireLockThread();
        daemonThread.setDaemon(true);
        daemonThread.start();
    }

    @Override
    public boolean tryLock(String lockType, Object key) {
        synchronized (this.semaphore) {
            ConcurrentMap<Object, Long> map = (ConcurrentMap<Object, Long>) this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<Object, Long>();
                this.lockMap.put(lockType, map);
            }
            
            // 这里的value值设置为加锁的初始时间
            return (map.putIfAbsent(key, System.nanoTime()) == null);
        }
    }

    @Override
    public boolean isLockTypeEmpty(String lockType){
        return MapUtils.isEmpty(this.lockMap.get(lockType));
    }
    
    @Override
    public void unlock(String lockType, Object key) {
        synchronized (this.semaphore) {
            ConcurrentMap<Object, Long> map = (ConcurrentMap<Object, Long>) this.lockMap.get(lockType);
            if (map != null) {
                map.remove(key);
                LOGGER.info("手工释放类型为【{}】键值为【{}】的锁......", lockType, key);
            }
        }
    }
}

定义一个分布式锁管理服务实现

​ 定义一个DistributeLock服务,该服务作为本地服务,用来实现锁等待以及减少Hessian锁请求调用。在本地锁服务中注入原来的内存锁Hessian服务实现。具体代码如下:

/**
 * 分布式锁管理类
 *
 */
@Service
public class DistributeLock {
    /**
     * 注入hessian接口的实现类
     */
    @Resource(name="moemoryLockServiceFacade")
    private MemoryLockService memoryLockService;
    
    private Object semaphore = new Object(); 

    /**
     * 内存锁结构,双层Map 首层Map的Key存锁类型,value为内层Map。内层Map额key为锁键值,value为锁住的尝试远程hessian调用获取锁的线程
     */
    private ConcurrentMap<String, ConcurrentMap<String, Thread>> lockMap = new ConcurrentHashMap<String, ConcurrentMap<String, Thread>>();

    /**
     * 判断是否能够获得锁,不阻塞立即返回
     * @param lockType 锁类型
     * @param key  锁的键值
     * @return  true,能够获得锁.false,不能获得锁.
     */
    private boolean tryLock(String lockType, String key) {
        // 提升效率,先内部map判断是否存在锁,如果存在,则直接等待
        synchronized (this.semaphore) {
            ConcurrentMap<String, Thread> map = (ConcurrentMap<String, Thread>) this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<String, Thread>();
                this.lockMap.put(lockType, map);
            }
            Thread t = map.putIfAbsent(key, Thread.currentThread());
            
             // 单个服务只有首先获得本机内存锁的线程才有机会去远程调用hessian服务判断是否有锁
            if (t != null && Thread.currentThread() != t) {
                return false;
            }
        }
        
        return memoryLockService.tryLock(lockType, key);
    }
    
    /**
     * 获得锁,在获得锁之前阻塞
     * @param lockType  锁类型
     * @param key   锁键值
     * @param timeout 超时时间
     * @throws TimeoutException 超时抛出超时异常
     */
    public void lock(String lockType, String key, long timeout) throws TimeoutException {
        // 是否没有超时设置,当传入的超时时间为负数或者为0时,表示没有超时时间
        boolean noTimeOutFlag = false;
        if (timeout <= 0L) {
            noTimeOutFlag = true;
        }

        long expireTime = System.currentTimeMillis() + timeout;
        do {
            if (tryLock(lockType, key))
                return;
            try {
                Thread.sleep(100L);
            } catch (InterruptedException localInterruptedException) {
            }
        } while ((noTimeOutFlag) || (System.currentTimeMillis() < expireTime));
        
        throw new TimeoutException();
    }

    /**
     * 是否指定的锁类型,当前锁的数量为空
     * @param lockType 锁类型
     * @return true,当前锁类型的锁的数量为空;false,当前锁类型的锁锁的数量不为空
     */
    public boolean isLockTypeEmpty(String lockType){
        // 直接内部判断
        if (MapUtils.isNotEmpty(lockMap.get(lockType))) {
            return false;
        }
        
        // 内部判断成功还需远程调用判断
        return memoryLockService.isLockTypeEmpty(lockType);
    }
    
    /**
     * 释放锁
     * @param lockType 锁类型
     * @param key       锁的键值
     */
    public void unlock(String lockType, String key) {
        // 移除本机内存锁模型
        synchronized (this.semaphore) {
            ConcurrentMap<String, Thread> map = (ConcurrentMap<String, Thread>) this.lockMap.get(lockType);
            if (map != null)
                map.remove(key);
        }
        
        // 远程调用释放锁
        memoryLockService.unlock(lockType, key);
    }
}

​ 好了,分布式锁的Demo顺利完成。使用的时候只要将原来的MemoryLock替换成DistributeLock即可。


展望

​ 分布式锁的实现就到这里,其实现的本质在于将分布式转变成非分布式。这里也可以说我是钻了"分布式"的空子?

​ 那么既然分布式锁的最终实现也是通过内存锁实现的,且利用了主节点的特性。那么其实我们在实现分布式锁之后,还有下面两个方向可以优化:

  1. 锁管理: 可以增加一个锁管理页面,来展示当前内存中存在的锁,以及移除需要马上移除的锁
  2. 主节点替换: 当前的分布式锁的实现还是依赖于主节点。考虑到主节点可能也挂掉,需要增加主节点可以动态切换的功能。严格上来讲这个是分布式改造剧集1应该实现的功能

后续

​ 好了,分布式锁的改造暂且到此。可以看到其实分布式其实并没有我们想象的这么复杂,分布式技术也没有特别地遥不可及。面对不断革新的技术,我们应该除了拿来主义之外,多思考,真正了解技术背后的实现原理。就像我一直认为的:相比于用轮子造轮子的能力要重要的多

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java 源码分析

NioEventLoopGroup 源码分析

NioEventLoopGroup 源码分析 1. 在阅读源码时做了一定的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限。为了方便 IDE 查看...

3266
来自专栏Java架构沉思录

优雅实现延时任务之zookeeper篇

在《优雅实现延时任务之Redis篇》一文中提到,实现延时任务的关键点,是要存储任务的描述和任务的执行时间,还要能根据任务执行时间进行排序,那么我们可不可以使用z...

1013
来自专栏Java架构沉思录

分布式ID常见解决方案

在分布式系统中,往往需要对大量的数据如订单、账户进行标识,以一个有意义的有序的序列号来作为全局唯一的ID。

1942
来自专栏情情说

深入浅出MyBatis:MyBatis与Spring集成及实用场景

本篇是「深入浅出MyBatis」系列的最后一篇,主要介绍与Spring的集成,以及工作中的一些实用场景。

3889
来自专栏PHP技术

设计模式之—单例模式(Singleton)的常见应用场景

单例模式(Singleton)也叫单态模式,是设计模式中最为简单的一种模式,甚至有些模式大师都不称其为模式,称其为一种实现技巧,因为设计模式讲究对象之间的关系的...

3386
来自专栏岑玉海

线程池

  学习java很久很久了,得有个5年了,但是从来都没有真正的走进java世界,希望从这篇文章开始,把自己对java的点点滴滴都记录下来。   从java5开始...

2764
来自专栏Java 源码分析

NioEventLoopGroup 源码分析

NioEventLoopGroup 源码分析 1. 在阅读源码时做了一定的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限。为了方便 IDE 查看...

3457
来自专栏Albert陈凯

Scala代码编写中常见的十大陷阱

很多Java开发者在学习Scala语言的时候,往往觉得Scala的语法和用法有些过于复杂,充满语法糖,太“甜”了。在使用Scala编写代码时,由于语法和编写习惯...

2355
来自专栏Python、Flask、Django

Flask 项目系列 -- 基于Flask打造招聘网站(2017-12-07更新)

1093
来自专栏一枝花算不算浪漫

[Java面试七]Mybatis总结以及在面试中的一些问题.

42114

扫码关注云+社区