前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《从Java面试题来看源码》-LinkedBlockingQueue 源码分析

《从Java面试题来看源码》-LinkedBlockingQueue 源码分析

作者头像
阿提说说
发布2022-12-02 16:46:47
3980
发布2022-12-02 16:46:47
举报
文章被收录于专栏:Java技术进阶Java技术进阶
在这里插入图片描述
在这里插入图片描述

源码基于open-jdk 11

关联文档:有哪些并发队列?及ConcurrentLinkedQueue 源码分析

有界阻塞队列,使用单向链表实现,通过ReentrantLock实现线程安全,阻塞通过Condition实现,出队和入队各一把锁,不存在互相竞争 ,一种经典的生产和消费模式场景

内部类

Node

节点对象

代码语言:javascript
复制
static class Node<E> {
    //值
    E item;

    /**
     * 表示有效的后续节点
     * null 表示最后一个节点
     */
    Node<E> next;

    Node(E x) { item = x; }
}

Itr

迭代器

代码语言:javascript
复制
private class Itr implements Iterator<E> {
    private Node<E> next;           //保存下一个节点
    private E nextItem;             // 保存下一个节点的item值
    private Node<E> lastRet;		//最近返回的项目节点
    private Node<E> ancestor;       //remove()中用于断开连接

    Itr() {
        //获取写锁和取锁,takeLock 和 putLock
        fullyLock();
        try {
            //从头节点开始,如果头节点的下一个节点存在
            //为什么不直接用head呢?因为head最开始是指向一个哨兵节点,item=next=null
            if ((next = head.next) != null)
                //就将下一个节点的item值保存
                nextItem = next.item;
        } finally {
            //释放写锁和取锁
            fullyUnlock();
        }
    }

    //通过next判断是否有下一个节点
    public boolean hasNext() {
        return next != null;
    }

    //获取下一个节点
    public E next() {
        Node<E> p;
        if ((p = next) == null)
            throw new NoSuchElementException();
        //p = next != null,将节点保存在lastRet中
        lastRet = p;
        E x = nextItem;
        //获取写锁和取锁
        fullyLock();
        try {
            E e = null;
            //获取有效的节点,直到节点的item值不为null
            for (p = p.next; p != null && (e = p.item) == null; )
                //获取下一个节点
                p = succ(p);
            next = p;
            nextItem = e;
        } finally {
            //释放锁
            fullyUnlock();
        }
        return x;
    }

    //循环对队列中的元素应用指定操作
    public void forEachRemaining(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        Node<E> p;
        if ((p = next) == null) return;
        lastRet = p;
        next = null;
        final int batchSize = 64;
        Object[] es = null;
        int n, len = 1;
        do {
            //获取写锁和读锁
            fullyLock();
            try {
                //这是时候进行初始化,获取数组长度,创建数组
                if (es == null) {
                    p = p.next;
                    for (Node<E> q = p; q != null; q = succ(q))
                        if (q.item != null && ++len == batchSize)
                            break;
                    es = new Object[len];
                    es[0] = nextItem;
                    nextItem = null;
                    n = 1;
                } else
                    n = 0;
                //开始循环赋值
                for (; p != null && n < len; p = succ(p))
                    if ((es[n] = p.item) != null) {
                        lastRet = p;
                        n++;
                    }
            } finally {
                //释放锁
                fullyUnlock();
            }
            //循环执行指定操作
            for (int i = 0; i < n; i++) {
                @SuppressWarnings("unchecked") E e = (E) es[i];
                action.accept(e);
            }
        } while (n > 0 && p != null);
    }

    public void remove() {
        Node<E> p = lastRet;
        if (p == null)
            throw new IllegalStateException();
        lastRet = null;
        fullyLock();
        try {
            if (p.item != null) {
                //第一次的时候从头部开始
                if (ancestor == null)
                    ancestor = head;
                //查找当前节点p的上一个节点,findPred通过循环比较的方式
                ancestor = findPred(p, ancestor);
                //断开连接
                unlink(p, ancestor);
            }
        } finally {
            //释放锁
            fullyUnlock();
        }
    }
}

LBQSpliterator

可拆分迭代器

代码语言:javascript
复制
private final class LBQSpliterator implements Spliterator<E> {
    static final int MAX_BATCH = 1 << 25;  //处理数组最大长度
    Node<E> current;    // 当前节点,初始化前为null
    int batch;          // 拆分处理的大小
    boolean exhausted;  // true表示没有更多的节点
    long est = size();  // 预估大小,size()获取的大小,并不准确

    LBQSpliterator() {}

    public long estimateSize() { return est; }

    //分割队列
    public Spliterator<E> trySplit() {
        Node<E> h;
        //下面的表达式true,表示还有元素
        if (!exhausted &&
            ((h = current) != null || (h = head.next) != null)
            && h.next != null) {
            //获取批量处理的最小值,每次执行trySplit,batch都+1
            int n = batch = Math.min(batch + 1, MAX_BATCH);
            Object[] a = new Object[n];
            int i = 0;
            Node<E> p = current;
            //获取读锁和写锁
            fullyLock();
            try {
                if (p != null || (p = head.next) != null)
                    //循环赋值到数组a,直到n个元素
                    for (; p != null && i < n; p = succ(p))
                        if ((a[i] = p.item) != null)
                            i++;
            } finally {
                fullyUnlock();
            }
            //表示没有元素
            if ((current = p) == null) {
                est = 0L;
                exhausted = true;
            }
            else if ((est -= i) < 0L)
                //迭代器大小-已拆分的长度 < 0,将est赋值为0,防止发生负数
                est = 0L;
            if (i > 0)
                //创建Spliterator,指定迭代器的特性
                return Spliterators.spliterator
                    (a, 0, i, (Spliterator.ORDERED |
                               Spliterator.NONNULL |
                               Spliterator.CONCURRENT));
        }
        return null;
    }

    //获取有效节点,并执行指定函数
    public boolean tryAdvance(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        //有元素才继续执行
        if (!exhausted) {
            E e = null;
            //获取写锁和读锁
            fullyLock();
            try {
                Node<E> p;
                //当前节点p不为null执行,如果为null获取头节点下一个节点,直到获取不为null的节点
                if ((p = current) != null || (p = head.next) != null)
                    do {
                        e = p.item;
                        p = succ(p);
                    } while (e == null && p != null);//获取一个节点值不为null的节点
                //这种情况说明,已经没有可获取的节点了
                if ((current = p) == null)
                    exhausted = true;
            } finally {
                //释放锁
                fullyUnlock();
            }
            if (e != null) {
                //应用函数
                action.accept(e);
                return true;
            }
        }
        return false;
    }

    //循环执行指定函数
    public void forEachRemaining(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        if (!exhausted) {
            exhausted = true;
            Node<E> p = current;
            current = null;
            //执行循环的方法
            forEachFrom(action, p);
        }
    }

    public int characteristics() {
        return (Spliterator.ORDERED |
                Spliterator.NONNULL |
                Spliterator.CONCURRENT);
    }
}

属性

代码语言:javascript
复制
/**队列大小,最大为Integer.MAX_VALUE */
private final int capacity;

/**队列当前节点数量*/
private final AtomicInteger count = new AtomicInteger();

/**
 * 头部节点
 * 不变性: head.item == null
 */
transient Node<E> head;

/**
 * 尾部节点
 * 不变变性: last.next == null
*/
private transient Node<E> last;

/**take,poll等要获取的锁*/
private final ReentrantLock takeLock = new ReentrantLock();

/**当队列为空时,出队操作的线程会阻塞放入该队列中等待*/
private final Condition notEmpty = takeLock.newCondition();

/**put,offer等要获取的锁*/
private final ReentrantLock putLock = new ReentrantLock();

/**当队列满时,入队操作的线程会放入该队列中等待*/
private final Condition notFull = putLock.newCondition();

构造函数

代码语言:javascript
复制
//无参构造函数,长度默认为最大长度
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

//提供容量参数
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //构造一个哨兵节点
    last = head = new Node<E>(null);
}

//从给定的集合中创建队列
public LinkedBlockingQueue(Collection<? extends E> c) {
    //首先创建一个默认队列
    this(Integer.MAX_VALUE);
    //获取写锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); //虽然不会发生竞争,但保证可见性是必要的
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //入队,将元素加入队列末尾
            enqueue(new Node<E>(e));
            ++n;
        }
        //设置节点数量
        count.set(n);
    } finally {
        //释放写锁
        putLock.unlock();
    }
}

size、remainingCapacity

代码语言:javascript
复制
//获取队列长度
public int size() {
    return count.get();
}

//队列剩余容量
public int remainingCapacity() {
    return capacity - count.get();
}

put

在尾部插入指定元素

代码语言:javascript
复制
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final int c;
    final Node<E> node = new Node<E>(e);
    //获取写锁对象
    final ReentrantLock putLock = this.putLock;
    //获取队列当前数量
    final AtomicInteger count = this.count;
    //加锁
    putLock.lockInterruptibly();
    try {
        //如果队列数量 已经达到队列的最大容量,则将该线程放入等待队列
        while (count.get() == capacity) {
            notFull.await();
        }
        //队列未满,插入队列
        enqueue(node);
        //队列数量+1,返回原来的队列数量
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            //说明队列未满,唤醒原来阻塞等待插入的线程
            notFull.signal();
    } finally {
        //释放写锁
        putLock.unlock();
    }
    //c == 0 说明第一次插入
    if (c == 0)
        //唤醒原来阻塞等待出队的线程
        signalNotEmpty();
}

offer

在队列的尾部插入元素,该方法提供超时版本,非一直阻塞

代码语言:javascript
复制
//插队元素,超时版本
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
	//插入元素不能为null
    if (e == null) throw new NullPointerException();
    //将时间转换为纳秒
    long nanos = unit.toNanos(timeout);
    final int c;
    //写锁
    final ReentrantLock putLock = this.putLock;
    //队列数量
    final AtomicInteger count = this.count;
    //加锁,可以被中断
    putLock.lockInterruptibly();
    try {
        //如果队列满了
        while (count.get() == capacity) {
            if (nanos <= 0L)
                return false;
            //将该线程阻塞等待指定时间
            nanos = notFull.awaitNanos(nanos);
        }
        //入队
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        //表示没满,唤醒原来入队阻塞等待的线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }
    if (c == 0)
        //至少有一个元素,队列不为空,唤醒原来出队阻塞的线程
        signalNotEmpty();
    return true;
}

//入队,不带超时
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    //队列满了,不等待,直接返回false
    if (count.get() == capacity)
        return false;
    final int c;
    final Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //再次检查是否已满
        if (count.get() == capacity)
            return false;
        //入队
        enqueue(node);
        c = count.getAndIncrement();
        //为true说明队列未满,还可以继续插入
        if (c + 1 < capacity)
            //唤醒原来阻塞等待的入队线程
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

take

取元素,阻塞版本,直到有元素为止

代码语言:javascript
复制
public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //没有元素,将线程放入出队列表中
        while (count.get() == 0) {
            notEmpty.await();
        }
        //出队
        x = dequeue();
        //队列数量减一,返回原队列数量
        c = count.getAndDecrement();
        //为true 说明队列还有元素,唤醒之前阻塞的出队线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //为true说明之前队列是满的,现在已经至少有一个空闲位置了
    if (c == capacity)
        //唤醒之前阻塞的入队线程
        signalNotFull();
    return x;
}

poll、peek

pool 弹出元素,不阻塞线程,或者阻塞有超时时间

peek 只获取队列的头部元素,不阻塞线程

代码语言:javascript
复制
//弹出元素的超时版本
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final E x;
    final int c;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //可以被中断
    takeLock.lockInterruptibly();
    try {
        //为true说明队列是空的
        while (count.get() == 0) {
            if (nanos <= 0L)
                return null;
            //阻塞等待nanos,直到超时
            nanos = notEmpty.awaitNanos(nanos);
        }
        //出队
        x = dequeue();
        //队列数量减一,返回原来的数量
        c = count.getAndDecrement();
        //为true说明队列还有元素
        if (c > 1)
            //唤醒原来阻塞出队的线程
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //为true,说明现在已经有空闲的位置
    if (c == capacity)
        //唤醒原来阻塞的入队线程
        signalNotFull();
    return x;
}

//弹出元素
public E poll() {
    final AtomicInteger count = this.count;
    //队列为空,直接返回null
    if (count.get() == 0)
        return null;
    final E x;
    final int c;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //再次检查队列是否为空
        if (count.get() == 0)
            return null;
        //出队
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
代码语言:javascript
复制
//获取头部元素
public E peek() {
    final AtomicInteger count = this.count;
    //队列是否为空
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //队列不为空,返回头部元素,head.next,因为还有一个item=null的哨兵节点,比如poll形成的
        return (count.get() > 0) ? head.next.item : null;
    } finally {
        takeLock.unlock();
    }
}

enqueue

代码语言:javascript
复制
//链接到末尾节点
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

dequeue

代码语言:javascript
复制
//出队,head元素的item始终等于null
private E dequeue() {
    Node<E> h = head; //①
    Node<E> first = h.next; //②
    //变为自引用,等GC回收
    h.next = h; //③  
    head = first; //④
    E x = first.item; //⑤
    first.item = null; //⑥
    return x;
}

remove

代码语言:javascript
复制
//删除指定元素
public boolean remove(Object o) {
    if (o == null) return false;
    //写锁,读锁都加锁
    fullyLock();
    try {
        for (Node<E> pred = head, p = pred.next;
             p != null;
             pred = p, p = p.next) {
            //通过equlas比较元素是否相同
            if (o.equals(p.item)) {
                //断开连接
                unlink(p, pred);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

contains

代码语言:javascript
复制
//循环判断判断队列是否包含指定元素
public boolean contains(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node<E> p = head.next; p != null; p = p.next)
            if (o.equals(p.item))
                return true;
        return false;
    } finally {
        fullyUnlock();
    }
}

toArray

将队列输出为数组

代码语言:javascript
复制
public Object[] toArray() {
    fullyLock();
    try {
        int size = count.get();
        Object[] a = new Object[size];
        int k = 0;
        for (Node<E> p = head.next; p != null; p = p.next)
            a[k++] = p.item;
        return a;
    } finally {
        fullyUnlock();
    }
}

//将队列输出到指定类型的数组
public <T> T[] toArray(T[] a) {
    fullyLock();
    try {
        int size = count.get();
        //如果给定的数组长度小于当前队列的长度,需要扩容
        //这里的做法是,通过反射生成一个新的数组
        if (a.length < size)
            a = (T[])java.lang.reflect.Array.newInstance
            (a.getClass().getComponentType(), size);

        int k = 0;
        for (Node<E> p = head.next; p != null; p = p.next)
            a[k++] = (T)p.item;
        if (a.length > k)
            a[k] = null;
        return a;
    } finally {
        fullyUnlock();
    }
}

clear

将队列中的元素清除

代码语言:javascript
复制
public void clear() {
    fullyLock();
    try {
        for (Node<E> p, h = head; (p = h.next) != null; h = p) {
            h.next = h;
            p.item = null;
        }
        head = last;
        //为true说明之前队列是满的,有可能已经有入队线程阻塞,需要唤醒
        if (count.getAndSet(0) == capacity)
            notFull.signal();
    } finally {
        fullyUnlock();
    }
}

toString

输出字符串,这里使用了Helpers类,来输出字符串,与Java8不同。

Helpers类用于并发包输出字符串,该类只在输出数组的时候获取锁,而不是在toString中获取锁

代码语言:javascript
复制
public String toString() {
    //
    return Helpers.collectionToString(this);
}

drainTo

从队列获取指定数量的元素并发送到指定集合,删除队列中原有元素

代码语言:javascript
复制
public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    Objects.requireNonNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    boolean signalNotFull = false;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //防止超过范围
        int n = Math.min(maxElements, count.get());
        // count.get provides visibility to first n Nodes
        Node<E> h = head;
        int i = 0;
        try {
            while (i < n) {
                Node<E> p = h.next;
                c.add(p.item);
                //将p节点转换为哨兵节点
                p.item = null;
                //将h节点置为自引用,等待GC回收
                h.next = h;
                //将p节点做为头节点,继续下次循环
                h = p;
                ++i;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                // assert h.item == null;
                head = h;
                //如果之前队列是满的,并且已经发送了一部分元素,那么需要唤醒之前阻塞的入队线程
                signalNotFull = (count.getAndAdd(-i) == capacity);
            }
        }
    } finally {
        takeLock.unlock();
        if (signalNotFull)
            //唤醒阻塞的入队线程
            signalNotFull();
    }
}

succ

获取下一个节点

代码语言:javascript
复制
Node<E> succ(Node<E> p) {
    //先比较p.next节点是否指向自己p,如果true,则从头节点开始
    //如果false返回p.next
    //这种写法是不是很巧妙,能够少定义变量,代码也变简洁
    if (p == (p = p.next))
        p = head.next;
    return p;
}

iterator

获取迭代器

代码语言:javascript
复制
public Iterator<E> iterator() {
    return new Itr();
}

spliterator

获取可拆分迭代器

代码语言:javascript
复制
public Spliterator<E> spliterator() {
    return new LBQSpliterator();
}

forEach

循环执行指定函数

代码语言:javascript
复制
public void forEach(Consumer<? super E> action) {
    Objects.requireNonNull(action);
    forEachFrom(action, null);
}

forEachFrom

代码语言:javascript
复制
void forEachFrom(Consumer<? super E> action, Node<E> p) {
    // Extract batches of elements while holding the lock; then
    // run the action on the elements while not
    final int batchSize = 64;       // max number of elements per batch
    Object[] es = null;             // container for batch of elements
    int n, len = 0;
    do {
        fullyLock();
        try {
            if (es == null) {
                if (p == null) p = head.next;
                //先获取所需数组的长度,并创建数组
                for (Node<E> q = p; q != null; q = succ(q))
                    if (q.item != null && ++len == batchSize)
                        break;
                es = new Object[len];
            }
            //将队列节点的值赋值给数组
            for (n = 0; p != null && n < len; p = succ(p))
                if ((es[n] = p.item) != null)
                    n++;
        } finally {
            //释放锁
            fullyUnlock();
        }
        //给每个元素执行指定函数
        for (int i = 0; i < n; i++) {
            @SuppressWarnings("unchecked") E e = (E) es[i];
            action.accept(e);
        }
    } while (n > 0 && p != null); //这里有必要用循环吗?
}

removeIf

根据提供的过滤函数,从队列中移除元素

代码语言:javascript
复制
public boolean removeIf(Predicate<? super E> filter) {
    Objects.requireNonNull(filter);
    return bulkRemove(filter);
}

removeAll

从队列中移除包含在指定集合中的元素

代码语言:javascript
复制
public boolean removeAll(Collection<?> c) {
    Objects.requireNonNull(c);
    return bulkRemove(e -> c.contains(e));
}

retainAll

从队列中移除不包含在指定集合中的元素,也就是队列中只包含集合中有的元素

代码语言:javascript
复制
public boolean retainAll(Collection<?> c) {
    Objects.requireNonNull(c);
    return bulkRemove(e -> !c.contains(e));
}

fullyLock、fullyUnlock

代码语言:javascript
复制
/**
 * 先获取写锁,在获取读锁
 */
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

/**
 * 以加锁反方向,释放锁,先释放读锁,再释放写锁
 */
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

findPred

查找当前节点的上一个节点

代码语言:javascript
复制
Node<E> findPred(Node<E> p, Node<E> ancestor) {
    //如果ancestor为空,从头节点开始
    if (ancestor.item == null)
        ancestor = head;
    //直到找到与当前节点p相同,返回上一个节点
    for (Node<E> q; (q = ancestor.next) != p; )
        ancestor = q;
    return ancestor;
}

signalNotEmpty

代码语言:javascript
复制
//唤醒原来阻塞等待出队的线程,该方法在put/offer中调用
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

signalNotFull

代码语言:javascript
复制
//唤醒原来阻塞等待入队的线程
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

bulkRemove

根据指定的过滤函数元素

代码语言:javascript
复制
private boolean bulkRemove(Predicate<? super E> filter) {
    boolean removed = false;
    Node<E> p = null, ancestor = head;
    Node<E>[] nodes = null;
    int n, len = 0;
    do {
        // 1. 加锁,并创建长度为64的节点数组
        fullyLock();
        try {
            if (nodes == null) {  // first batch; initialize
                p = head.next;
                for (Node<E> q = p; q != null; q = succ(q))
                    if (q.item != null && ++len == 64)
                        break;
                nodes = (Node<E>[]) new Node<?>[len];
            }
            //赋值节点到数组
            for (n = 0; p != null && n < len; p = succ(p))
                nodes[n++] = p;
        } finally {
            fullyUnlock();
        }

        // 2. 在不加锁的情况下,执行过滤函数
        long deathRow = 0L;       // "bitset" of size 64
        for (int i = 0; i < n; i++) {
            final E e;
            if ((e = nodes[i].item) != null && filter.test(e))
                deathRow |= 1L << i;
        }

        // 3. 加锁,移除元素
        if (deathRow != 0) {
            fullyLock();
            try {
                for (int i = 0; i < n; i++) {
                    final Node<E> q;
                    if ((deathRow & (1L << i)) != 0L
                        && (q = nodes[i]).item != null) {
                        ancestor = findPred(q, ancestor);
                        unlink(q, ancestor);
                        removed = true;
                    }
                    nodes[i] = null; // help GC
                }
            } finally {
                fullyUnlock();
            }
        }
    } while (n > 0 && p != null);
    return removed;
}

writeObject、readObject

writeObject 将队列元素输出到指定的输出流

readObject 从指定的输入流中读取插入队列

代码语言:javascript
复制
private void writeObject(java.io.ObjectOutputStream s)
    throws java.io.IOException {

    fullyLock();
    try {
        // Write out any hidden stuff, plus capacity
        s.defaultWriteObject();

        // Write out all elements in the proper order.
        for (Node<E> p = head.next; p != null; p = p.next)
            s.writeObject(p.item);

        // Use trailing null as sentinel
        s.writeObject(null);
    } finally {
        fullyUnlock();
    }
}

private void readObject(java.io.ObjectInputStream s)
    throws java.io.IOException, ClassNotFoundException {
    // Read in capacity, and any hidden stuff
    s.defaultReadObject();

    count.set(0);
    last = head = new Node<E>(null);

    // Read in all elements and place in queue
    for (;;) {
        @SuppressWarnings("unchecked")
        E item = (E)s.readObject();
        if (item == null)
            break;
        add(item);
    }
}

unlink

断开连接

代码语言:javascript
复制
//p为当前节点,pred为上一个节点
void unlink(Node<E> p, Node<E> pred) {
    p.item = null;
    // 将当前节点p的下一个节点赋值给上一个节点pred的next上
    pred.next = p.next;
    //如果当前节点为最后一个节点
    if (last == p)
        //将上一个节点赋值给last,因为当前节点将被删除
        last = pred;
    //count获取数量,并减一
    //判断之前是否是满的,因为删除了一个后,有空闲,唤醒原来在notFull阻塞的线程
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

默认线程池阻塞队列为什么用LinkedBlockingQueue

不管是Executors提供的几种线程池,还是Spring提供的线程池,你会发现阻塞队列用的都是LinkedBlockingQueue,而不是用的ArrayBlockingQueue。

源码基于Java8

LinkedBlockingQueue

使用单链表实现,提供3种构造函数

  1. LinkedBlockingQueue() 无参构造函数,链表长度为Integer.MAX_VALUE
  2. LinkedBlockingQueue(int capacity) 指定capacity长度
  3. LinkedBlockingQueue(Collection<? extends E> c) 不指定长度,即默认长度为Integer.MAX_VALUE,提供初始化元素

链表节点由Node对象组成,每个Node有item变量用于存储元素,next变量指向下一个节点

执行put的时候,将元素放到链表尾部节点;take的时候从头部取元素

两种操作分别有一个锁putLock, takeLock,互不影响,可以同时进行

代码语言:javascript
复制
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

//put
 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
     //...
 }

//take
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
    //...
}
ArrayBlockingQueue

使用数组实现,3种构造函数

  1. ArrayBlockingQueue(int capacity) 指定长度
  2. ArrayBlockingQueue(int capacity, boolean fair) 指定长度,及指定是否使用FIFO顺序进出队列
  3. ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) 指定长度,进行队列顺序,初始元素

从构造函数看出,ArrayBlockingQueue必须指定初始化长度,如果线程池使用该队列,指定长度大了浪费内存,长度小队列并发性不高,在数组满的时候,put操作只能阻塞等待,或者返回false

ArrayBlockingQueue 只定义了一个Lock,put和take使用同一锁,不能同时进行

代码语言:javascript
复制
/** Main lock guarding all access */
    final ReentrantLock lock;
总结
  1. LinkedBlockingQueue 无须指定长度,放入和取出元素使用不同的锁,互不影响,效率高,通用性强
  2. ArrayBlockingQueue 必须指定长度,大了浪费内存,小了性能不高,使用同一把锁,效率低
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-06-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 内部类
    • Node
      • Itr
        • LBQSpliterator
          • LinkedBlockingQueue
          • ArrayBlockingQueue
          • 总结
      • 属性
      • 构造函数
      • size、remainingCapacity
      • put
      • offer
      • take
      • poll、peek
      • enqueue
      • dequeue
      • remove
      • contains
      • toArray
      • clear
      • toString
      • drainTo
      • succ
      • iterator
      • spliterator
      • forEach
      • forEachFrom
      • removeIf
      • removeAll
      • retainAll
      • fullyLock、fullyUnlock
      • findPred
      • signalNotEmpty
      • signalNotFull
      • bulkRemove
      • writeObject、readObject
      • unlink
      • 默认线程池阻塞队列为什么用LinkedBlockingQueue
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档