前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java 生产者消费者实现 —— BlockingQueue

Java 生产者消费者实现 —— BlockingQueue

作者头像
Yano_nankai
发布2018-10-08 10:38:36
8600
发布2018-10-08 10:38:36
举报
文章被收录于专栏:二进制文集二进制文集

前言

对着《Java 编程思想》,通过wait - notifyAll实现了生产者消费者模式。今天用BlockingQueue实现一下。

BlockingQueue

简单实现

生产者和消费者,共用一个BlockingQueue。为什么BlockingQueue能够实现生产者-消费者模型呢?对于puttake两个操作,注释如下:

代码语言:javascript
复制
/**
 * Inserts the specified element into this queue, waiting if necessary
 * for space to become available.
 *
 * @param e the element to add
 * @throws InterruptedException if interrupted while waiting
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of the specified
 *         element prevents it from being added to this queue
 */
void put(E e) throws InterruptedException;
代码语言:javascript
复制
/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element becomes available.
 *
 * @return the head of this queue
 * @throws InterruptedException if interrupted while waiting
 */
E take() throws InterruptedException;

Apple.java,生产和消费的对象。

代码语言:javascript
复制
public class Apple {
    
    private int id;
    
    public Apple(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Apple [id=" + id + "]";
    }
}

生产者:

代码语言:javascript
复制
public class Producer {
    BlockingQueue<Apple> queue;
    
    public Producer(BlockingQueue<Apple> queue) {
        this.queue = queue;
    }
    
    public boolean put(Apple apple) {
        return queue.offer(apple);
    }
}

消费者:

代码语言:javascript
复制
public class Consumer {
    BlockingQueue<Apple> queue;
    
    public Consumer(BlockingQueue<Apple> queue) {
        this.queue = queue;
    }
    
    public Apple take() throws InterruptedException {
        return queue.take();
    }
}

测试:

代码语言:javascript
复制
public class TestConsumer {
    
    public static void main(String[] args) {

        final BlockingQueue<Apple> queue = new LinkedBlockingDeque<Apple>(100);
        
        // 生产者
        new Thread(new Runnable() {
            
            int appleId = 0;
            Producer producer = new Producer(queue);
            
            @Override
            public void run() {
                try {
                    while (true) {
                        TimeUnit.SECONDS.sleep(1);
                        producer.put(new Apple(appleId++)); 
                        producer.put(new Apple(appleId++)); 
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 消费者
        new Thread(new Runnable() {
            Consumer consumer = new Consumer(queue);
            
            @Override
            public void run() {
                try {
                    while (true) {
                        System.out.println(consumer.take().getId());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

输出:

生产者生产2个Apple,消费者立即消费掉。

改进

上述代码存在一些问题:

  • 生产者和消费者,都仅用于特定的类型Apple
  • 在使用过程中,需要自己定义BlockingQueue,自行实现生产者和消费者的线程,使用复杂
  • 如果要定义多个消费者线程,需要多次手动编写代码
  • 生产者并没有专注自身的功能:存储要消费的对象
  • 消费者并没有专注自身的功能:取出对象、如何消费对象

改进后的代码如下:

Apple类未更改。

Producer变为抽象类,并使用泛型。里面新增线程池,用于运行消费者线程。

代码语言:javascript
复制
public abstract class Producer<E> {
    protected BlockingQueue<E> queue;
    protected ExecutorService threadPool = Executors.newCachedThreadPool();
    public static final int DEFAULT_QUEUE_LENGTH = 10000;
    
    public Producer(int capacity) {
        initQueue(capacity);
    }
    
    public BlockingQueue<E> getQueue() {
        return queue;
    }

    public void setQueue(BlockingQueue<E> queue) {
        this.queue = queue;
    }

    public boolean put(E apple) {
        return queue.offer(apple);
    }
    
    private void initQueue(int capacity) {
        if (queue == null) {
            synchronized (this) {
                if (queue == null) {
                    queue = new LinkedBlockingDeque<E>(capacity < 0 ? DEFAULT_QUEUE_LENGTH : capacity);
                }
            }
        }
    }
    
    protected void consumerThread(int consumerCount, Consumer<E> consumer) {
        for (int i = 0; i < consumerCount; i++) {
            threadPool.execute(consumer);
        }
    }
}

Consumer也变成抽象类,使用泛型,并实现了Runnable接口。其中run方法的实现逻辑是:从阻塞队列中取出一个对象,并调用抽象方法consume。该方法是具体的消费者实现的消费逻辑。

代码语言:javascript
复制
public abstract class Consumer<E> implements Runnable{
    BlockingQueue<E> queue;
    
    /**
     * 数据逐个处理
     * @param data
     */
    protected abstract void consume(E data);
    
    @Override
    public void run() {
        while (true) {
            try {
                E data = take();
                try {
                    consume(data);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public Consumer(BlockingQueue<E> queue) {
        this.queue = queue;
    }
    
    public E take() throws InterruptedException {
        return queue.take();
    }
}

AppleProducer:Apple的生产者,使用非延迟加载的单例模式,指定阻塞队列的长度、消费者线程数量。

代码语言:javascript
复制
public class AppleProducer extends Producer<Apple>{
    
    // 并没有延迟加载
    public static AppleProducer INSTANCE = new AppleProducer(DEFAULT_QUEUE_LENGTH, 1); 

    private AppleProducer(int capacity, int consumerCount) {
        super(capacity);
        AppleConsumer consumer = new AppleConsumer(queue);
        consumerThread(consumerCount, consumer);
    }
}

AppleConsumer:Apple的消费者,要实现具体的消费方法consume。这里只是在控制台输出对象信息。

代码语言:javascript
复制
public class AppleConsumer extends Consumer<Apple>{

    public AppleConsumer(BlockingQueue<Apple> queue) {
        super(queue);
    }

    @Override
    protected void consume(Apple data) {
        System.out.println(data);
    }
}

测试:这里只需要获取AppleProducer,调用put方法添加对象即可!在队列中有对象Apple时,会有线程取出Apple,自动调用AppleConsumer的consume方法。

代码语言:javascript
复制
public class TestConsumer {
    
    public static void main(String[] args) throws InterruptedException {

        AppleProducer producer = AppleProducer.INSTANCE;
        for (int i = 0; i < 60; i++) {
            producer.put(new Apple(i));
        }
    }
}

有待改进的地方

  • 并没有面向接口编程,仍然是通过继承来实现的,代码有耦合(但是也不能算是缺点吧)
  • 阻塞队列直接使用LinkedBlockingDeque,并不够灵活(PriorityBlockingQueue等)
  • 对于线程,并没有好的名字,调试等并不直观
  • 如果有多个生产者-消费者,例如增加了Banana,管理仍然不够直观。可以增加一个方法,能够打印出所有的生产者-消费者
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.09.01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • BlockingQueue
    • 简单实现
      • 改进
        • 有待改进的地方
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档