Java 生产者消费者实现 —— BlockingQueue

前言

对着《Java 编程思想》,通过wait - notifyAll实现了生产者消费者模式。今天用BlockingQueue实现一下。

BlockingQueue

简单实现

生产者和消费者,共用一个BlockingQueue。为什么BlockingQueue能够实现生产者-消费者模型呢?对于puttake两个操作,注释如下:

/**
 * Inserts the specified element into this queue, waiting if necessary
 * for space to become available.
 *
 * @param e the element to add
 * @throws InterruptedException if interrupted while waiting
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of the specified
 *         element prevents it from being added to this queue
 */
void put(E e) throws InterruptedException;
/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element becomes available.
 *
 * @return the head of this queue
 * @throws InterruptedException if interrupted while waiting
 */
E take() throws InterruptedException;

Apple.java,生产和消费的对象。

public class Apple {
    
    private int id;
    
    public Apple(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Apple [id=" + id + "]";
    }
}

生产者:

public class Producer {
    BlockingQueue<Apple> queue;
    
    public Producer(BlockingQueue<Apple> queue) {
        this.queue = queue;
    }
    
    public boolean put(Apple apple) {
        return queue.offer(apple);
    }
}

消费者:

public class Consumer {
    BlockingQueue<Apple> queue;
    
    public Consumer(BlockingQueue<Apple> queue) {
        this.queue = queue;
    }
    
    public Apple take() throws InterruptedException {
        return queue.take();
    }
}

测试:

public class TestConsumer {
    
    public static void main(String[] args) {

        final BlockingQueue<Apple> queue = new LinkedBlockingDeque<Apple>(100);
        
        // 生产者
        new Thread(new Runnable() {
            
            int appleId = 0;
            Producer producer = new Producer(queue);
            
            @Override
            public void run() {
                try {
                    while (true) {
                        TimeUnit.SECONDS.sleep(1);
                        producer.put(new Apple(appleId++)); 
                        producer.put(new Apple(appleId++)); 
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 消费者
        new Thread(new Runnable() {
            Consumer consumer = new Consumer(queue);
            
            @Override
            public void run() {
                try {
                    while (true) {
                        System.out.println(consumer.take().getId());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

输出:

生产者生产2个Apple,消费者立即消费掉。

改进

上述代码存在一些问题:

  • 生产者和消费者,都仅用于特定的类型Apple
  • 在使用过程中,需要自己定义BlockingQueue,自行实现生产者和消费者的线程,使用复杂
  • 如果要定义多个消费者线程,需要多次手动编写代码
  • 生产者并没有专注自身的功能:存储要消费的对象
  • 消费者并没有专注自身的功能:取出对象、如何消费对象

改进后的代码如下:

Apple类未更改。

Producer变为抽象类,并使用泛型。里面新增线程池,用于运行消费者线程。

public abstract class Producer<E> {
    protected BlockingQueue<E> queue;
    protected ExecutorService threadPool = Executors.newCachedThreadPool();
    public static final int DEFAULT_QUEUE_LENGTH = 10000;
    
    public Producer(int capacity) {
        initQueue(capacity);
    }
    
    public BlockingQueue<E> getQueue() {
        return queue;
    }

    public void setQueue(BlockingQueue<E> queue) {
        this.queue = queue;
    }

    public boolean put(E apple) {
        return queue.offer(apple);
    }
    
    private void initQueue(int capacity) {
        if (queue == null) {
            synchronized (this) {
                if (queue == null) {
                    queue = new LinkedBlockingDeque<E>(capacity < 0 ? DEFAULT_QUEUE_LENGTH : capacity);
                }
            }
        }
    }
    
    protected void consumerThread(int consumerCount, Consumer<E> consumer) {
        for (int i = 0; i < consumerCount; i++) {
            threadPool.execute(consumer);
        }
    }
}

Consumer也变成抽象类,使用泛型,并实现了Runnable接口。其中run方法的实现逻辑是:从阻塞队列中取出一个对象,并调用抽象方法consume。该方法是具体的消费者实现的消费逻辑。

public abstract class Consumer<E> implements Runnable{
    BlockingQueue<E> queue;
    
    /**
     * 数据逐个处理
     * @param data
     */
    protected abstract void consume(E data);
    
    @Override
    public void run() {
        while (true) {
            try {
                E data = take();
                try {
                    consume(data);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public Consumer(BlockingQueue<E> queue) {
        this.queue = queue;
    }
    
    public E take() throws InterruptedException {
        return queue.take();
    }
}

AppleProducer:Apple的生产者,使用非延迟加载的单例模式,指定阻塞队列的长度、消费者线程数量。

public class AppleProducer extends Producer<Apple>{
    
    // 并没有延迟加载
    public static AppleProducer INSTANCE = new AppleProducer(DEFAULT_QUEUE_LENGTH, 1); 

    private AppleProducer(int capacity, int consumerCount) {
        super(capacity);
        AppleConsumer consumer = new AppleConsumer(queue);
        consumerThread(consumerCount, consumer);
    }
}

AppleConsumer:Apple的消费者,要实现具体的消费方法consume。这里只是在控制台输出对象信息。

public class AppleConsumer extends Consumer<Apple>{

    public AppleConsumer(BlockingQueue<Apple> queue) {
        super(queue);
    }

    @Override
    protected void consume(Apple data) {
        System.out.println(data);
    }
}

测试:这里只需要获取AppleProducer,调用put方法添加对象即可!在队列中有对象Apple时,会有线程取出Apple,自动调用AppleConsumer的consume方法。

public class TestConsumer {
    
    public static void main(String[] args) throws InterruptedException {

        AppleProducer producer = AppleProducer.INSTANCE;
        for (int i = 0; i < 60; i++) {
            producer.put(new Apple(i));
        }
    }
}

有待改进的地方

  • 并没有面向接口编程,仍然是通过继承来实现的,代码有耦合(但是也不能算是缺点吧)
  • 阻塞队列直接使用LinkedBlockingDeque,并不够灵活(PriorityBlockingQueue等)
  • 对于线程,并没有好的名字,调试等并不直观
  • 如果有多个生产者-消费者,例如增加了Banana,管理仍然不够直观。可以增加一个方法,能够打印出所有的生产者-消费者

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Ryan Miao

java并发编程实战学习(3)--基础构建模块

转自:java并发编程实战 5.3阻塞队列和生产者-消费者模式 BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和p...

32070
来自专栏JavaQ

concurrent包下线程池类小结

并发包下的线程池技术虽然常用,但是知识点较多易忘。所以,参考网络资源做了一个小结,便于复习。 Executor接口 用于执行已提交的Runnable任务。 ? ...

33840
来自专栏老马说编程

(81) 并发同步协作工具 / 计算机程序的思维逻辑

查看历史文章,请点击上方链接关注公众号。 我们在67节和68节实现了线程的一些基本协作机制,那是利用基本的wait/notify实现的,我们提到,Java并发包...

20890
来自专栏编码小白

ofbiz实体引擎(三) GenericDelegator实例化的具体过程

/** * @author 郑小康 * 1.设置delegatorFullName 基本delegatorName+"#"+tenantId...

43150
来自专栏IT技术精选文摘

Java多线程知识小抄集(二)

27. ConcurrentHashMap ConcurrentHashMap是线程安全的HashMap,内部采用分段锁来实现,默认初始容量为16,装载因子为0...

20760
来自专栏JavaQ

高并发编程-CyclicBarrier深入解析

CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(也可以叫同步点),即相互等待的线程都完成调用await方法...

1.2K30
来自专栏Ryan Miao

java并发编程(4)--线程池的使用

转载:http://www.cnblogs.com/dolphin0520/p/3932921.html 一. java中的ThreadPoolExecutor...

36480
来自专栏芋道源码1024

熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略

本文主要基于 Hystrix 1.5.X 版本 1. 概述 2. HystrixThreadPoolProperties 3. HystrixThreadPoo...

50060
来自专栏每日一篇技术文章

Swift3.0 - 对象判等

14420
来自专栏DOTNET

.NET MongoDB Driver GridFS 2.2原理及使用示例

一、API解读 1 GridFSBucketOptions 1)public string BucketName { get; set; } 获取或设置buck...

35780

扫码关注云+社区

领取腾讯云代金券