专栏首页后端技术学习LinkedBlockingQueue源码学习

LinkedBlockingQueue源码学习

首先来看一个例子,例子来源于网上:

/**
 * 多线程模拟实现生产者/消费者模型
 *  
 */
public class BlockingQueueTest2 {
    /**
     * 
     * 定义装苹果的篮子
     * 
     */
    public class Basket {
        // 篮子,能够容纳3个苹果
        BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);

        // 生产苹果,放入篮子
        public void produce() throws InterruptedException {
            // put方法放入一个苹果,若basket满了,等到basket有位置
            basket.put("An apple");
        }

        // 消费苹果,从篮子中取走
        public String consume() throws InterruptedException {
            // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
            return basket.take();
        }
    }

    // 定义苹果生产者
    class Producer implements Runnable {
        private String instance;
        private Basket basket;

        public Producer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }

        public void run() {
            try {
                while (true) {
                    // 生产苹果
                    System.out.println("生产者准备生产苹果:" + instance);
                    basket.produce();
                    System.out.println("!生产者生产苹果完毕:" + instance);
                    // 休眠300ms
                    Thread.sleep(300);
                }
            } catch (InterruptedException ex) {
                System.out.println("Producer Interrupted");
            }
        }
    }

    // 定义苹果消费者
    class Consumer implements Runnable {
        private String instance;
        private Basket basket;

        public Consumer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }

        public void run() {
            try {
                while (true) {
                    // 消费苹果
                    System.out.println("消费者准备消费苹果:" + instance);
                    System.out.println(basket.consume());
                    System.out.println("!消费者消费苹果完毕:" + instance);
                    // 休眠1000ms
                    Thread.sleep(1000);
                }
            } catch (InterruptedException ex) {
                System.out.println("Consumer Interrupted");
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueueTest2 test = new BlockingQueueTest2();

        // 建立一个装苹果的篮子
        Basket basket = test.new Basket();

        ExecutorService service = Executors.newCachedThreadPool();
        Producer producer = test.new Producer("生产者001", basket);
        Producer producer2 = test.new Producer("生产者002", basket);
        Consumer consumer = test.new Consumer("消费者001", basket);
        service.submit(producer);
        service.submit(producer2);
        service.submit(consumer);
        // 程序运行5s后,所有任务停止
//        try {
//            Thread.sleep(1000 * 5);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//        service.shutdownNow();
    }

}

采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点:

采用链表数据结构Node的方式进行节点数据的记录,

同时其进行入队和出队的计数器采用原子性的AtomicInteger

其出队和入队采用采用两把锁,putLock和takeLock,同时进行删除的时候,采用fullLock

其与LinkedBlockingQueue相比,其可以无界可以有界,而ArrayBlockingQueue是有界的,同时实现的数据结构不通过,一个采用数组、一个采用链表,同时采用的锁的方式不同,ArrayBlockingQueue采用一把锁,没有对生产和消费消息进行锁的分离。

1.相关变量

//容量,为空时使用Integer.MAX_VALUE=2^31-1
private final int capacity;

/** Current number of elements */
//计数,队列中的元素个数
private final AtomicInteger count = new AtomicInteger();

//头结点,head.item==null,首节点不存放元素
transient Node<E> head;

//尾节点,last.next==null
private transient Node<E> last;

/** Lock held by take, poll, etc */
//消费队列锁
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
//消费队列等待消费,用于队满时,进行消费
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
//生产队列锁
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
//生产队列等待生产,用于队空时,进行生产
private final Condition notFull = putLock.newCondition();

 //节点信息:数据、后继点击
static class Node<E> {
    E item;

    /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
    //下一个节点,分为三种情况:
    // 指向真正的节点、指向自己,后继节点为head.next、为空,表示当前节点为尾节点
    Node<E> next;

    Node(E x) { item = x; }
}

2.构造方法

//构造方法,空参构造默认队列容量为2^31-1
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

//构造方法,带指定容量
public LinkedBlockingQueue(int capacity) {
    //对容量进行校验
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //创建节点信息
    last = head = new Node<E>(null);
}

//构造方法,放入带指定集合的元素信息入队
//首先采用默认大小,进行上锁操作,
// 放入元素到队列中,进行遍历,放入
public LinkedBlockingQueue(Collection<? extends E> c) {
    //默认队列大小,2^31-1
    this(Integer.MAX_VALUE);
    //进行上锁操作
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        //放入元素,进行计数
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        //释放锁
        putLock.unlock();
    }
}

3.方法

生产方法

put操作

//入队操作
//首先获取锁,再检查队列是否满了,如果满了,则进行阻塞等待,
// 如果队列没有满,则进行生产操作,同时计数器进行计数
//生产后的元素个数如果还没有达到容量时,会继续唤醒其他生产线程
//当生产的元素是元素的第一个元素时唤醒阻塞等待消费的线程
public void put(E e) throws InterruptedException {
    //非空校验
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    //设置计数为0,失败的时候返回
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //中断上锁
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        //检查队列是否满了,满了进行阻塞操作
        while (count.get() == capacity) {
            notFull.await();
        }
        //入队操作,将节点信息插入到队尾
        //last=last.next=node
        enqueue(node);
        c = count.getAndIncrement();
        //元素没有满,则唤醒被阻塞的线程,增加线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }
    //插入的是一个元素时唤醒阻塞等待的线程
    if (c == 0)
        signalNotEmpty();
}

offer操作

//阻塞带超时时间的offer操作
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            //如果时间<0,则表示超时返回了,此时队列未满,直接返回
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        //否者进行入队操作
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

//首先进行非空校验,如果队满了,直接返回false
//如果没有满,则进行上锁,同时进行判断,
// 如果计数<容量,则进行入队操作
//最后释放锁
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

消费者操作

take操作

//take操作 消费消息
//如果队列为非空或者被唤醒,进行消费操作,计数器-1
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //开始消费
    if (c == capacity)
        signalNotFull();
    return x;
}

pull操作

//进行消费操作 poll,带超时时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}


 //进行poll操作
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

remove操作

//删除操作,释放指定节点信息
public boolean remove(Object o) {
    if (o == null) return false;
    //对生产消息和消费消息进行上锁
    fullyLock();
    try {
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                //释放节点
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

drainTo操作

public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }
    
 //一次性地将队列中的全部元素消费完同时返回指定集合的信息,避免多次加锁造成的性能开销
    //其中c和maxElement表示返回的集合、要获取的元素个数
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        //进行上锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //拿到两者之间的最小的一个
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                //将元素添加中集合c中
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }
    

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-03-07

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RocketMQ学习5

    进行消息发送的过程首先会准备好路由信息,最终是由netty完成的,也即使用nettyRemotingClient来实现的。

    路行的亚洲
  • ConcurrentHashMap源码学习

    既然有了HashMap为什么还会出现ConcurrentHashMap?同时ConcurrentHashMap具有什么优势?ConcurrentHashMap与...

    路行的亚洲
  • Netty学习三

    前面我们已经知道Netty服务端启动的时候最重要的是进行bind操作,这个操作不仅进行了run()操作进行死循环,而且将线程任务添加到队列中,进行runAllT...

    路行的亚洲
  • 用户管理模块之用户注册

    爱撒谎的男孩
  • Android 仿UC浏览器详情页评论弹框效果

    额,突然发现UC被我卸载了,这个是QQ浏览器的效果,不过都一样,如果当前页面不是全屏的话,把根布局设为相对布局,然后设置评论布局为处于底部,这样在点击评论时弹开...

    萬物並作吾以觀復
  • TextView AutoLink, ClikSpan 与长按事件冲突的解决

    首先,我们先来复习一下 autoLink 和 ClickableSpan 是干什么用的。

    用户2965908
  • Netty 之 FileRegion 文件传输

    Netty 传输文件的时候没有使用 ByteBuf 进行向 Channel 中写入数据,而使用的 FileRegion。下面通过示例了解下 FileRegion...

    java404
  • 【python系统学习04】条件判断语句

    学过 js 的你,看到这个肯定小 case 吧!肯定第一时间得到答案,打印出“1”吧!

    xing.org1^
  • 还在用if else?策略模式了解一下!

    小编在公司负责的就是订单取消业务,老系统中各种类型订单取消都是通过if else 判断不同的订单类型进行不同的逻辑。在经历老系统的折磨和产品需求的不断变更,...

    cxuan
  • 聊聊sentinel的SentinelWebAutoConfiguration

    本文主要研究一下sentinel的SentinelWebAutoConfiguration

    codecraft

扫码关注云+社区

领取腾讯云代金券