Hi~朋友,不点蓝字关注,我们哪来故事?
摘要
1. 如何保证线程安全
Java多线程在对共享资源进行访问时,如果不加以控制会存在线程安全问题,当我们使用多线程对共享资源访问时,通常会线程共享资源的进行访问线程数的控制:
2. JDK并发包中的同步控制
Java提供了大量的工具让我们对共享资源进行多线程访问控制,其中很多人最熟悉的是synchronized关键字,除了该关键字,JDK的并发包中也提供了大量的类来进行同步控制,主要有:
3. JDK包中的锁的特点
3.1 ReentrantLock
ReentrantLock的作用和synchronized的作用基本一致,但是ReentrantLock应该需要手动lock(如果锁已被占用则需要等待)和unlock,并且支持线程中断,支持公平锁(按照请求锁的顺序获得锁),尝试获得锁的时候可以指定最大等待时间(如果不指定时间则不进行等待,拿到锁返回true,否则返回false)。
ReentrantLock搭配Condition的await()以及signal()相当于synchronized搭配Object.wait()以及notify()。
3.2 Semaphore
Semaphore(信号量)允许多个线程同时访问一个共享资源,在构造信号量时必须指定允许的最大线程数,在使用信号量时,我们会尝试获取一个许可,如果获取失败,则需要等待,直到有线程释放许可或者当前线程被中断。
3.3 ReadWriteLock
ReadWriteLock(读写锁)是读写分离的锁,读与读之间不需要阻塞互斥,但读写、写写之间需要阻塞互斥。ReadWriteLock中可以获取一个读锁和一个写锁,通过读写锁可以提高读多写少的程序的并发运行效率。
3.4 CountDownLatch
CountDownLatch是一个倒计数器,在构造CountDownLatch时我们需要传一个数字,通过调用CountDownLatch的await方法,我们可以阻塞当前线程,直至倒计数器减到0,通过CountDownLatch的countdown()方法,我们可以将倒计数器的值减1。
CountDownLatch的作用就好比乘坐飞机,我们必须等所有的旅客检票完成以后,然后飞机才能起飞。
3.4 CyclicBarrier
CyclicBarrier(循环栅栏),循环栅栏的作用类似CountDownLatch,但是他会在将计数器减到0以后重新恢复原始计数器的值,循环的概念也就是这么来的,而且循环栅栏支持在计数器减到0以后触发一个动作(Runnable接口的实现类)。
3.5 LockSupport
LockSupport是一个线程阻塞工具,机制类似于信号量,通过park()可以消费一个许可,如果当前没有许可,线程会被阻塞,unpark()释放一个许可。LockSupport中的许可最多只有一个。
LockSupport不会造成死锁,假设我们的线程先执行了unpark,然后再调用park也不会阻塞(因为unpark已经释放了一个许可可被获得使用)。
Thread.resume()假设先于Thread.suspend()方法先执行,则会导致线程死锁。
4. 为什么要讲AQS
通过上图我们可以看出,前文我们所提到的锁在其内部都有一个Sync类(采用了组合的方式),而这个Sync类都继承自AbstractQueuedSynchronizer类。
4.1 AQS核心数据结构
在AQS中有两个内部类:
4.2 Node
Node的数据结构如下:
除了上述4个属性外,还有一个重要的属性就是:
该属性表示的是节点在队列中的状态,括号中的为状态的int值:
4.3 ConditionObject
由于我们这篇文章我们只专注分析普通的锁,不涉及条件变量等待,所以读者朋友们可自行阅读源码。
5. 排他锁申请
下面这张图是ReentrantLock使用排他锁的流程图:
下面我们分析一下排他锁的使用过程,我们可以以ReentrantLock为切入点,分析一下AQS的实现:
public void lock() {
sync.lock();
}
我们在调用ReentrantLock的lock方法时,将会调用Sync的lock方法(),这里的Sync是一个抽象类,我们这里主要看非公平锁的实现方式,也就是NonfairSync的lock方法,如下:
final void lock() {
// 1
if (compareAndSetState(0, 1))
// 2
setExclusiveOwnerThread(Thread.currentThread());
else
// 3
acquire(1);
}
5.1 acquire
acquire方法是AQS中的一个方法,如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先调用tryAcquire再次尝试获取锁,该方法在AQS中的实现是抛出UnsupportedOperationException异常,也就是说子类必须要实现这个方法,下面我们看一下tryAcquire在ReentrantLockSync中的nonfairTryAcquire(int acquires)方法如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这里的主要逻辑就是先获取一下锁的状态,如果锁的状态是未占有(state值为0),则再次使用CAS尝试去占有锁,如果尝试成功,则返回true。
通过CAS尝试获取锁失败以后,判断锁是否被当前线程持有,如果是,调用setState(nextc)将AQS中的state的值+1,然后返回true(这里解决了重入的问题)。
上述两种情况如果都不成立,则返回false,表示当前线程获取锁失败。
5.2 acquireQueued方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
当我们尝试获取锁失败以后,会调用acquireQueued该线程添加至等待队列,那么这个等待队列中的Node是怎么构建出来的呢?答案就是addWaiter方法。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter方法中首先构建了一个Node节点,Node节点中的线程为当前线程,然后尝试将该节点添加到AQS的等待队列中,如果原来等待队列的尾节点不为NULL的会采用CAS快速失败的方法进行添加,如果成功添加,则返回这个新节点,否则调用enq添加新节点到等待队列中,方法如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 如果队列中还没有节点,CAS初始化一个空的头结点,并把尾节点设置为头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// CAS设置当前节点node为尾节点,并把原来的尾节点的next指向当前节点node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果队列中暂时还没有节点的话,就初始化一个空的头节点,并将尾节点设置为头节点,然后将当前节点node的前一个节点改成tail节点,并通过CAS直至成功将node改为tail节点,并且原先的tail节点的next修改为当前节点note,然后返回当前节点node的前置节点(也就是原来的tail节点)。
当节点添加成功以后,我们会主动调用acquireQueued尝试获取锁,方法如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 开启自旋
for (;;) {
// 获取当前节点node的前继节点
final Node p = node.predecessor();
// 如果前继节点是头结点(虚节点),那么意味着当前节点是第一个数据节点,那么久尝试获取锁,如果获取锁成功,将当前节点设置为头结点,setHead方法会将节点中的线程和prev节点都置为空
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果node的前继节点不是头结点或者获取锁失败,那么需要判断当前节点是否需要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 获取如果发生异常,则取消获取锁申请
if (failed)
cancelAcquire(node);
}
}
如果判断当前节点线程是否需要挂起,主要依靠shouldParkAfterFailedAcquire来判断,如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
通过上述源码,我们可以看到,只有当前节点的前继节点的状态是SINGAL(待唤醒状态)时,当前节点线程可以被挂起(这里主要是为了防止无限循环资源浪费)。上述代码还处理前置节点被取消了特殊情况,将当前节点的前置节点修改为队列中最后一个waitSatus不是取消状态的节点。
将当前节点的线程挂起方法如下,实现采用的LockSupport。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
取消获取所申请的方法是cancelAcquire,如下:
private void cancelAcquire(Node node) {
// 过滤无效节点
if (node == null)
return;
node.thread = null;
// 跳过所有处于取消的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 找到最后一个有效节点的后继节点
Node predNext = pred.next;
// 将当前节点的状态设置为取消
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,设置尾节点为最后一个有效节点
if (node == tail && compareAndSetTail(node, pred)) {
// 将最后一个有效节点的next设置为null
compareAndSetNext(pred, predNext, null);
} else {
// 如果最后一个有效节点不是头结点并且(最后一个有效节点是SINGAL状态或者可以被设置为SGINAL状态),并且最后一个有效节点的线程不为null
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 如果当前节点后的后置节点是有效节点(存在且状态不为取消状态),设置最后一个有效节点的next为当前节点的后继节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 上述条件如果均不成立,唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
6. 排他锁释放
public void unlock() {
sync.release(1);
}
ReentrantLock通过unlock方法来解锁,该方法会调用AQS中的release方法,如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease是由具体的AQS的子类来实现,下面看一下ReentrantLock中的Sync中的实现,如下:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 只有持有锁的线程才可以执行unlock操作
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 解锁成功,设置独占线程为空
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
下面再回到AQS的release方法,如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
如果解锁成功,并且当前节点状态不是初始状态,然后就调用unparkSuccessor方法唤醒head后继节点的线程,方法如下:
private void unparkSuccessor(Node node) {
// 获取当前节点状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取节点的后继节点
Node s = node.next;
// 如果节点的后继节点为null或者已经被取消,则从尾节点找到一个未取消的节点进行唤醒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
解锁的过程还是相对比较简单,当一个线程在释放ReentrantLock的时候,需要从等待队列中唤醒其他需要唤醒的节点。
本期的Java AQS介绍到这,我是shysh95,关注+赞你就是最靓的仔,我们下期再见!