前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >LinkedBlockingQueue源码学习

LinkedBlockingQueue源码学习

作者头像
路行的亚洲
发布2020-07-21 15:47:29
3400
发布2020-07-21 15:47:29
举报
文章被收录于专栏:后端技术学习后端技术学习

首先来看一个例子,例子来源于网上:

/**
 * 多线程模拟实现生产者/消费者模型
 *  
 */
public class BlockingQueueTest2 {
    /**
     * 
     * 定义装苹果的篮子
     * 
     */
    public class Basket {
        // 篮子,能够容纳3个苹果
        BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);

        // 生产苹果,放入篮子
        public void produce() throws InterruptedException {
            // put方法放入一个苹果,若basket满了,等到basket有位置
            basket.put("An apple");
        }

        // 消费苹果,从篮子中取走
        public String consume() throws InterruptedException {
            // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
            return basket.take();
        }
    }

    // 定义苹果生产者
    class Producer implements Runnable {
        private String instance;
        private Basket basket;

        public Producer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }

        public void run() {
            try {
                while (true) {
                    // 生产苹果
                    System.out.println("生产者准备生产苹果:" + instance);
                    basket.produce();
                    System.out.println("!生产者生产苹果完毕:" + instance);
                    // 休眠300ms
                    Thread.sleep(300);
                }
            } catch (InterruptedException ex) {
                System.out.println("Producer Interrupted");
            }
        }
    }

    // 定义苹果消费者
    class Consumer implements Runnable {
        private String instance;
        private Basket basket;

        public Consumer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }

        public void run() {
            try {
                while (true) {
                    // 消费苹果
                    System.out.println("消费者准备消费苹果:" + instance);
                    System.out.println(basket.consume());
                    System.out.println("!消费者消费苹果完毕:" + instance);
                    // 休眠1000ms
                    Thread.sleep(1000);
                }
            } catch (InterruptedException ex) {
                System.out.println("Consumer Interrupted");
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueueTest2 test = new BlockingQueueTest2();

        // 建立一个装苹果的篮子
        Basket basket = test.new Basket();

        ExecutorService service = Executors.newCachedThreadPool();
        Producer producer = test.new Producer("生产者001", basket);
        Producer producer2 = test.new Producer("生产者002", basket);
        Consumer consumer = test.new Consumer("消费者001", basket);
        service.submit(producer);
        service.submit(producer2);
        service.submit(consumer);
        // 程序运行5s后,所有任务停止
//        try {
//            Thread.sleep(1000 * 5);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//        service.shutdownNow();
    }

}

采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点:

采用链表数据结构Node的方式进行节点数据的记录,

同时其进行入队和出队的计数器采用原子性的AtomicInteger

其出队和入队采用采用两把锁,putLock和takeLock,同时进行删除的时候,采用fullLock

其与LinkedBlockingQueue相比,其可以无界可以有界,而ArrayBlockingQueue是有界的,同时实现的数据结构不通过,一个采用数组、一个采用链表,同时采用的锁的方式不同,ArrayBlockingQueue采用一把锁,没有对生产和消费消息进行锁的分离。

1.相关变量

//容量,为空时使用Integer.MAX_VALUE=2^31-1
private final int capacity;

/** Current number of elements */
//计数,队列中的元素个数
private final AtomicInteger count = new AtomicInteger();

//头结点,head.item==null,首节点不存放元素
transient Node<E> head;

//尾节点,last.next==null
private transient Node<E> last;

/** Lock held by take, poll, etc */
//消费队列锁
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
//消费队列等待消费,用于队满时,进行消费
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
//生产队列锁
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
//生产队列等待生产,用于队空时,进行生产
private final Condition notFull = putLock.newCondition();

 //节点信息:数据、后继点击
static class Node<E> {
    E item;

    /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
    //下一个节点,分为三种情况:
    // 指向真正的节点、指向自己,后继节点为head.next、为空,表示当前节点为尾节点
    Node<E> next;

    Node(E x) { item = x; }
}

2.构造方法

//构造方法,空参构造默认队列容量为2^31-1
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

//构造方法,带指定容量
public LinkedBlockingQueue(int capacity) {
    //对容量进行校验
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //创建节点信息
    last = head = new Node<E>(null);
}

//构造方法,放入带指定集合的元素信息入队
//首先采用默认大小,进行上锁操作,
// 放入元素到队列中,进行遍历,放入
public LinkedBlockingQueue(Collection<? extends E> c) {
    //默认队列大小,2^31-1
    this(Integer.MAX_VALUE);
    //进行上锁操作
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        //放入元素,进行计数
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        //释放锁
        putLock.unlock();
    }
}

3.方法

生产方法

put操作

//入队操作
//首先获取锁,再检查队列是否满了,如果满了,则进行阻塞等待,
// 如果队列没有满,则进行生产操作,同时计数器进行计数
//生产后的元素个数如果还没有达到容量时,会继续唤醒其他生产线程
//当生产的元素是元素的第一个元素时唤醒阻塞等待消费的线程
public void put(E e) throws InterruptedException {
    //非空校验
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    //设置计数为0,失败的时候返回
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //中断上锁
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        //检查队列是否满了,满了进行阻塞操作
        while (count.get() == capacity) {
            notFull.await();
        }
        //入队操作,将节点信息插入到队尾
        //last=last.next=node
        enqueue(node);
        c = count.getAndIncrement();
        //元素没有满,则唤醒被阻塞的线程,增加线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }
    //插入的是一个元素时唤醒阻塞等待的线程
    if (c == 0)
        signalNotEmpty();
}

offer操作

//阻塞带超时时间的offer操作
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            //如果时间<0,则表示超时返回了,此时队列未满,直接返回
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        //否者进行入队操作
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

//首先进行非空校验,如果队满了,直接返回false
//如果没有满,则进行上锁,同时进行判断,
// 如果计数<容量,则进行入队操作
//最后释放锁
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

消费者操作

take操作

//take操作 消费消息
//如果队列为非空或者被唤醒,进行消费操作,计数器-1
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //开始消费
    if (c == capacity)
        signalNotFull();
    return x;
}

pull操作

//进行消费操作 poll,带超时时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}


 //进行poll操作
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

remove操作

//删除操作,释放指定节点信息
public boolean remove(Object o) {
    if (o == null) return false;
    //对生产消息和消费消息进行上锁
    fullyLock();
    try {
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                //释放节点
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

drainTo操作

public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }
    
 //一次性地将队列中的全部元素消费完同时返回指定集合的信息,避免多次加锁造成的性能开销
    //其中c和maxElement表示返回的集合、要获取的元素个数
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        //进行上锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //拿到两者之间的最小的一个
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                //将元素添加中集合c中
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }
    
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档