前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程之阻塞队列

多线程之阻塞队列

作者头像
OPice
发布2019-10-23 22:43:28
8390
发布2019-10-23 22:43:28
举报
文章被收录于专栏:D·技术专栏D·技术专栏

阻塞队列

BlockingQueue

队列主要有两种:FIFO(先进先出)、LIFO(后进先出)。 再多线程环境中,队列很容实现数据共享,我们常用的"生产者"、"消费者"模型就可以通过队列来传递数据达到数据共享。但是现实中,大多数情况都是生产者产生消息的速度和消费的速度是不匹配的,就需要相应的对生产或者消费进行阻塞。当生产的消息积累到一定程度时,就需要对生产者就行阻塞,以便消费者将积累的消息进行消费。在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全。BlockingQueue释放了我们的双手,他让我们不用关系什么时候去阻塞,什么时候去唤醒线程。

抛异常

返回false

阻塞

超时,抛异常

插入

add

offer

put

offer(timeout)

移除

take

remove

poll(timeout)

检查

contains

操作方法:

代码语言:javascript
复制
//--------添加 ----------
boolean add(E e);        //添加元素,加不了抛异常
boolean offer(E e);      //添加元素,加不了返回false
void put(E e) throws InterruptedException;  //添加元素,加不了一直阻塞
boolean offer(E e, long timeout, TimeUnit unit)
       throws InterruptedException; //添加元素,达到指定时间没有加入抛异常
// -------移除-----------
boolean remove(Object o);
E poll(long timeout, TimeUnit unit)
       throws InterruptedException;
E take() throws InterruptedException;
// ------

常见的BlockingQueue

有界性

数据结构

ArrayBlockingQueue

bounded

加锁

ArrayList

LinkedBlockingQueue

optionally-bounded

加锁

LinkedList

DelayQueue

unbounded

加锁

heap

PriorityBlockingQueue

unbounded

加锁

heap

SynchronousQueue

bounded

加锁

1. ArrayBlockingQueue

基于数组实现的有界阻塞安全线程队列。 构造函数

代码语言:javascript
复制
public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //初始化重入锁
        lock = new ReentrantLock(fair);
        //初始化读、写等待队列
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
       //初始化构造器
        this(capacity, fair);
        //获取重入锁
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
           //初始化元素中的个数
            count = i;
           //插入元素的下标索引
            putIndex = (i == capacity) ? 0 : i;
        } finally {
           //释放锁
            lock.unlock();
        }
    }

相关属性

代码语言:javascript
复制
final Object[] items;        //存放元素的数组
int takeIndex;                 //取元素的下标索引
int putIndex;                  //存元素的下标索引
int count;                      //数组中的元素个数
final ReentrantLock lock;   //数据读取的可重入锁
private final Condition notEmpty; //读等待的队列
private final Condition notFull;    //写等待的队列

核心函数 put

代码语言:javascript
复制
public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
       //如果当前线程没有中断,就将当前线程锁定
        lock.lockInterruptibly();
        try {
            //当前队列已经满了就一直等待
            while (count == items.length)
                notFull.await();
            //插入元素
            enqueue(e);
        } finally {
           //释放锁
            lock.unlock();
        }
    }

take

代码语言:javascript
复制
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

2. LinkedBlockingQueue

基于链表实现的阻塞队列 构造函数

代码语言:javascript
复制
public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

相关属性

代码语言:javascript
复制
private final int capacity;           //元素个数
private final AtomicInteger count = new AtomicInteger();  //
transient Node<E> head;             //头节点
private transient Node<E> last;     //尾节点
private final ReentrantLock takeLock = new ReentrantLock();  //读可重入锁
private final Condition notEmpty = takeLock.newCondition();  //读等待队列
private final ReentrantLock putLock = new ReentrantLock(); //写可重入锁
private final Condition notFull = putLock.newCondition();  //写队列

核心函数 put

代码语言:javascript
复制
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
       //获取写的可重入锁
        final ReentrantLock putLock = this.putLock;
        //线程安全的原子操作类
        final AtomicInteger count = this.count;
        //判断线程是否中断
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //如果加1后还小于当前容量,则唤醒一个等待的线程
                notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

3. DelayQueue

DelayQueue每次都是将元素加入排序队列,以delay/过期时间为排序因素,将快过期的元素放在队首,取数据的时候每次都是先取快过期的元素。 构造方法

代码语言:javascript
复制
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

相关属性

代码语言:javascript
复制
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();  //根据队列里某些元素排序的有序队列
private final Condition available = lock.newCondition();
private Thread leader = null;

核心函数 offer

代码语言:javascript
复制
public boolean offer(E e) {
        //获取可重入锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //将元素加入优先级队列中
            q.offer(e);
           //如果当前元素为队首,将leader=null,唤起其他线程
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
           //释放锁
            lock.unlock();
        }
    }

take

代码语言:javascript
复制
public E take() throws InterruptedException {
       //获取可重入锁
        final ReentrantLock lock = this.lock;
        //判断当前线程是否中断,没有中断就将当前线程锁定
        lock.lockInterruptibly();
        try {
            //循环执行
            for (;;) {
                E first = q.peek();
                //如果队首为空,阻塞当前线程
                if (first == null)
                    available.await();
                else {
                    //获取当前元素过期时间
                    long delay = first.getDelay(NANOSECONDS);
                    //小于等于0 直接弹出
                    if (delay <= 0)
                        return q.poll();
                     //将first 只为null,避免内存泄漏
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        //阻塞当前线程
                        available.await();
                    else {
                        //将当前线程赋值给leader,然后阻塞delay时间,等待队首元素达到可出队时间
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                             //释放leader引用
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //如果leader元素为空,优先级队列不为空唤起其他线程
            if (leader == null && q.peek() != null)
                available.signal();
            //释放锁
            lock.unlock();
        }
    }

4. PriorityBlockingQueue

无界优先队列 构造函数

代码语言:javascript
复制
 public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }

相关属性

代码语言:javascript
复制
private static final int DEFAULT_INITIAL_CAPACITY = 11;   //默认容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //最大容量
private transient Object[] queue;   //存放元素数组
private transient int size;  //元素个数
private transient Comparator<? super E> comparator;  //比较器
private final ReentrantLock lock;  //可重入锁
private final Condition notEmpty;  //非空条件
private transient volatile int allocationSpinLock; //扩容时,CAS更新这个值谁更新成功谁执行
private PriorityQueue<E> q;//不阻塞的优先队列,用于序列化/反序列化

核心函数 offer

代码语言:javascript
复制
public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
         //判断是否需要扩容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            //根据是否有比较器选择不同方法
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            //唤醒notEmpty条件
            notEmpty.signal();
        } finally {
           //释放锁
            lock.unlock();
        }
        return true;
    }
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 取父节点
        int parent = (k - 1) >>> 1;
        // 父节点的元素值
        Object e = array[parent];
        // 如果key大于父节点,堆化结束
        if (key.compareTo((T) e) >= 0)
            break;
        // 否则,交换二者的位置,继续下一轮比较
        array[k] = e;
        k = parent;
    }
    // 找到了应该放的位置,放入元素
    array[k] = key;
}

take

代码语言:javascript
复制
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    E result;
    try {
        // 队列没有元素,就阻塞在notEmpty条件上
        // 出队成功,就跳出这个循环
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        // 解锁
        lock.unlock();
    }
    // 返回出队的元素
    return result;
}
private E dequeue() {
    // 元素个数减1
    int n = size - 1;
    if (n < 0)
        // 数组元素不足,返回null
        return null;
    else {
        Object[] array = queue;
        // 弹出堆顶元素
        E result = (E) array[0];
        // 把堆尾元素拿到堆顶
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // 并做自上而下的堆化
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        // 修改size
        size = n;
        // 返回出队的元素
        return result;
    }
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        // 只需要遍历到叶子节点就够了
        while (k < half) {
            // 左子节点
            int child = (k << 1) + 1; // assume left child is least
            // 左子节点的值
            Object c = array[child];
            // 右子节点
            int right = child + 1;
            // 取左右子节点中最小的值
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            // key如果比左右子节点都小,则堆化结束
            if (key.compareTo((T) c) <= 0)
                break;
            // 否则,交换key与左右子节点中最小的节点的位置
            array[k] = c;
            k = child;
        }
        // 找到了放元素的位置,放置元素
        array[k] = key;
    }
}

5. SynchronousQueue

双栈双队列算法,一个写SynchronousQueue需要和一个读SynchronousQueue组队出现 构造方法

代码语言:javascript
复制
public SynchronousQueue() {
        this(false);
    }
public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

相关属性

代码语言:javascript
复制
static final int NCPUS = Runtime.getRuntime().availableProcessors();
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
static final long spinForTimeoutThreshold = 1000L;
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;

核心方法 put

代码语言:javascript
复制
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
  E transfer(E e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

take

代码语言:javascript
复制
 public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.09.23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 阻塞队列
    • BlockingQueue
      • 常见的BlockingQueue
        • 1. ArrayBlockingQueue
        • 2. LinkedBlockingQueue
        • 3. DelayQueue
        • 4. PriorityBlockingQueue
        • 5. SynchronousQueue
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档