前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java对阻塞队列的实现ArrayBlockingQueueLinkedBlockingQueue

Java对阻塞队列的实现ArrayBlockingQueueLinkedBlockingQueue

作者头像
呼延十
发布2019-07-01 17:12:35
7140
发布2019-07-01 17:12:35
举报
文章被收录于专栏:呼延呼延

什么是阻塞队列?

阻塞队列与队列基本一致,额外的支持阻塞添加和阻塞删除方法.

  • 阻塞添加: 当队列满时,线程不断尝试向其中添加,直到有其他线程取走元素,使添加操作成功,在此期间,线程阻塞.
  • 阻塞删除: 当队列为空时,线程不断尝试取出队头元素,直到有其他线程添加元素,使删除操作成功,在此期间,线程阻塞.

怎么实现阻塞呢?可以使用Java中Object类的wait(),notify(),notifyAll()等方法来实现.

  • 阻塞添加: 当队列满的时候,当前线程阻塞,当生产成功之后,唤醒消费者(此时队列中至少有一个元素).
  • 阻塞删除: 等队列为空的时候,当前线程阻塞,当消费成功后,唤醒生产者(此时队列中只有有一个空的位置可以用来添加元素).

更多的原理让注释体现吧!

下面的代码是一个简易版本的实现,仅仅实现了阻塞方法,对于队列常规的添加和移除方法没有实现:

代码语言:javascript
复制
import mian.AbstractMain;

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by pfliu on 2019/04/28.
 */
public class BlockingQueueT extends AbstractMain {

    // 存放元素的linkedlist
    private LinkedList<Integer> items = new LinkedList<>();
    // 计数,使用AtomicInteger,防止冲突
    private AtomicInteger count = new AtomicInteger(0);

    //定义队列的最大值与最小值,也就是(满/空)的定义,当然这里可以用其他方式实现,比如用一个定长的数组.
    private final int max = 100;
    private final int min = 0;

    // 新建一个对象,用来充当锁的作用
    private final Object lock = new Object();

    public void put(Integer integer) throws InterruptedException {
        // 加锁
        synchronized (lock) {
            // 如果队列是满的,则当前线程不断的等待
            while (count.get() == max) {
                lock.wait();
            }
            // 添加元素,计数增加并且唤醒消费者
            items.add(integer);
            count.incrementAndGet();
            lock.notifyAll();
        }
    }

    public Integer pop() throws InterruptedException {
        // 加锁
        synchronized (lock) {
            // 如果队列是空的,则当前线程不断的等待
            while (count.get() == min) {
                lock.wait();
            }
            // 获取结果值,计数减少,唤醒消费者,返回结果
            Integer ret = items.getFirst();
            items.removeFirst();
            count.decrementAndGet();
            lock.notifyAll();
            return ret;
        }
    }


    public static void main(String[] args) throws InterruptedException {
        new BlockingQueueT().parseArgsAndRun(args);
    }

    @Override
    public void run() throws InterruptedException {
        BlockingQueueT bt = new BlockingQueueT();

        // 生成这线程,生成1000个元素
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                logger.info("add : {}", i);
                try {
                    bt.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.setName("producer");
        producer.start();
        // 消费者线程,秩序的进行消费
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    logger.info("get : {}", bt.pop());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        consumer.setName("consumer");
        consumer.start();
    }
}

在main方法中,我们进行了一些测试,启动了一个生产者线程,不断的向阻塞队列中添加元素,同时启动了一个消费者线程,无限的从队列中读取.可以预期的是,在程序刚开始运行的时候,读写都会运行,而当生产者到1000之后停止,消费者会阻塞.

标准输出太多了不贴了,但是通过arthas可以看到当前的线程状态,可以看到消费者是出于wait状态的.

2019-04-28-17-58-38
2019-04-28-17-58-38

当然我们自己实现的这个考虑肯定不是很周全,那么就来看一下Java对阻塞队列的一些实现.

ArrayBlockingQueue

首先来看一下ArrayBlockingQueue,它是一个使用定长的数组来实现的有界的阻塞队列,和我们实现的基本类似,只是加锁使用ReentrantLock实现,且存储结构使用数组,需要记忆当前的添加位置以及弹出位置.队列中的顺序使用FIFO策略.

此外,当多个线程阻塞等待入队或者出队时候,ArrayBlockingQueue支持公平和非公平两种形式.

构造方法

由于是有界的阻塞队列,所以构造时都需要传入队列的大小.

ArrayblockingQueue有三个构造方法,如下:

代码语言:javascript
复制
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

可以发现,第一个和第三个构造方法都是对第二个的调用,而第二个构造方法中,初始化了存放元素的数组,以及用于实现阻塞机制的锁等.

插入方法

add(E)

如果队列不满则添加元素,如果队列满则抛出IllegalStateException异常.在阻塞队列中不建议使用.

代码语言:javascript
复制
    public boolean add(E e) {
        return super.add(e);
    }

        public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
offer(E)

如果队列不满,则添加元素,队列满则返回false.不抛异常.

代码语言:javascript
复制
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
offer(E,int,TimeUnit)

上一个offer方法的带有超时时间的版本,当队列满时,会尝试知道超时时间结束才返回false.

代码语言:javascript
复制
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
put

当队列满时,线程等待,知道可以放入元素再执行操作.

代码语言:javascript
复制
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

移除方法

poll

当队列为空时,返回null.不为空则返回队头元素.

代码语言:javascript
复制
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
poll(long,TimeUnit)

上一个poll方法的超时版本.当队列为空时,尝试获取元素,知道超时时间到达,返回null.

代码语言:javascript
复制
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
take

弹出元素的阻塞实现,当队列为空时,阻塞等待,知道可以获取到元素.

代码语言:javascript
复制
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
remove

循环删除某个元素.

代码语言:javascript
复制
    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

其他方法

peek

返回队头的元素,但是该元素不出队.

代码语言:javascript
复制
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
size

返回当前队列中的元素数量.

代码语言:javascript
复制
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
remainingCapacity

返回当前队列中空闲的位置的数量.

代码语言:javascript
复制
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

LinkedBlockingQueue

LinkedBlockingQueue的实现思路与ArrayBlockingQueue基本一致,只是将锁分为了取出锁插入锁.当插入和取出数据时,可以分开加锁,互不影响.且它可以是无界的.

ChangeLog

2019-04-28 完成

以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com


本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是阻塞队列?
  • ArrayBlockingQueue
    • 构造方法
      • 插入方法
        • 移除方法
          • 其他方法
          • LinkedBlockingQueue
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档