前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >java基于AbstractQueuedSynchronizer实现资源共享锁,限制并发线程数目

java基于AbstractQueuedSynchronizer实现资源共享锁,限制并发线程数目

作者头像
10km
发布2022-05-07 09:55:25
2540
发布2022-05-07 09:55:25
举报
文章被收录于专栏:10km的专栏10km的专栏

学过java的童鞋都知道,如果你要保证一个资源一个方法只允许互斥访问,那你可以使用synchronized关键字最简单了,它能保证,一段代码,一个方法或一个对象只能同时被一个线程使用,如果线程1在使用中的情况下,其他的线程2~N都会被阻塞,直到线程1执行完synchronized块结束释放该资源。 关于synchronized的用法不这是本文的重点,就不展开说了。 如果要实现同样的互斥功能,还有一个更好的办法,就是用ReentrantLock(重入锁),它是一种递归无阻塞的同步机制,关于它与synchronized相比的好处和差别,参考下面的两个blog,讲得很透彻:

再谈重入锁–ReentrantLock Java中的ReentrantLock和synchronized两种锁定机制的对比

synchronized和ReentrantLock都是实现互斥访问的。也就是说,同一时刻,一个资源只能被一个线程使用,如果要实现n(n>2)个资源的共享访问,synchronized和ReentrantLock都是不能使用的,该怎么办?

什么情况下会有这样的需求呢?就举一个我项目中的例子吧:

我们这个项目是一个基于云计算的人脸识别系统,客户端发送照片到服务器,由服务器进行处理图像数据,调用核心的人脸检测和建模算法完成人脸检测和人脸特征码提取(也称人脸建模)。这个人脸检测和人脸建模是基于图像处理技术的非常复杂的数学运算,需要消耗大量的内存和CPU资源。(一副高清jpeg彩色图像解码到内存就需要十几MB乃至几十MB的内存) 那么问题来了,如果服务器端不限制同时执行人脸检测建模方法的线程数,当同时一间大量客户端向服务器提交建模请求的时间,服务器就会因为瞬间内存不足而崩溃。 所以即使服务器端的CPU和内存资源再丰富,也要对同时执行人脸检测/建模的线程数进行限制。

从这个需求可以看出,synchronized和ReentrantLock都不适合这个应用场景,我们需要的是能控制一定数目的线程共享访问一个资源,而不是独占式访问。 java.util.concurrent.locks下的其他锁(ReadWriteLock,ReentrantReadWriteLock)也都不适合。我当时还纳闷儿,为啥对于这么普遍的需求,java没有提供直接的类呢? 然后进一步在网上查资料,我看到了AbstractQueuedSynchronizer,这个类是前面所说的ReentrantLock,ReadWriteLock,ReentrantReadWriteLock等同步锁的实现基础。

关于AbstractQueuedSynchronizer的介绍,网上可以找到很多详细的说明,比如 AbstractQueuedSynchronizer的介绍和原理分析

通过上面文章对AbstractQueuedSynchronizer的介绍,可以知道,AbstractQueuedSynchronizer本身不仅提供了互斥访问机制,同时也提供了共享访问机制。见下表中列出的AbstractQueuedSynchronizer的部分方法:

方法名称

描述

protected boolean tryAcquire(int arg)

排它的获取这个状态。这个方法的实现需要查询当前状态是否允许获取,然后再进行获取(使用compareAndSetState来做)状态。

protected boolean tryRelease(int arg)

释放状态

protected int tryAcquireShared(int arg)

共享的模式下获取状态。

protected boolean tryReleaseShared(int arg)

共享的模式下释放状态

protected boolean isHeldExclusively()

在排它模式下,状态是否被占用。

那么利用AbstractQueuedSynchronizer提供的共享访问机制,再参照ReentrantLock的实现方式,我们不用写很多代码就可以定制一个满足实际需求的资源共享锁:

代码语言:javascript
复制
/**   
 * @Title: ShareLock.java 
 * @Description: TODO 
 * @author guyadong   
 * @date 2015年6月10日 上午9:00:50 
 * @version V1.0   
 */

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 共享锁<br>
 * 实现固定数目 {@link #maxShareCount} 的资源共享锁,限制并发线程数目.<br>
 * 同一个线程内嵌套加锁解锁,不会重复计数
 * @see {@link Lock }
 * @see {@link AbstractQueuedSynchronizer }
 * @author guyadong
 *
 */
public class ShareLock implements Lock {
//从 AbstractQueuedSynchronizer 继承的Sync 这里是重点
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -3340303865224708218L;
        /**
         * 线程加锁计数
         */
        private final ThreadLocal<Integer> threadLockCount=new ThreadLocal<Integer>();
        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("maxShareCount must large than zero.");
            }
            setState(count);
        }
        @Override
        //实现共享机制的请求方法
        public int tryAcquireShared(int acquireCount) {
            Integer tlc=threadLockCount.get();
            if (null == tlc) {
                for (;;) {
                    int current = getState();
                    int newCount = current - acquireCount;
                    if (newCount < 0) {
                        return newCount;
                    }else if (compareAndSetState(current, newCount)) {
                        threadLockCount.set(acquireCount);
                        return newCount;
                    }
                }
            }else{//同一线程内重复加锁时,直接将计数器累加
                tlc+=acquireCount;
                return getState();
            }
        }
        @Override
        //实现共享机制的释放方法
        public boolean tryReleaseShared(int releaseCount) {
            Integer tlc = threadLockCount.get();
            if(null == tlc || tlc <= 0)
                throw new IllegalStateException("Error threadLockCount");
            if ((tlc -= releaseCount) > 0) {
                return true;
            } else {
                if(tlc!=0)
                    throw new IllegalStateException("Error threadLockCount");
                for (;;) {
                    int current = getState();
                    int newCount = current + releaseCount;
                    if (compareAndSetState(current, newCount)) {
                        threadLockCount.set(null);
                        return true;
                    }
                }
            }
        }
    }
    /**
     * 可用资源计数
     */
    private final int maxShareCount;
    /**
     * 同步对象
     */
    private final Sync sync;
    /**
     * @param maxShareCount 最大可用资源计数
     */
    public ShareLock(int maxConcurrent) {
        this.maxShareCount = maxConcurrent;
        this.sync = new Sync(this.maxShareCount);
    }
    @Override
    public void lock() {
        sync.acquireShared(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    @Override
    public Condition newCondition() {
        //不支持该方法
        throw new UnsupportedOperationException();
    }
    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }
    /**
     * @return maxShareCount
     */
    public int getMaxShareCount() {
        return maxShareCount;
    }
}

下面是ShareLock的使用方式的伪代码

代码语言:javascript
复制
//初始化共享资源计数为CPU的核心数
private final static ShareLock concurrentLock = new ShareLock(Runtime.getRuntime().availableProcessors());
getCodeInfo{
    concurrentLock.lock();
    try {
        //....do something 共享模式执行
    }finally{
    concurrentLock.unlock();
    }
}

用法很简单吧?呵呵

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2015-09-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
人脸识别
腾讯云神图·人脸识别(Face Recognition)基于腾讯优图强大的面部分析技术,提供包括人脸检测与分析、比对、搜索、验证、五官定位、活体检测等多种功能,为开发者和企业提供高性能高可用的人脸识别服务。 可应用于在线娱乐、在线身份认证等多种应用场景,充分满足各行业客户的人脸属性识别及用户身份确认等需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档