专栏首页余林丰11.并发包阻塞队列之LinkedBlockingQueue

11.并发包阻塞队列之LinkedBlockingQueue

jdk1.7.0_79

  在上文《10.并发包阻塞队列之ArrayBlockingQueue》中简要解析了ArrayBlockingQueue部分源码,在本文中同样要介绍的是Java并发包中的阻塞队列LinkedBlockingQueue。ArrayBlockingQueue队列是由数组实现,而LinkedBlockingQueue队列的实现则是链表(单向链表)实现,所以在LinkedBlockingQueue有一个Node内部类来表示链表的节点。

static final class Node<E> { 
  E item;//入队元素 
  Node<E> next;//指向后继节点 
  Node(E x) { 
    item = x; 
  } 
} 

  同样它也有3个构造方法,与ArrayBlockingQueue略有不同。

 1 public LinkedBlockingQueue() { 
 2   this(Integer.MAX_VALUE)//默认构造容量为int型的最大值队列 
 3 } 
 4 public LinkedBlockingQueue(int capacity) { 
 5   if (capacity <= o) throw new IllegalArgumentException(); 
 6   this.capacity = capacity; 
 7   last = head = new Node<E>(null);//头指针和尾指针指向头节点(null) 
 8 } 
 9 public LinkedBlockingQueue(Collection<? extends E> c ) { 
10   this(Integer.MAX_VALUE); 
11   final ReentrantLock putLock = this.putLock; 
12   putLock.lock();//这里和ArrayBlockingQueue也会获取锁,但它同样不是为了互斥操作,同样也是为了保证其可见性。 
13   try { 
14       int n = 0; 
15       for (E e : c) { 
16           if (e == null) 
17               throw new NullPointerException(); 
18           if (n == capacity) 
19               throw new IllegalStateException("Queue full"); 
20           enqueue(new Node<E>(e));//入队 
21           ++n; 
22       } 
23       count.set(n); 
24   } finally { 
25       putLock.unlock(); 
26   } 
27 } 

  在第12行中获取锁是为了保证可见性,这个的原因我认为是,线程T1是实例化LinkedBlockingQueue对象,T2是对实例化的LinkedBlockingQueue对象做入队操作(当然要保证T1和T2的执行顺序),如果不对它进行加锁操作(加锁会保证其可见性,也就是写回主存),T1的集合c有可能只存在T1线程维护的缓存中,并没有写回主存,T2中实例化的LinkedBlockingQueue维护的缓存以及主存中并没有集合c,此时就因为可见性造成数据不一致的情况,引发线程安全问题。

  在了解完LinkedBlockingQueue的构造方法后,我们回过头来看LinkedBlockingQueue的两个成员变量:

private final ReentrantLock takeLock = new ReentrantLock(); 
private final ReentrantLock putLock = new ReentrantLock(); 

  可见LinkedBlockingQueue中有两个锁,一个是为了锁住入队操作,一个是为了锁住出队操作。而在ArrayBlockingQueue中则只有一个锁,同时锁住队列的入队、出队操作。

private final Condition notEmpty = takeLock.newCondition(); 
private final Condition notFull = putLock.newCondition(); 

  这两个成员变量则是线程等待队列,一个是出队锁上的等待队列,一个是入队锁上的等待队列。在ArrayBlockingQueue也有两个等待队列,一个是非空等待队列,另一个则是非满等待队列,在这一点上两者一致。

队列元素的插入

抛出异常

返回值(非阻塞)

一定时间内返回值

返回值(阻塞)

插入

add(e)//队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue

offer(e)//队列未满时,返回true;队列满时返回false。非阻塞立即返回。

offer(e, time, unit)//设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。

put(e)//队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。

  LinkedBlockingQueue中并没有像ArrayBlockingQueue那样重写了AbstractQueue的add方法而直接调用父类的add方法,所以LinkedBlockingQueue#add方法与ArrayBlockingQueue#add一样,都是直接调用其AbstractQueue。

//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 
public boolean add(E e) { 
  if (offer(e))//offer方法由Queue接口定义 
    return true; 
  else 
    throw new IllegalStateException(); 
} 
 1 //LinkedBlockingQueue#offer 
 2 public boolean offer(E e) { 
 3   if (e == null) throw new NullPointerException(); 
 4   final AtomicInteger count = this.count;//原子型int变量,线程安全,指向队列数据量引用 
 5   if (count.get() == capacity) //当数据量等于队列容量时,无法入队,返回false 
 6     return false; 
 7   int c = -1; 
 8   Node<E> node = new Node(e); 
 9   final ReentrantLock putLock = this.putLock;//插入锁 
10   putLock.lock();//获得插入锁 
11   try { 
12     if (count.get() < capacity) { 
13       enqueuer(node);//入队 
14       c = count.getAndIncrement();//队列数据总数自增+1后返回 
15       if (c + 1 < capacity) 
16         notFull.signal();//唤醒非满等待队列上的线程 
17     } 
18   } finally { 
19     putLock.unlock(); 
20   } 
21   if (c == 0) 
22     signalNotEmpty();//队列中刚好有一个数据,唤醒非空等待队列 
23   return c >= 0 
24 } 

  在第10行是获取插入锁,和ArrayBlockingQueue只有一个锁不同的是,LinkedBlockingQueue分为入队锁和出队锁,也就是说对于ArrayBlockingQueue同时只能有一个线程对它进行入队或者出队操作,而对于LinkedBlockingQueue来说同时能有两个线程对队列进行入队或者出队操作。

  前两个add和offer方法都是非阻塞的,对于put方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。

//LinkedBlockingQueue#put 
public void put(E e) throws InterruptedException { 
  if (e == null) throws new NullPointerException(); 
  int c = -1; 
  Node<E> node = new Node(e); 
  final ReentrantLock putLock = this.putLock; 
  final AtomicInteger count = this.count; 
  putLock.lockInterrupted();//能被线程中断地获取锁 
  try { 
    while (count.get() == capacity) {//队列数据量等于队列容量 
      notFull.await();//休眠非满等待队列上的线程 
    } 
    enqueuer(node);//入队 
    c = count.getAndIncrement();//队列数据总数自增+1后返回 
    if (c + 1 < capacity)//还没有达到队列容量 
      notFull.signal();//唤醒非满等待队列上的线程 
  } finally { 
    putLock.unlock(); 
  } 
  if (c == 0) 
  signalNotEmpty();//唤醒非空等待队列上的线程 
} 

  队列插入的最后一个方法来看上面出现的enqueue入队方法。

private void enqueuer(Node<E> node) { 
  last = last.next = node;//将LinkedBlockingQueue中指向队尾的last.next指向新加入的node节点 
} 

队列元素的删除

抛出异常

返回值(非阻塞)

一定时间内返回值

返回值(阻塞)

remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue

poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。

poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值

take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。

//AbstractQueue#remove,同样这也是一个模板方法,定义删除队列元素的算法骨架,具体实现由子类来实现poll方法 
public E remove() {  
  E x = poll();//poll方法由Queue接口定义  
  if (x != null)  
    return x;  
  else  
    throw new NoSuchElementException();  
} 
//LinkedBlockingQueue#poll 
public E poll() { 
  final AtomicInteger count = this.count; 
  if (count.get() == 0)  
    return null; 
  E x = null; 
  int c = -1; 
  final ReentrantLock takeLock = this.takeLock; 
  takeLock.lock();//获取出队锁 
  try { 
    if (count.get() > 0) {//队列不为空 
      x = dequeuer();//出队       c = count.getAndDecrement();//队列数据自减-1返回 
      if ( c > 1)  
        notEmpty.signal();//唤醒非空等待队列上的线程 
    } 
  } finally { 
    takeLock.unlock(); 
  } 
  if (c == capacity) 
    signalNotFull();//唤醒非满等待队列上的线程 
  return x; 
} 

  前两个remove和poll方法都是非阻塞的,对于take方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。

public E take() throws InterruptedException { 
  E x; 
  int c = -1; 
  final AtomicInteger count = this.count; 
  final ReentrantLock takeLock = this.takeLock; 
  take.lockInterruptibly();//可被线程中断返回地获取锁 
  try { 
    while (count.get() == 0) {//队列数据为空 
      notEmpty.await();//休眠非空等待队列上的线程 
    } 
    x = dequeuer();//此时非空等待队列上的线程被唤醒,队列数据不为空,出队 
    c = count.getAndDecrement(); 
  if (c > 1) 
    notEmpty.signal();//唤醒非空等待队列上的线程 
  } finally { 
    takeLock.unlock(); 
  } 
  if (c == capacity) 
    signalNotFull();//唤醒非满等待队列 
  return x; 
} 

  队列出队的最后一个方法来看上面出现的dequeue入队方法。

private E dequeue() { 
  Node<E> h = head;//头节点,为空 
  Node<E> first = h.next; 
  h.next = h;//此时没有节点指向头节点,便于GC 
  head = first; 
  E x = first.item; 
  first.item = null; 
  return x; 
} 

  最后一个方法size。

public int size() { 
  return count.get();//和ArrayBlockingQueue类似,与ConcurrentLinkedQueue不同,没有遍历整个队列,而是直接返回count变量。此处的count是AtomicInteger变量。 
} 

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 6.类似Object监视器方法的Condition接口

      在《1.有关线程、并发的基本概念》中,我们利用synchronized关键字、Queue队列、以及Object监视器方法实现了生产者消费者,介绍了有关线程的...

    用户1148394
  • 12.ThreadPoolExecutor线程池原理及其execute方法

    jdk1.7.0_79   对于线程池大部分人可能会用,也知道为什么用。无非就是任务需要异步执行,再者就是线程需要统一管理起来。对于从线程池中获取线程,大部分...

    用户1148394
  • 1.有关线程、并发的基本概念

    什么是线程?   提到“线程”总免不了要和“进程”做比较,而我认为在Java并发编程中混淆的不是“线程”和“进程”的区别,而是“任务(Task)”。进程是表示资...

    用户1148394
  • Java并发编程,Condition的await和signal等待通知机制

    Object类是Java中所有类的父类, 在线程间实现通信的往往会应用到Object的几个方法: wait(),wait(long timeout),wait(...

    李红
  • Java中常用的七个阻塞队列第二篇DelayQueue源码介绍

    通过前面两篇文章,我们对队列有了了解及已经认识了常用阻塞队列中的三个了。本篇我们继续介绍剩下的几个队列。

    凯哥Java
  • [leetcode栈队列]1 栈实现队列

    因为小蓝当时很想去做短视频的互联网公司,无奈学校当地鲜有相关公司来校招,所以就跋涉去了湖大,当天晚上参加了现场笔试,半夜收到第二天复试通知,二面中面试官就让手撕...

    我是程序员小贱
  • 走出锁的误区 - 正面认识锁

    多线程编程,锁通常是必不可少的保证代码运行安全的工具,一提到锁,最直接想到的是性能问题,给人的印象是锁会影响系统性能。这固然不然。但性能本身并不是锁本身引起的,...

    一见
  • synchronized锁住了什么?

    多线程环境中,对于共享变量的访问一定需要进行正确的管理才能保证代码的正确执行,也就是保证线程安全。而加synchronized关键关键字无疑是个简单的办法,sy...

    naget
  • 第四十三章: 基于SpringBoot & RabbitMQ完成TopicExchange分布式消息消费

    恒宇少年
  • 用vue一个计算属性,实现一个常见表单交互效果

    vue.js是现在用的非常火热的一个前端框架,表单又是网站基本不会缺少的一环。用vue操作表单。表单的操作方式也是多种多样。今天我说的,就是我项目那里做的这一种...

    守候i

扫码关注云+社区

领取腾讯云代金券