Java实现生产者消费者的两种方式(r12笔记第66天)

我在8年前去面试程序员的时候,一个不大的公司,里面的开发主管接待了我们,给我的题目就是写一段程序模拟生产者消费者问题,当时可把我难坏了,一下子感觉自己的知识储备竟然如此的匮乏。

而在我从事DBA工作之后,经常会有大批量并发的环境,有的需要排障,有的需要优化,在很多并发的场景中,发现生产者消费者问题可以模拟出很多实际中的问题,所以生产者消费者问题非常重要,也是我想不断改进和探索的一类问题。

引入仓库的必要性

要想使用程序来模拟,其实也不用花太多的时间,我们简单说说需要考虑的地方。首先生产者,消费者是两个实体对象,生产者生产物品,消费者消费物品,如果在生产者中定义生产的流程,在消费者中定义消费的流程,两个对象就需要彼此引用,依赖性太高,而且实际上性能也好不到哪里去,所以就需要一个缓冲器,一个中间对象,我们就叫做仓库吧,生产的物品推入仓库,消费的物品从仓库中取出,这样生产者和消费者就能够取消之间的引用,直接通过仓库引用来同步状态,降低耦合。

所以我们的一个初步设想就是生产者-->仓库<--消费者 这样的模式。

生产者消费者的几种类型和实现方式

当然生产者消费者问题有两种类型,一种就是使用某种机制来保护生产者和消费者之间的同步,另外一种和Linux中的管道思路相似。相对来说第一种类型的处理方式更为通用,大体分为三类具体的实现方式:

  1. 经典的wait(),notify()方法
  2. await(),signal()方法
  3. 使用阻塞队列(BlockingQueue),比如LinkedBlockingQueue

通用的对象类

为了更快出成果,尽可能快速理解,我也参考了一些资料,下午下班前写了下面的程序。我就简单说明第一种和第二种吧。

因为实体类对象是通用的,我就不再重复列出了,有生产者Producer和消费者Consumer两个类。

生产者类

import com.jeanron.test1.Storage;

public class Producer extends Thread {
    // 每次生产的产品数量
    private int num;

    // 所在放置的仓库
    private Storage storage;

    // 构造函数,设置仓库
    public Producer(Storage storage) {
        this.storage = storage;
    }

    public Producer(Storage storage, int num) {
        this.storage = storage;
        this.num = num;
    }

    // 线程run函数
    public void run() {
        produce(num);
    }

    // 调用仓库Storage的生产函数
    public void produce(int num) {
        storage.produce(num);
    }

    // get/set方法
    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public Storage getStorage() {
        return storage;
    }

    public void setStorage(Storage storage) {
        this.storage = storage;
    }
}

消费者类

import com.jeanron.test1.Storage;
 
public class Consumer extends Thread  
{  
    // 每次消费的产品数量  
    private int num;  
 
    // 所在放置的仓库  
    private Storage storage;  
 
    // 构造函数,设置仓库  
    public Consumer(Storage storage)  
    {  
        this.storage = storage;  
    }  
 
    // 构造函数,设置仓库  
    public Consumer(Storage storage,int num)  
    {  
        this.storage = storage;  
        this.num = num;
    }  
 
    // 线程run函数  
    public void run()  
    {  
        consume(num);  
    }  
 
    // 调用仓库Storage的生产函数  
    public void consume(int num)  
    {  
        storage.consume(num);  
    }  
 
    // get/set方法  
    public int getNum()  
    {  
        return num;  
    }  
 
    public void setNum(int num)  
    {  
        this.num = num;  
    }  
 
    public Storage getStorage()  
    {  
        return storage;  
    }  
 
    public void setStorage(Storage storage)  
    {  
        this.storage = storage;  
    }  
}  

第一种实现方式

import java.util.LinkedList;

public class Storage  
{  
    // 仓库最大存储量  
    private final int MAX_SIZE = 200;  
 
    // 仓库存储的载体  
    private LinkedList<Object> list = new LinkedList<Object>();  
    private static Storage storage=null;  
    public static Storage getInstance() {  
        if (storage == null) {    
            synchronized (Storage.class) {    
               if (storage == null) {    
                   storage = new Storage();   
               }    
            }    
        }    
        return storage;   
    }   
 
    // 生产num个产品  
    public void produce(int num)  
    {  
        // 同步代码段  
        synchronized (list)  
        {  
            // 如果仓库剩余容量不足  
            while (list.size() + num > MAX_SIZE)  
            {  
                System.out.println(Thread.currentThread().getName()+"【生产者:要生产的产品数量】:" + num + "\t【库存量】:"  
                        + list.size() + "\t暂时不能执行生产任务!");  
                try  
                {  
                    // 由于条件不满足,生产阻塞  
                    list.wait();  
                }  
                catch (InterruptedException e)  
                {  
                    e.printStackTrace();  
                }  
            }  
 
            // 生产条件满足情况下,生产num个产品  
            for (int i = 1; i <= num; ++i)  
            {  
                list.add(new Object());  
            }  
 
            System.out.println(Thread.currentThread().getName()+"【生产者:已经生产产品数】:" + num + "\t【现仓储量为】:" + list.size());  
 
            list.notifyAll();  
        }  
    }  
 
    // 消费num个产品  
    public void consume(int num)  
    {  
        // 同步代码段  
        synchronized (list)  
        {  
            // 如果仓库存储量不足  
            while (list.size() < num)  
            {  
                System.out.println(Thread.currentThread().getName()+"【消费者:要消费的产品数量】:" + num + "\t【库存量】:"  
                        + list.size() + "\t暂时不能执行消费任务!");  
                try  
                {  
                    // 由于条件不满足,消费阻塞  
                    list.wait();  
                }  
                catch (InterruptedException e)  
                {  
                    e.printStackTrace();  
                }  
            }  
 
            // 消费条件满足情况下,消费num个产品  
            for (int i = 1; i <= num; ++i)  
            {  
                list.remove();  
            }  
 
            System.out.println(Thread.currentThread().getName()+"【消费者:已经消费产品数】:" + num + "\t【现仓储量为】:" + list.size());  
 
            list.notifyAll();  
        }  
    }  
 
    // get/set方法  
    public LinkedList<Object> getList()  
    {  
        return list;  
    }  
 
    public void setList(LinkedList<Object> list)  
    {  
        this.list = list;  
    }  
 
    public int getMAX_SIZE()  
    {  
        return MAX_SIZE;  
    }  
}  

第二种实现方式

第二种使用await和signal的方式实现,我们使用Lock来产生两个Condition对象来管理任务间的通信,一个重要的参考点就是仓库满,仓库空,这类方式最后必须有try-finally的子句,保证能够释放锁。

package com.jeanron.test2;

import java.util.LinkedList;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;  

import com.jeanron.test2.Storage;
 
 
public class Storage  
{  
    // 仓库最大存储量  
    private final int MAX_SIZE = 200;  
 
    // 仓库存储的载体  
    private LinkedList<Object> list = new LinkedList<Object>();  
 
    private static Storage storage=null;  
    public static Storage getInstance() {  
        if (storage == null) {    
            synchronized (Storage.class) {    
               if (storage == null) {    
                   storage = new Storage();   
               }    
            }    
        }    
        return storage;   
    }   
 
 
    // 锁  
    private final Lock lock = new ReentrantLock();  
 
    // 仓库满的条件变量  
    private final Condition full = lock.newCondition();  
 
    // 仓库空的条件变量  
    private final Condition empty = lock.newCondition();  
 
    // 生产num个产品  
    public void produce(int num)  
    {  
        // 获得锁  
        lock.lock();  
 
        // 如果仓库剩余容量不足  
        while (list.size() + num > MAX_SIZE)  
        {  
            System.out.println(Thread.currentThread().getName()+"【要生产的产品数量】:" + num + "\t【库存量】:" + list.size()  
                    + "\t暂时不能执行生产任务!");  
            try  
            {  
                // 由于条件不满足,生产阻塞  
                full.await();  
            }  
            catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            }  
        }  
 
        // 生产条件满足情况下,生产num个产品  
        for (int i = 1; i <= num; ++i)  
        {  
            list.add(new Object());  
        }  
 
        System.out.println(Thread.currentThread().getName()+"【已经生产产品数】:" + num + "\t【现仓储量为】:" + list.size());  
 
        // 唤醒其他所有线程  
        full.signalAll();  
        empty.signalAll();  
 
        // 释放锁  
        lock.unlock();  
    }  
 
    // 消费num个产品  
    public void consume(int num)  
    {  
        // 获得锁  
        lock.lock();  
 
        // 如果仓库存储量不足  
        while (list.size() < num)  
        {  
            System.out.println(Thread.currentThread().getName()+"【要消费的产品数量】:" + num + "\t【库存量】:" + list.size()  
                    + "\t暂时不能执行生产任务!");  
            try  
            {  
                // 由于条件不满足,消费阻塞  
                empty.await();  
            }  
            catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            }  
        }  
 
        // 消费条件满足情况下,消费num个产品  
        for (int i = 1; i <= num; ++i)  
        {  
            list.remove();  
        }  
 
        System.out.println(Thread.currentThread().getName()+"【已经消费产品数】:" + num + "\t【现仓储量为】:" + list.size());  
 
        // 唤醒其他所有线程  
        full.signalAll();  
        empty.signalAll();  
 
        // 释放锁  
        lock.unlock();  
    }  
 
    // set/get方法  
    public int getMAX_SIZE()  
    {  
        return MAX_SIZE;  
    }  
 
    public LinkedList<Object> getList()  
    {  
        return list;  
    }  
 
    public void setList(LinkedList<Object> list)  
    {  

测试类

测试类算是用用,我们直接使用多线程调度器Executor来做。对于生产者消费者使用随机数的方式来初始化物品数,仓库使用单例模式。

package com.jeanron.main; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.jeanron.entity1.Consumer; import com.jeanron.entity1.Producer; import com.jeanron.test1.Storage; public class Test1 { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i=0;i<5;i++){ exec.execute(new Producer(Storage.getInstance(),new Random().nextInt(10)*10)); } for(int i=0;i<10;i++){ exec.execute(new Consumer(Storage.getInstance(),new Random().nextInt(10)*10)); } } }

测试结果

生产者是5个线程,消费者是10个线程,我们来看看调用之后的信息,这个可以简单分析下日志即可,不具有典型性和代表性。

第一类实现方式的输出如下:

pool-1-thread-1【生产者:已经生产产品数】:10    【现仓储量为】:10
pool-1-thread-3【生产者:已经生产产品数】:50    【现仓储量为】:60
pool-1-thread-2【生产者:已经生产产品数】:50    【现仓储量为】:110
pool-1-thread-5【生产者:已经生产产品数】:0    【现仓储量为】:110
pool-1-thread-4【生产者:已经生产产品数】:80    【现仓储量为】:190
pool-1-thread-4【消费者:已经消费产品数】:40    【现仓储量为】:150
pool-1-thread-1【消费者:已经消费产品数】:10    【现仓储量为】:140
pool-1-thread-3【消费者:已经消费产品数】:20    【现仓储量为】:120
pool-1-thread-2【消费者:已经消费产品数】:10    【现仓储量为】:110
pool-1-thread-5【消费者:已经消费产品数】:40    【现仓储量为】:70
pool-1-thread-6【消费者:已经消费产品数】:30    【现仓储量为】:40
pool-1-thread-4【消费者:要消费的产品数量】:90    【库存量】:40    暂时不能执行消费任务!
pool-1-thread-1【消费者:要消费的产品数量】:50    【库存量】:40    暂时不能执行消费任务!
pool-1-thread-3【消费者:要消费的产品数量】:90    【库存量】:40    暂时不能执行消费任务!
pool-1-thread-7【消费者:已经消费产品数】:20    【现仓储量为】:20
pool-1-thread-3【消费者:要消费的产品数量】:90    【库存量】:20    暂时不能执行消费任务!
pool-1-thread-1【消费者:要消费的产品数量】:50    【库存量】:20    暂时不能执行消费任务!
pool-1-thread-4【消费者:要消费的产品数量】:90    【库存量】:20    暂时不能执行消费任务!

第二类实现方式的输出如下:

pool-1-thread-1【已经生产产品数】:16    【现仓储量为】:16
pool-1-thread-3【已经生产产品数】:60    【现仓储量为】:76
pool-1-thread-2【已经生产产品数】:67    【现仓储量为】:143
pool-1-thread-5【要生产的产品数量】:90    【库存量】:143    暂时不能执行生产任务!
pool-1-thread-4【已经生产产品数】:27    【现仓储量为】:170
pool-1-thread-1【已经消费产品数】:10    【现仓储量为】:160
pool-1-thread-2【已经消费产品数】:23    【现仓储量为】:137
pool-1-thread-6【已经消费产品数】:18    【现仓储量为】:119
pool-1-thread-5【要生产的产品数量】:90    【库存量】:119    暂时不能执行生产任务!
pool-1-thread-3【已经消费产品数】:97    【现仓储量为】:22
pool-1-thread-1【要消费的产品数量】:57    【库存量】:22    暂时不能执行生产任务!
pool-1-thread-6【要消费的产品数量】:62    【库存量】:22    暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35    【库存量】:22    暂时不能执行生产任务!
pool-1-thread-4【已经消费产品数】:17    【现仓储量为】:5
pool-1-thread-2【已经消费产品数】:1    【现仓储量为】:4
pool-1-thread-7【要消费的产品数量】:93    【库存量】:4    暂时不能执行生产任务!
pool-1-thread-2【已经消费产品数】:4    【现仓储量为】:0
pool-1-thread-5【已经生产产品数】:90    【现仓储量为】:90
pool-1-thread-1【已经消费产品数】:57    【现仓储量为】:33
pool-1-thread-6【要消费的产品数量】:62    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-4【要消费的产品数量】:45    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-7【要消费的产品数量】:93    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-10【要消费的产品数量】:81    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-2【要消费的产品数量】:65    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-5【要消费的产品数量】:38    【库存量】:33    暂时不能执行生产任务!
pool-1-thread-8【已经消费产品数】:33    【现仓储量为】:0
pool-1-thread-12【要消费的产品数量】:21    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-9【要消费的产品数量】:24    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-11【要消费的产品数量】:54    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-13【要消费的产品数量】:58    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-6【要消费的产品数量】:62    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-4【要消费的产品数量】:45    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-7【要消费的产品数量】:93    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-10【要消费的产品数量】:81    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-2【要消费的产品数量】:65    【库存量】:0    暂时不能执行生产任务!
pool-1-thread-5【要消费的产品数量】:38    【库存量】:0    暂时不能执行生产任务!

原文发布于微信公众号 - 杨建荣的学习笔记(jianrong-notes)

原文发表时间:2017-05-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏极乐技术社区

小程序音频API踩坑手册

7553
来自专栏移动端开发

AVFoundation 框架初探究(一)

夜深时动笔 ----       前面一篇文章写了视频播放的几种基本的方式,算是给这个系列开了一个头,这里面最想说和探究的就是AVFoundation框架,很想...

7435
来自专栏一个会写诗的程序员的博客

《Kotlin极简教程》第1章 Kotlin简介

我们这里讲的Kotlin,就是一门以这个Котлин岛命名的现代程序设计语言。它是一门静态类型编程语言,支持JVM平台,Android平台,浏览器JS运行环境,...

902
来自专栏JadePeng的技术博客

HTML5录音控件

最近的项目又需要用到录音,年前有过调研,再次翻出来使用,这里做一个记录。 HTML5提供了录音支持,因此可以方便使用HTML5来录音,来实现录音、语音识别等功能...

1.5K5
来自专栏字根中文校对软件

Java 错别字检查接口 API

Java 错别字检查接口 API 为了方便广大程序员朋友快速把错别字检查功能集成到自己的系统中,我们开发了一个支持HTTP协议的 Java 错别字检查接口 AP...

4065
来自专栏Crossin的编程教室

【Python 第4课】输入

Hi~Crossin又来了。 可以用编程语言让计算机按你说的指令做事情之后,大家是不是有些跃跃欲试呢?别着急,先回顾一下我们之前几节课。我们到现在一共提到了三种...

3327
来自专栏AI科技评论

开发 | 星际争霸2人工智能研究环境 SC2LE 初体验

1 前言 昨天,也就是2017年8月10号,DeepMind联合暴雪发布了星际争霸2人工智能研究环境SC2LE,从而使人工智能的研究进入到一个全新的阶段。这次...

4858
来自专栏Zchannel

Openload超简单无下载限制免费网盘推荐!支持影片在线播放还能赚钱

Openload有什么特色呢?如果要我以一句话推荐此服务,我会说:它简单到不行,而且几乎没有任何限制!若你有长期关注免费资源网路社群,一定知道Pseric向来喜...

1.3K1
来自专栏SAP最佳业务实践

SAP最佳业务实践:FI–现金管理(160)-14银企对账-客户收款-承兑汇票-F-36收到承兑汇票

4.5.2 F-36收到银行承兑汇票 收到客户银行承兑汇票支付应收账款,形成财务记账如下: 借:应收票据 贷:应收账款 ? 输入凭证日期、参照、抬头文本 回...

3068
来自专栏蓝天

养成良好的编程习惯

4.从源头避免版本不一致问题(当同一个文件在不同目录下出现拷贝时,容易出现其中某个未同步更新的问题)

1193

扫码关注云+社区

领取腾讯云代金券