ReentrantLock
Lock lock = new ReentrantLock();
for (int i = 0; i < 3; i++) {
new Thread(() -> {
lock.lock();
System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now());
sleep(TimeUnit.SECONDS, 2);
lock.unlock();
}).start();
}
---------------------------------执行结果------------------------------------
Thread-0 2020-01-01T17:12:24.678
Thread-1 2020-01-01T17:12:26.681
Thread-2 2020-01-01T17:12:28.681
独占锁
公平锁
在tryAcquire
方法中会先判断队列中有没有阻塞节点,有就加入队列,没有就通过CAS尝试获取锁非公平锁
在tryAcquire
中不管队列中有没有阻塞节点,直接先通过CAS尝试获取锁,获取成功就返回,获取失败就加入阻塞队列CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
sleep(TimeUnit.MILLISECONDS, 80);
System.out.println(Thread.currentThread().getName() + " Finished");
countDownLatch.countDown();
}).start();
new Thread(() -> {
sleep(TimeUnit.MILLISECONDS, 50);
System.out.println(Thread.currentThread().getName() + " Finished");
countDownLatch.countDown();
}).start();
new Thread(() -> {
sleep(TimeUnit.MILLISECONDS, 60);
System.out.println(Thread.currentThread().getName() + " Finished");
countDownLatch.countDown();
}).start();
new Thread(() -> {
sleep(TimeUnit.MILLISECONDS, 60);
System.out.println(Thread.currentThread().getName() + " Finished");
countDownLatch.countDown();
}).start();
countDownLatch.await();
System.out.println("All Finished");
---------------------------------执行结果------------------------------------
Thread-1 Finished
Thread-2 Finished
Thread-3 Finished
All Finished
Thread-0 Finished
共享锁
state
的值CountDownLatch#await
方法的时候,会判断state
的值是否等于0,不等于0就添加到阻塞队列,等于0就直接返回CountDownLatch#countDown
方法的时候,state
减1,判断state
的值是否等于0,等于0就释放因调用CountDownLatch#await
方法而阻塞的线程,不等于0就直接返回CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("阻塞解除之后执行一些逻辑 " + LocalDateTime.now()));
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " doXX");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " doXX");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
sleep(TimeUnit.SECONDS, 2);
}
---------------------------------执行结果------------------------------------
Thread-0 doXX
Thread-1 doXX
阻塞解除之后执行一些逻辑 2020-01-01T14:19:15.735
Thread-2 doXX
Thread-3 doXX
阻塞解除之后执行一些逻辑 2020-01-01T14:19:17.706
Thread-4 doXX
Thread-5 doXX
阻塞解除之后执行一些逻辑 2020-01-01T14:19:19.706
字面意思就是循环壁垒
,使用上与CountDownLatch
类似,不过实现上完全不一样,CyclicBarrier
统计的的是调用了CyclicBarrier#await
方法的线程数,当线程数达到了CyclicBarrier
初始时规定的数目时,所有进入等待状态的线程将被唤醒然后进入下一轮,可以重复使用
ReentrantLock
和Condition
CyclicBarrier#await
之前,需要先执行ReentrantLock#lock
方法,完成之后执行ReentrantLock#unlock
方法,即通过ReentrantLock
保证执行CyclicBarrier#await
方法是安全的CyclicBarrier#await
方法的时候,不满足释放条件(调用CyclicBarrier#await
方法的线程数不等于初始值)时,会调用Condition#await
方法是当前线程阻塞,满足释放条件时会调用Condition#signalAll
唤醒所有阻塞的线程然后进入下一轮CyclicBarrier
中有一个内部类Generation
,该内部类就表示一轮一轮循环的意思,当满足释放条件时,除了唤醒所有因调用CyclicBarrier#await
方法而阻塞的线程,还会生成一个新的Generation
对象,代表下一轮开始信号量
,可以用来控制同时访问资源的线程个数,比如可以用在对线程数的限流,在初始化的时候需要用户传入许可的数量,通过Semaphore#acquire
方法获取一个许可,如果Semaphore
还有许可可获取就直接返回,否则阻塞当前线程
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 4; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "访问资源");
sleep(TimeUnit.MILLISECONDS, 10 + new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放资源");
semaphore.release();
}
}).start();
}
---------------------------------执行结果------------------------------------
Thread-1访问资源
Thread-0访问资源
Thread-0释放资源
Thread-2访问资源
Thread-1释放资源
Thread-3访问资源
Thread-2释放资源
Thread-3释放资源
共享锁
Sync
实现了AbstractQueuedSynchronizer
,Sync
实现类有两个,对应公平锁和非公平锁Semaphore
其实就是一个共享锁
,根据我们对共享锁
的了解,共享锁
表示同一时刻最多有多少个线程持有锁,这和Semaphore
的特性是一致的,所以Semaphore
只需要再封装一层API,调用Semaphore#acquire
方法时候获取锁或阻塞,调用,调用
Semaphore#release`通过自旋释放锁就可以了ReentrantLock
和Condition
优先级
无界
阻塞队列Delayed
接口,Delayed#getDelay
方法返回值<=0
代表该元素延迟时间已到,可以出队了优先级
保证元素插入的顺序,基于Delayed#getDelay
来控制元素延迟的时间,基于Condition
控制在获取元素时是阻塞还是返回(队首元素)ReentrantLock
和2个Condition
,分别为ReentrantLock存取锁
和Condition存取锁
有界
阻塞队列,如果不指定,容量默认为Integer.MAX_VALUE
Node
节点,每个Node
节点有一个next指针尾进头出
,添加元素的时候放到尾部,获取元素的时候放到头部涉及到的方法:LinkedBlockingQueue#put
,LinkedBlockingQueue#offer
,如果队列满了,前者会阻塞,后者不会阻塞
ReentrantLock存锁
Condition存锁
阻塞当前线程,否则添加元素入队
,将当前元素添加到队尾,将上次队位元素的next指针指向该元素,更新last
指向容量==0
,唤醒Condition写锁
涉及到的方法:LinkedBlockingQueue#take
,LinkedBlockingQueue#poll
,如果队列空了,前者会阻塞,后者不会阻塞
ReentrantLock写锁
Condition写锁
阻塞当前线程,否则获取元素出队
,从队首去取出一个元素,更新head
指向容量满了
,唤醒Condition读锁
ReentrantLock
和2个Condition
,两个Condition
分别用于判断是否为空和是否已满有界
阻塞队列,基于final数组
,自然是有界的final数组
,,在初始化ArrayBlockingQueue
的时候需要指定容量涉及到的方法:ArrayBlockingQueue#put
,ArrayBlockingQueue#offer
,如果队列满了,前者会阻塞,后者不会阻塞
ReentrantLock
锁Condition
阻塞当前线程putIndex
用于记录下次添加元素时对应的数组下标,当takeIndex==队列.length
的时候,重置该变量为0Condition
涉及到的方法:ArrayBlockingQueue#take
,ArrayBlockingQueue#poll
,如果队列空了,前者会阻塞,后者不会阻塞
ReentrantLock
锁Condition
阻塞当前线程takeIndex
用于记录下次获取元素时对应的数组下标,当takeIndex==队列.length
的时候,重置该变量为0Condition
进行线程间数据交换的利器 todo
ReentrantLock
和Condition
优先级
无界
阻塞队列,优先级
的实现基于二叉堆
null
,且需要实现Comparable
接口优先级的 无界阻塞队列
,优先级
可以基于自然排序,也可以基于Comparable
接口,取决于你使用哪个构造函数
涉及到的方法: PriorityBlockingQueue#add
PriorityBlockingQueue#offer
ReentrantLock
获取锁size >= queue.length
,如果条件成立就扩容
3.通过Comparable#
和二叉堆
便利找到合适的位置插入元素size
加1PriorityBlockingQueue#take
方法而阻塞的线程(即队列中没有元素的时候),这个通过Condition
实现涉及到的方法: PriorityBlockingQueue#poll
PriorityBlockingQueue#take
ReentrantLock
获取锁PriorityBlockingQueue#poll
方法,在队列中没有元素的时候直接返回null
,不会阻塞当前线程PriorityBlockingQueue#take
方法,在队列中没有元素的时候会阻塞当前线程,知道队列中有元素然后再被唤醒返回,基于Condition
实现涉及到的方法: PriorityBlockingQueue#tryGrow
ReentrantLock#unlock
方法,释放ReentrantLock
锁,为什么要释放这个锁呢?我猜这里是为了提高性能,在扩容之前先释放锁,然后通过一个CAS变量来控制扩容的并发问题,这样在扩容期间就不会接阻塞其他调用线程,比如take
操作,很妙volatile
变量表示,0表示目前不在扩容,1表示正在扩容,每次扩容之前通过CAS将其设置为1,如果CAS失败说明目前有其他线程正在扩容,此时不做处理*2+2
;如果不小于64,则对容量*1.5
Object[]
Thread#yield
让出当前CPUSystem#arraycopy
对行队列赋值,在赋值之前需要先通过ReentrantLock#lock
再次获取锁可以将它看成是一个线程安全的ArrayList
,在涉及到修改操作时,通过ReentrantLock
获取锁,然后复制一个新的数组去修改,基于volatile
语义可以读数据时不会有问题,适用于读多写少的场景,如果写比较多的比较影响性能
ReentrantLock
和volatile
null
例如CopyOnWriteArrayList#get
方法,直接返回数组下标对应的元素即可
例如CopyOnWriteArrayList#set
CopyOnWriteArrayList#add`方法
ReentrantLock
锁内部持有一个CopyOnWriteArrayList
引用,也就是它的实现完全是基于CopyOnWriteArrayList
,那它是如何保证元素不唯一呢?在CopyOnWriteArrayList
中有一个addIfAbsent
方法,该方法会通过遍历的方式去判断你要添加的元素是否存在.
适合读多写少的场景
StringBuilder data = new StringBuilder("data");
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
new Thread((() -> {
// sleep一会,让写锁先持有锁
sleep(TimeUnit.MILLISECONDS, 10);
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "获得读锁 " + LocalDateTime.now());
System.out.println(Thread.currentThread().getName() + data);
readWriteLock.readLock().unlock();
}), "readLock ").start();
new Thread((() -> {
readWriteLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "获得写锁 " + LocalDateTime.now());
sleep(TimeUnit.SECONDS, 3);
data.append("666");
// 锁降级 这里这样子使用降级感觉没有什么意思,那锁降级一般用到什么场景
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + data);
readWriteLock.writeLock().unlock();
}), "writeLock ").start();
public ReentrantReadWriteLock() {
this(false);
}
// 可以在创建ReentrantReadWriteLock时选择公平模式还是非公平模式
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
ReentrantReadWriteLock
时选择使用公平锁还是非公平锁Sync
继承自AbstractQueuedSynchronizer
,它负责实现同步器的模板方法,是实现同步器的关键ReadLock
和WriteLock
实现了Lock
接口,可以将它们看作是API层,具体逻辑委托给Sync
实现,面向用户static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 其实这里就涉及到锁降级,如果当前已经有写锁,返回-1,将它加入到阻塞队列, 否则继续尝试获取读锁
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// readerShouldBlock主要是针对公平锁和非公平锁, `c + SHARED_UNIT`是因为共享锁用的是高16位
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// firstReader是把读锁状态从0变成1的那个线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 锁重入
firstReaderHoldCount++;
} else {
// cachedHoldCounter是上一个获取锁成功的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 上面的if失败了,即通过自旋获取锁
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果 firstReader 是当前线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 并且firstReaderHoldCount == 1,说明释放锁之后需要重置firstReader
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
state==0
,需要唤醒阻塞的写锁protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// state != 0 && w==0 说明有读锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire 锁重入
setState(c + acquires);
return true;
}
// writerShouldBlock用于确定公平和非公平模式
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
判断是又读锁,判断是否锁重入,通过CAS设置状态
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
和正常的独占锁释放一样