前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >生产者和消费者模型

生产者和消费者模型

作者头像
希希里之海
发布2019-08-30 11:54:43
6480
发布2019-08-30 11:54:43
举报
文章被收录于专栏:weixuqin 的专栏

生产者和消费者模型

1. 什么是生产者和消费者模型

生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。

再具体一点:

  1. 生产者生产数据到缓冲区中,消费者从缓冲区中取数据。
  2. 如果缓冲区已经满了,则生产者线程阻塞。
  3. 如果缓冲区为空,那么消费者线程阻塞。

2. 如何实现

实现生产者消费者模型有两种方式:

  1. 采用 wait—notify 方式实现生产者消费者模型(注意这里需要加同步锁 synchronized)
  2. 采用 阻塞队列 方式实现生产者消费者模式

3. wait-notify 方式

实现过程并不复杂,直接上代码:

这里设置了生产者生产速度大于消费者消费速度(通过 sleep() 方法实现)。

缓冲区 BufferArea.java

代码语言:javascript
复制
public class BufferArea {

    // 当前资源数量的计数值
    private int currNum = 0;

    // 资源池中允许存放的资源数目
    private int maxSize = 10;

    /**
     * 从资源池中取走资源
     */
    public synchronized void get() {
        if (currNum > 0) {
            currNum--;
            System.out.println("Cosumer_" + Thread.currentThread().getName() + "消耗一件资源," + "当前缓冲区有" + currNum + "个");
            // 通知生产者生产资源
            notifyAll();
        } else {
            try {
                // 如果没有资源,则 Cosumer_ 进入等待状态
                System.out.println("Cosumer_" + Thread.currentThread().getName() + ": 当前缓冲区资源不足,进入等待状态");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 向缓冲区中添加资源
     */
    public synchronized void put() {
        // 若当前缓冲区内的资源计数小于最大 size 数,才加
        if (currNum < maxSize) {
            currNum++;
            System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + currNum + "个");

            // 通知等待的消费者
            notifyAll();
        } else {
            // 若当前缓冲区的资源计数大于最大 size 数,则等待
            try {
                System.out.println(Thread.currentThread().getName() + "线程进入等待 << 当前缓冲区的资源计数大于最大 size 数");
                // 生产者进入等待状态,并释放锁
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

生产者 Producer.java

代码语言:javascript
复制
public class Producer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Producer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Producer_" + getName());
    }

    @Override
    public void run() {
        // 不断的生产资源
        while (true) {
            sleepSomeTime();
            mBufferArea.put();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

消费者 Consumer

代码语言:javascript
复制
public class Consumer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Consumer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Consumer_" + getName());
    }

    @Override
    public void run() {
        // 不断的取出资源
        while (true) {
            sleepSomeTime();
            mBufferArea.get();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

测试 Test.java

代码语言:javascript
复制
public class Test {

    public static void main(String[] args) {
        BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();

        Consumer consumer1 = new Consumer(bufferArea);
        Consumer consumer2 = new Consumer(bufferArea);
        Consumer consumer3 = new Consumer(bufferArea);

        Producer producer1 = new Producer(bufferArea);
        Producer producer2 = new Producer(bufferArea);
        Producer producer3 = new Producer(bufferArea);

        consumer1.start();
        consumer2.start();
        consumer3.start();

        producer1.start();
        producer2.start();
        producer3.start();

    }

}

打印结果如下:

代码语言:javascript
复制
ProducerThread-5生产一件资源,当前资源池有1个
ProducerThread-4生产一件资源,当前资源池有2个
ProducerThread-3生产一件资源,当前资源池有3个
ProducerThread-5生产一件资源,当前资源池有4个
ProducerThread-4生产一件资源,当前资源池有5个
ProducerThread-3生产一件资源,当前资源池有6个
ProducerThread-5生产一件资源,当前资源池有7个
ProducerThread-4生产一件资源,当前资源池有8个
ProducerThread-3生产一件资源,当前资源池有9个
ProducerThread-3生产一件资源,当前资源池有10个
ProducerThread-4线程进入等待 << 当前缓冲区的资源计数大于最大 size 数
ProducerThread-5线程进入等待 << 当前缓冲区的资源计数大于最大 size 数
ProducerThread-3线程进入等待 << 当前缓冲区的资源计数大于最大 size 数

>> 注释:3个生产者线程生产满了10个(maxSize)产品,然后就都进入了等待

Cosumer_Consumer_Thread-0消耗一件资源,当前缓冲区有9个
Cosumer_Consumer_Thread-1消耗一件资源,当前缓冲区有8个
Cosumer_Consumer_Thread-2消耗一件资源,当前缓冲区有7个

>> 注释:3个消费者消费了3个产品

ProducerThread-3生产一件资源,当前资源池有8个
ProducerThread-5生产一件资源,当前资源池有9个
ProducerThread-4生产一件资源,当前资源池有10个

>> 注释:生产者立马又生产3个

...

>> 然后一直循环往复这个过程

4. 阻塞队列方式

阻塞队列的特点:
  • 当队列元素已满的时候,阻塞插入操作
  • 当队列元素为空的时候,阻塞获取操作
不同的阻塞队列:

ArrayBlockingQueue 与 LinkedBlockingQueue 都是支持 FIFO (先进先出),但是 LinkedBlockingQueue 是无界的,而ArrayBlockingQueue 是有界的。

这里我们采用无界阻塞队列来演示生产者消费者模式。

演示

还是设置生产者生产速度大于消费者消费速度(通过 sleep() 方法实现)

缓冲区 BlockQueueBufferArea.java

代码语言:javascript
复制
public class BlockQueueBufferArea {

    BlockingQueue<Integer> mProductPoll = new LinkedBlockingQueue(10);

    public void  put() {
        try {
            System.out.println(Thread.currentThread().getName() + "产品池被放入了一个资源");
            mProductPoll.put(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void get() {
        try {
            System.out.println(Thread.currentThread().getName() + "产品池被取走了一个资源");
            mProductPoll.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

生产者 Producer.java

代码语言:javascript
复制
public class Producer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Producer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Producer_" + getName());
    }

    @Override
    public void run() {
        // 不断的生产资源
        while (true) {
            sleepSomeTime();
            mBufferArea.put();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

消费者 Consumer.java

代码语言:javascript
复制
public class Consumer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Consumer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Consumer_" + getName());
    }

    @Override
    public void run() {
        // 不断的取出资源
        while (true) {
            sleepSomeTime();
            mBufferArea.get();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

测试 Test.java

代码语言:javascript
复制
public class Test {

    public static void main(String[] args) {
        BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();

        Consumer consumer1 = new Consumer(bufferArea);
        Consumer consumer2 = new Consumer(bufferArea);
        Consumer consumer3 = new Consumer(bufferArea);

        Producer producer1 = new Producer(bufferArea);
        Producer producer2 = new Producer(bufferArea);
        Producer producer3 = new Producer(bufferArea);

        consumer1.start();
        consumer2.start();
        consumer3.start();

        producer1.start();
        producer2.start();
        producer3.start();

    }

}

打印结果如下:

代码语言:javascript
复制
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Consumer_Thread-0产品池被取走了一个资源
Consumer_Thread-1产品池被取走了一个资源
Consumer_Thread-2产品池被取走了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-08-29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 生产者和消费者模型
    • 1. 什么是生产者和消费者模型
      • 2. 如何实现
        • 3. wait-notify 方式
          • 4. 阻塞队列方式
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档