前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >LinkedBlockingQueue源码解析

LinkedBlockingQueue源码解析

作者头像
码农飞哥
发布2021-08-18 10:38:30
3540
发布2021-08-18 10:38:30
举报
文章被收录于专栏:好好学习

前言

前面我们分析了ArrayBlockingQueue,今天我们接着来对LinkedBlockingQueue的源码进行解析。本文首先会对LinkedBlockingQueue的源码进行解析,接着会介绍ArrayBlockingQueue和LinkedBlockingQueue的区别。

LinkedBlockingQueue的定义

LinkedBlockingQueue是一个基于链表(单链表)实现的先进先出的阻塞队列。队列中存在最久的节点是head节点。head节点永远存在,且是一个dummy节点(空节点)。LinkedBlockingQueue默认是无界的,使用上可能会有内存溢出的风险。

LinkedBlockingQueue的全局变量

代码语言:javascript
复制
        //链表的节点
        static class Node<E> {
        //存放的数据
        E item;
        //后继指针,指向下一个节点
        Node<E> next;

        Node(E x) { item = x; }
    }
    //队列的容量,默认容量是Integer.MAX_VALUE
     private final int capacity;
    //队列里元素的个数
    private final AtomicInteger count = new AtomicInteger();
    //队列的头节点,head.item==null
    transient Node<E> head;
    //队列的尾节点,last.next==null
     private transient Node<E> last;
     //出队take、poll使用的锁
     private final ReentrantLock takeLock = new ReentrantLock();
    //非空条件变量,队满时wait
     private final Condition notEmpty = takeLock.newCondition();
    //入队put、offer时使用的锁
    private final ReentrantLock putLock = new ReentrantLock();
    //非满条件变量,队空时wait
    private final Condition notFull = putLock.newCondition();

如上全局变量,跟ArrayBlockingQueue的区别是ArrayBlockingQueue内部是定义了一个数组,然后出队和入队用的是同一把锁。而LinkedBlockingQueue内部定义了一个链表的结构,入队用的是takeLock锁,出队用的是putLock锁。

LinkedBlockingQueue的构造器

代码语言:javascript
复制
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

如上构造器LinkedBlockingQueue队列默认是无界的,所以,在使用LinkedBlockingQueue一定要传入容量,不然会有内存溢出的风险。

入队方法

入队的put方法,主要的流程是判断队列是否满了,满了则线程等待,未满的话则插入元素,并且count加1。最后唤醒消费者线程进行消费。

put方法
代码语言:javascript
复制
 public void put(E e) throws InterruptedException {
         //如果元素e为空直接抛出NullPointerException
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        //定义一个节点存元素e
        Node<E> node = new Node<E>(e);
        //获取putLock
        final ReentrantLock putLock = this.putLock;
        //获取cout值
        final AtomicInteger count = this.count;
        //响应中断的加锁
        putLock.lockInterruptibly();
        try {
            //如果当前队列中的元素个数等于传入的容量capacity,则当前线程进入非满等待队列
            while (count.get() == capacity) {
                notFull.await();
            }
            //入队操作
            enqueue(node);
            //count加1并返回原值
            c = count.getAndIncrement();
            //如果count+1之后的值小于capacity,则唤醒生产者进行生产
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //解锁
            putLock.unlock();
        }
        // c == 0 说明 原来queue是空的, 所以这里 signalNotEmpty 一下, 唤醒正在 poll/take 等待中的线程
        if (c == 0)
            signalNotEmpty();
    }
enqueue方法
代码语言:javascript
复制
//节点入队方法,因为这里有个dummy节点,不需要判空
 private void enqueue(Node<E> node) {
        //尾插法,将新增的节点添加到元素的尾部
        // assert last.next == null;
        last = last.next = node;
    }
signalNotEmpty方法
代码语言:javascript
复制
  private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

方法signalNotEmpty()这时在原来queue的数量c==0时对此时在调用take/poll方法的线程进行唤醒。

出队方法

说完了入队方法,接下来,我们来看看出队方法。首先,我们从take方法开始说起。队列的出队都是从头节点开始出队第一个节点。

代码语言:javascript
复制
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //加锁
        takeLock.lockInterruptibly();
        try {
            //如果队列里的元素为0,则消费者线程进入noEmpty等待队列。
            while (count.get() == 0) {
                notEmpty.await();
            }
            //调用dequeue()方法进行出队
            x = dequeue();
            //元素c-1,并返回原值
            c = count.getAndDecrement();
            //如果c>1,说明队列还有元素,则唤醒消费者线程
            if (c > 1)
                notEmpty.signal();
        } finally {
        //解锁
            takeLock.unlock();
        }
        // c == capacity 说明一开始 queue 是满的, 调用 signalNotFull 进行唤醒一下 put/offer 的线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private void signalNotFull(){
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    }finally {
        putLock.unlock();
    }
}
dequeue方法
代码语言:javascript
复制
private E dequeue(){
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;       // 这里的 head 是一个 dummy 节点
    Node<E> first = h.next; // 获取真正的节点
    h.next = h;             // help GC
    head = first;           // 重新赋值 head
    E x = first.item;       // 获取 dequeue 的值
    first.item = null;      // 将 item 置 空
    return x;
}

操作过程:将head.next值取出,然后将head.next赋值为新的head。需要注意的两点是:

  1. 出队完成之后如果c&gt;1则说明队列里还有元素,则调用notEmpty.signal()唤醒消费者线程
  2. 如果c == capacity 说明一开始 queue 是满的, 调用 signalNotFull 进行唤醒一下 put/offer 的线程。
获取队列头元素poll方法

poll 与 take 都是获取头节点的元素, 唯一的区别是 take在queue为空时进行await, poll 则直接返回

代码语言:javascript
复制
public E poll(long timeout, TimeUnit unit) throws InterruptedException{
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);             //  计算超时时间
    final AtomicInteger count = this.count;       // 获取 queue 的容量
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();                   // 获取 lock
    try{
        while(count.get() == 0){                   // queue 为空, 进行 await
            if(nanos <= 0){                        // timeout 用光了, 直接 return null
                return null;
            }
            nanos = notEmpty.awaitNanos(nanos);   // 调用 condition 进行 await, 在 timeout之内进行 signal -> nanos> 0
        }
        x = dequeue();                             // 节点出queue
        c = count.getAndDecrement();               // 计算器减一
        if(c > 1){                                 // c > 1 说明 poll 后 容器内还有元素, 进行 换新 await 的线程
            notEmpty.signal();
        }
    }finally {
        takeLock.unlock();                         // 释放锁
    }
    if(c == capacity){                           // c == capacity 说明一开始 queue 是满的, 调用 signalNotFull 进行唤醒一下 put/offer 的线程
        signalNotFull();
    }
    return x;
}

ArrayBlockingQueue和LinkedBlockingQueue的区别

  1. ArrayBlockingQueue是有界的,而LinkedBlockingQueue默认是无界的。 在使用LinkedBlockingQueue无界情况下时要考虑内存实际使用问题,防止内存溢出问题的发生。 2.锁使用的比较,ArrayBlockingQueue内部使用1个锁来控制队列项的插入、取出操作,而LinkedBlockingQueue则是使用了2个锁来控制,一个名为putLock,另外一个是takeLock,但是锁的本质都是ReentrantLock。 3.吞吐性能强是因为有两个锁,试想一下,Array里面使用的时一个锁,不管put还是take行为,都可能被这个锁卡住,而Linked里面put和take是两个锁,put只会被put行为卡住,而不会被take卡住,因此吞吐性能自然强于Array。而“less predictable performance”这个也是显而易见的,Array采用的时固定内存,而Linked采用的时动态内存,无论是分配内存还是释放内存(甚至GC)动态内存的性能自然都会比固定内存要差。

总结

LinkedBlockingQueue是一个基于链表实现的阻塞queue,它的性能好于ArrayBlockingQueue,但是差于ConcurrentLinkedQueue;并且它非常适于生产者消费者的环境中,比Executors.newFixedThreadPool()就是基于这个队列的,使用LinkedBlockingQueue时一定要设置容量,不然会有内存溢出的风险。

参考

LinkedBlockingQueue 源码分析 (基于Java 8) LinkedBlockingQueue源码分析(JDK8)

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-02-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码农飞哥 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • LinkedBlockingQueue的定义
    • LinkedBlockingQueue的全局变量
      • LinkedBlockingQueue的构造器
        • 入队方法
          • put方法
          • enqueue方法
          • signalNotEmpty方法
        • 出队方法
          • dequeue方法
          • 获取队列头元素poll方法
      • ArrayBlockingQueue和LinkedBlockingQueue的区别
      • 总结
      • 参考
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档