前面我们分析了ArrayBlockingQueue,今天我们接着来对LinkedBlockingQueue的源码进行解析。本文首先会对LinkedBlockingQueue的源码进行解析,接着会介绍ArrayBlockingQueue和LinkedBlockingQueue的区别。
LinkedBlockingQueue是一个基于链表(单链表)实现的先进先出的阻塞队列。队列中存在最久的节点是head节点。head节点永远存在,且是一个dummy节点(空节点)。LinkedBlockingQueue默认是无界的,使用上可能会有内存溢出的风险。
//链表的节点
static class Node<E> {
//存放的数据
E item;
//后继指针,指向下一个节点
Node<E> next;
Node(E x) { item = x; }
}
//队列的容量,默认容量是Integer.MAX_VALUE
private final int capacity;
//队列里元素的个数
private final AtomicInteger count = new AtomicInteger();
//队列的头节点,head.item==null
transient Node<E> head;
//队列的尾节点,last.next==null
private transient Node<E> last;
//出队take、poll使用的锁
private final ReentrantLock takeLock = new ReentrantLock();
//非空条件变量,队满时wait
private final Condition notEmpty = takeLock.newCondition();
//入队put、offer时使用的锁
private final ReentrantLock putLock = new ReentrantLock();
//非满条件变量,队空时wait
private final Condition notFull = putLock.newCondition();
如上全局变量,跟ArrayBlockingQueue的区别是ArrayBlockingQueue内部是定义了一个数组,然后出队和入队用的是同一把锁。而LinkedBlockingQueue内部定义了一个链表的结构,入队用的是takeLock锁,出队用的是putLock锁。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
如上构造器LinkedBlockingQueue队列默认是无界的,所以,在使用LinkedBlockingQueue一定要传入容量,不然会有内存溢出的风险。
入队的put方法,主要的流程是判断队列是否满了,满了则线程等待,未满的话则插入元素,并且count加1。最后唤醒消费者线程进行消费。
public void put(E e) throws InterruptedException {
//如果元素e为空直接抛出NullPointerException
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
//定义一个节点存元素e
Node<E> node = new Node<E>(e);
//获取putLock
final ReentrantLock putLock = this.putLock;
//获取cout值
final AtomicInteger count = this.count;
//响应中断的加锁
putLock.lockInterruptibly();
try {
//如果当前队列中的元素个数等于传入的容量capacity,则当前线程进入非满等待队列
while (count.get() == capacity) {
notFull.await();
}
//入队操作
enqueue(node);
//count加1并返回原值
c = count.getAndIncrement();
//如果count+1之后的值小于capacity,则唤醒生产者进行生产
if (c + 1 < capacity)
notFull.signal();
} finally {
//解锁
putLock.unlock();
}
// c == 0 说明 原来queue是空的, 所以这里 signalNotEmpty 一下, 唤醒正在 poll/take 等待中的线程
if (c == 0)
signalNotEmpty();
}
//节点入队方法,因为这里有个dummy节点,不需要判空
private void enqueue(Node<E> node) {
//尾插法,将新增的节点添加到元素的尾部
// assert last.next == null;
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
方法signalNotEmpty()
这时在原来queue的数量c==0时对此时在调用take/poll方法的线程进行唤醒。
说完了入队方法,接下来,我们来看看出队方法。首先,我们从take方法开始说起。队列的出队都是从头节点开始出队第一个节点。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//加锁
takeLock.lockInterruptibly();
try {
//如果队列里的元素为0,则消费者线程进入noEmpty等待队列。
while (count.get() == 0) {
notEmpty.await();
}
//调用dequeue()方法进行出队
x = dequeue();
//元素c-1,并返回原值
c = count.getAndDecrement();
//如果c>1,说明队列还有元素,则唤醒消费者线程
if (c > 1)
notEmpty.signal();
} finally {
//解锁
takeLock.unlock();
}
// c == capacity 说明一开始 queue 是满的, 调用 signalNotFull 进行唤醒一下 put/offer 的线程
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull(){
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
}finally {
putLock.unlock();
}
}
private E dequeue(){
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head; // 这里的 head 是一个 dummy 节点
Node<E> first = h.next; // 获取真正的节点
h.next = h; // help GC
head = first; // 重新赋值 head
E x = first.item; // 获取 dequeue 的值
first.item = null; // 将 item 置 空
return x;
}
操作过程:将head.next值取出,然后将head.next赋值为新的head。需要注意的两点是:
c>1
则说明队列里还有元素,则调用notEmpty.signal()
唤醒消费者线程poll 与 take 都是获取头节点的元素, 唯一的区别是 take在queue为空时进行await, poll 则直接返回
public E poll(long timeout, TimeUnit unit) throws InterruptedException{
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout); // 计算超时时间
final AtomicInteger count = this.count; // 获取 queue 的容量
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); // 获取 lock
try{
while(count.get() == 0){ // queue 为空, 进行 await
if(nanos <= 0){ // timeout 用光了, 直接 return null
return null;
}
nanos = notEmpty.awaitNanos(nanos); // 调用 condition 进行 await, 在 timeout之内进行 signal -> nanos> 0
}
x = dequeue(); // 节点出queue
c = count.getAndDecrement(); // 计算器减一
if(c > 1){ // c > 1 说明 poll 后 容器内还有元素, 进行 换新 await 的线程
notEmpty.signal();
}
}finally {
takeLock.unlock(); // 释放锁
}
if(c == capacity){ // c == capacity 说明一开始 queue 是满的, 调用 signalNotFull 进行唤醒一下 put/offer 的线程
signalNotFull();
}
return x;
}
LinkedBlockingQueue是一个基于链表实现的阻塞queue,它的性能好于ArrayBlockingQueue,但是差于ConcurrentLinkedQueue;并且它非常适于生产者消费者的环境中,比Executors.newFixedThreadPool()就是基于这个队列的,使用LinkedBlockingQueue时一定要设置容量,不然会有内存溢出的风险。
LinkedBlockingQueue 源码分析 (基于Java 8) LinkedBlockingQueue源码分析(JDK8)