专栏首页Java编程技术高并发编程必备基础(下)

高并发编程必备基础(下)

十三、 AbstractQueuedSynchronizer介绍

AbstractQueuedSynchronizer提供了一个队列,大多数开发者可能从来不会直接用到AQS,AQS中刮泥这个一个单一的状态信息 state,可以通过protected的getState,setState,compareAndSetState函数进行调用。对于ReentrantLock来说,state可以用来表示该线程获可重入锁的次数,semaphore来说state用来表示当前可用信号的个数,FutuerTask用来表示任务状态(例如还没开始,运行,完成,取消)。

十四、CountDownLatch原理

14.1 一个例子

public class Test {

    private static final int ThreadNum = 10;

    public static void main(String[] args)  {

        //创建一个CountDownLatch实例,管理计数为ThreadNum
        CountDownLatch countDownLatch = new CountDownLatch(ThreadNum);

        //创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(ThreadNum);

        //添加线程到线程池
        for(int i =0;i<ThreadNum;++i){
            executor.execute(new Person(countDownLatch, i+1));
        }
        
        System.out.println("开始等待全员签到...");

        try {
            //等待所有线程执行完毕
            countDownLatch.await();
            System.out.println("签到完毕,开始吃饭");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            executor.shutdown();
        }
        
    }

    static class Person implements Runnable{

        private CountDownLatch countDownLatch;
        private int index;
        
        public Person(CountDownLatch cdl,int index){
            this.countDownLatch = cdl;
            this.index = index;
        }
        
        @Override
        public void run() {
            
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("person " + index +"签到");
            
            //线程执行完毕,计数器减一
            countDownLatch.countDown();
            
        }
        
    }
}

如上代码,创建一个线程池和CountDownLatch实例,每个线程通过构造函数传入CountDownLatch的实例,主线程通过await等待线程池里面线程任务全部执行完毕,子线程则执行完毕后调用countDown计数器减一,等所有子线程执行完毕后,主线程的await才会返回。

14.2 原理

先看下类图:

image.png

可知CountDownLatch内部还是使用AQS实现的。 首先通过构造函数初始化AQS的状态值

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
        Sync(int count) {
            setState(count);
        }

然后看下await方法:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果线程被中断则抛异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试看当前是否计数值为0,为0则直接返回,否者进入队列等待
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

 protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

如果tryAcquireShared返回-1则 进入doAcquireSharedInterruptibly

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
       //加入队列状态为共享节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                       //如果多个线程调用了await被放入队列则一个个返回。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //shouldParkAfterFailedAcquire会把当前节点状态变为SIGNAL类型,然后调用park方法把当先线程挂起,
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

调用await后,当前线程会被阻塞主,知道所有子线程调用了countdown方法,并在在计数为0时候调用该线程unpark方法激活线程,然后该线程重新tryAcquireShared会返回1。

然后看下 countDown方法:

委托给sync
    public void countDown() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

首先看下tryReleaseShared

        protected boolean tryReleaseShared(int releases) {
            //循环进行cas,直到当前线程成功完成cas使计数值(状态值state)减一更新到state
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

该函数一直返回false直到当前计数器为0时候才返回true。 返回true后会调用doReleaseShared,该函数主要作用是调用uppark方法激活调用await的线程,代码如下:

private void doReleaseShared() {
   
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //节点类型为SIGNAL,把类型在通过cas设置回去,然后调用unpark激活调用await的线程
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

激活主线程后,主线程会在调用tryAcquireShared获取锁。

十五、ReentrantLock独占锁原理

15.1 ReentrantLock结构

先上类图:

image.png

可知ReentrantLock最终还是使用AQS来实现,并且根据参数决定内部是公平还是非公平锁,默认是非公平锁

 public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

加锁代码:

public void lock() {
        sync.lock();
    }

15.2 公平锁原理

先看Lock方法: lock方法最终调用FairSync重写的tryAcquire方法

        protected final boolean tryAcquire(int acquires) {
            //获取当前线程和状态值
            final Thread current = Thread.currentThread();
            int c = getState();
           //状态为0说明该锁未被任何线程持有
            if (c == 0) {
             //为了实现公平,首先看队列里面是否有节点,有的话再看节点所属线程是不是当前线程,是的话hasQueuedPredecessors返回false,然后使用原子操作compareAndSetState保证一个线程更新状态为1,设置排他锁归属与当前线程。其他线程通过cass则返回false.
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
//状态不为0说明该锁已经被线程持有,则看是否是当前线程持有,是则重入锁次数+1.
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
          
                setState(nextc);
                return true;
            }
            return false;
        }
    }

公平性保证代码:

    public final boolean hasQueuedPredecessors() {
    
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

再看看unLock方法,最终调用了Sync的tryRelease方法:

        protected final boolean tryRelease(int releases) {
           //如果不是锁持有者调用UNlock则抛出异常。
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
           //如果当前可重入次数为0,则清空锁持有线程
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //设置可重入次数为原始值-1
            setState(c);
            return free;
        }

15.3 非公平锁原理

        final void lock() {

           //如果当前锁空闲0,则设置状态为1,并且设置当前线程为锁持有者
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);//调用重写的tryAcquire方法-》nonfairTryAcquire方法
        }
 final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {//状态为0说明没有线程持有该锁
                if (compareAndSetState(0, acquires)) {//cass原子性操作,保证只有一个线程可以设置状态
                    setExclusiveOwnerThread(current);//设置锁所有者
                    return true;
                }
            }//如果当前线程是锁持有者则可重入锁计数+1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

15.3 总结

可知公平与非公平都是先执行tryAcquire尝试获取锁,如果成功则直接获取锁,如果不成功则把当前线程放入队列。对于放入队列里面的第一个线程A在unpark后会进行自旋调用tryAcquire尝试获取锁,假如这时候有一个线程B执行了lock操作,那么也会调用tryAcquire方法尝试获取锁,但是线程B并不在队列里面,但是线程B有可能比线程A优先获取到锁,也就是说虽然线程A先请求的锁,但是却有可能没有B先获取锁,这是非公平锁实现。而公平锁要保证线程A要比线程B先获取锁。所以公平锁相比非公平锁在tryAcquire里面添加了hasQueuedPredecessors方法用来保证公平性。

十六、ReentrantReadWriteLock原理

image.png

如图读写锁内部维护了一个ReadLock和WriteLock,并且也提供了公平和非公平的实现,下面只介绍下非公平的读写锁实现。我们知道AQS里面只维护了一个state状态,而ReentrantReadWriteLock则需要维护读状态和写状态,一个state是无法表示写和读状态的。所以ReentrantReadWriteLock使用state的高16位表示读状态也就是读线程的个数,低16位表示写锁可重入量。

static final int SHARED_SHIFT   = 16;

共享锁(读锁)状态单位值65536 
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
共享锁线程最大个数65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

排它锁(写锁)掩码 二进制 15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 返回读锁线程数  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** 返回写锁可重入个数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

16.1 WriteLock

  • lock 获取锁 对应写锁只需要分析下Sync的tryAcquire和tryRelease
protected final boolean tryAcquire(int acquires) {
        
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            //c!=0说明读锁或者写锁已经被某线程获取
            if (c != 0) {
                //w=0说明已经有线程获取了读锁或者w!=0并且当前线程不是写锁拥有者,则返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
               //说明某线程获取了写锁,判断可重入个数
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
               
               // 设置可重入数量(1)
                setState(c + acquires);
                return true;
            }

           //第一个写线程获取写锁
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }
  • unlock 释放锁
        protected final boolean tryRelease(int releases) {
     // 看是否是写锁拥有者调用的unlock
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
//获取可重入值,这里没有考虑高16位,因为写锁时候读锁状态值肯定为0
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
       //如果写锁可重入值为0则释放锁,否者只是简单更新状态值。
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

16.2 ReadLock

对应读锁只需要分析下Sync的tryAcquireShared和tryReleaseShared

  • lock 获取锁
protected final int tryAcquireShared(int unused) {
   
   //获取当前状态值
    Thread current = Thread.currentThread();
    int c = getState();

    //如果写锁计数不为0说明已经有线程获取了写锁,然后看是不是当前线程获取的写锁。
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;

    //获取读锁计数
    int r = sharedCount(c);
    //尝试获取锁,多个读线程只有一个会成功,不成功的进入下面fullTryAcquireShared进行重试
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != current.getId())
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
  • unlock 释放锁
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }

    //循环直到自己的读计数-1 cas更新成功
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
           
            return nextc == 0;
    }
}

十七、参考

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 高并发编程必备基础(上)

    借用Java并发编程实践中的话"编写正确的程序并不容易,而编写正常的并发程序就更难了",相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没...

    加多
  • 使用线程池时候当程序结束时候记得调用shutdown关闭线程池

    日常开发中为了便于线程的有效复用,线程池是经常会被用的工具,然而线程池使用完后如果不调用shutdown会导致线程池资源一直不会被释放。下面通过简单例子来说明该...

    加多
  • java中守护线程与用户线程

    Java线程分为两类分别为daemon线程(守护线程)和User线程(用户线程),在JVM启动时候会调用main函数,main函数所在的线程是一个用户线程,这个...

    加多
  • Android线程池的详细说明(二)

    Oceanlong
  • CountDownLatch解析

    CountDownLatch是JUC包下的一个工具类,允许一个或多个线程等待,直到其他线程中执行的一个放行操作完后,等待线程才会继续往下执行的同步辅助。

    Liusy
  • 细说线程池---高级篇

    上一篇中已经讲了线程池的原理。这一次来说说源码执行过程。建议先看看细说线程池---入门篇 细说线程池---中级篇

    用户4143945
  • Fork/Join框架原理和使用探秘 顶

    Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。也是当前...

    算法之名
  • 成为高级程序员不得不了解的并发

    到目前为止,你学到的都是顺序编程,顺序编程的概念就是某一时刻只有一个任务在执行,顺序编程固然能够解决很多问题,但是对于某种任务,如果能够并发的执行程序中重要的部...

    JAVA葵花宝典
  • Java并发编程:线程控制

    在上一篇文章中(Java并发编程:线程的基本状态)我们介绍了线程状态的 5 种基本状态以及线程的声明周期。这篇文章将深入讲解Java如何对线程进行状态控制,比如...

    陈树义
  • ThreadPoolExecutor源码分析

    ThreadPoolExecutor继承AbstractExecutorService,层级实现了ExecutorService,ExecutorService...

    冰枫

扫码关注云+社区

领取腾讯云代金券