首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >生产者消费者模式的三种实现方式

生产者消费者模式的三种实现方式

作者头像
earthchen
发布2020-09-24 11:45:04
2980
发布2020-09-24 11:45:04
举报

Java 中生产者消费者模式的三种实现方式

生产者消费者的实现

生产者生产数据到缓冲区中,消费者从缓冲区中取数据。

如果缓冲区已经满了,则生产者线程阻塞;

如果缓冲区为空,那么消费者线程阻塞。

使用 wait/notify 方式实现

1. 生产者

package com.earthchen.ProducerConsumer.waitnotify;

import java.util.Queue;

/**
 * Producer for the {@code wait}/{@code notifyAll} variant.
 *
 * <p>Repeatedly appends an element to a shared queue, using the queue object itself as the
 * monitor. While the queue holds {@code maxSize} or more elements the producer waits on that
 * monitor; after adding an element it wakes every thread waiting on it.
 *
 * <p>The loop ends when the executing thread is interrupted.
 *
 * @author earthchen
 * @date 2018/9/20
 **/
public class Producer extends Thread {

    /** Pause between two productions, in milliseconds. */
    private static final long PRODUCE_INTERVAL_MILLIS = 2000L;

    private final Queue<Integer> queue;

    private final String name;

    private final int maxSize;

    /**
     * @param queue   shared buffer; also used as the monitor for wait/notify
     * @param name    thread name, also used in the log output
     * @param maxSize capacity at which the producer starts waiting; must not be {@code null}
     */
    public Producer(Queue<Integer> queue, String name, Integer maxSize) {
        super(name);
        this.name = name;
        this.queue = queue;
        this.maxSize = maxSize;
    }

    @Override
    public void run() {
        // Check the *executing* thread: this object may be run as a Runnable inside another Thread.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                synchronized (queue) {
                    // Re-check in a loop: notifyAll() wakes producers and consumers alike.
                    while (queue.size() >= maxSize) {
                        System.out.println("队列已满");
                        queue.wait();
                    }
                    System.out.println("此时队列大小为" + queue.size() + "生产者" + name + "向队列中添加一个元素");
                    queue.offer(1);
                    queue.notifyAll();
                }
                // Sleep only after the monitor is released, so other threads can use the queue.
                Thread.sleep(PRODUCE_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag for any caller that inspects it, then stop producing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

2. 消费者

package com.earthchen.ProducerConsumer.waitnotify;

import java.util.Queue;

/**
 * Consumer for the {@code wait}/{@code notifyAll} variant.
 *
 * <p>Repeatedly removes the head of a shared queue, using the queue object itself as the monitor.
 * While the queue is empty the consumer waits on that monitor; after removing an element it wakes
 * every thread waiting on it.
 *
 * <p>The loop ends when the executing thread is interrupted.
 *
 * @author earthchen
 * @date 2018/9/20
 **/
public class Consumer extends Thread {

    /** Pause between two consumptions, in milliseconds. */
    private static final long CONSUME_INTERVAL_MILLIS = 2000L;

    private final Queue<Integer> queue;

    private final String name;

    /**
     * @param queue shared buffer; also used as the monitor for wait/notify
     * @param name  thread name, also used in the log output
     */
    public Consumer(Queue<Integer> queue, String name) {
        super(name);
        this.name = name;
        this.queue = queue;
    }

    @Override
    public void run() {
        // Check the *executing* thread: this object may be run as a Runnable inside another Thread.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                synchronized (queue) {
                    // Re-check in a loop: notifyAll() wakes producers and consumers alike.
                    while (queue.isEmpty()) {
                        System.out.println("队列为空");
                        queue.wait();
                    }
                    queue.poll();
                    System.out.println("此时队列大小为" + queue.size() + name + "消费者消费了一个元素");
                    queue.notifyAll();
                }
                // Sleep only after the monitor is released, so other threads can use the queue.
                Thread.sleep(CONSUME_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag for any caller that inspects it, then stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

3. 测试类

package com.earthchen.producerconsumer.waitnotify;

import com.earthchen.ProducerConsumer.waitnotify.Consumer;
import com.earthchen.ProducerConsumer.waitnotify.Producer;

import java.util.LinkedList;
import java.util.Queue;

/**
 * Demo entry point for the {@code wait}/{@code notify} variant: starts several producers and
 * consumers that all share one {@link LinkedList}-backed queue.
 *
 * @author earthchen
 * @date 2018/9/20
 **/
public class Main {

    private static final int PRODUCER_COUNT = 5;

    private static final int CONSUMER_COUNT = 5;

    /** Queue size at which producers start waiting. */
    private static final int QUEUE_CAPACITY = 5;

    public static void main(String[] args) {
        Queue<Integer> queue = new LinkedList<>();

        // Producer and Consumer are Threads themselves; start them directly so the thread that
        // runs the loop carries the name passed to the constructor.
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            new Producer(queue, "生产者" + i, QUEUE_CAPACITY).start();
        }

        for (int i = 0; i < CONSUMER_COUNT; i++) {
            new Consumer(queue, "消费者" + i).start();
        }
    }
}

使用阻塞队列实现

1. 生产者

package com.earthchen.ProducerConsumer.blockingqueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

/**
 * Producer for the {@link BlockingQueue} variant.
 *
 * <p>Repeatedly calls {@link BlockingQueue#put(Object)}, which blocks while the queue has no free
 * capacity, so no explicit locking is needed here.
 *
 * <p>The loop ends when the executing thread is interrupted.
 *
 * @author earthchen
 * @date 2018/9/20
 **/
public class Producer extends Thread {

    private final BlockingQueue<Integer> queue;

    private final String name;

    /**
     * @param queue shared blocking buffer
     * @param name  thread name, also used in the log output
     */
    public Producer(BlockingQueue<Integer> queue, String name) {
        super(name);
        this.queue = queue;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            // Check the *executing* thread: this object may be run as a Runnable inside another Thread.
            while (!Thread.currentThread().isInterrupted()) {
                queue.put(1);
                System.out.println(name+"生产了一个元素");
            }
        } catch (InterruptedException e) {
            // Interrupted while blocked in put(): restore the flag and let the thread finish.
            Thread.currentThread().interrupt();
        }
    }
}

2. 消费者

package com.earthchen.ProducerConsumer.blockingqueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

/**
 * Consumer for the {@link BlockingQueue} variant.
 *
 * <p>Repeatedly calls {@link BlockingQueue#take()}, which blocks while the queue is empty, then
 * pauses for a random time below one second before taking the next element.
 *
 * <p>The loop ends when the executing thread is interrupted.
 *
 * @author earthchen
 * @date 2018/9/20
 **/
public class Consumer extends Thread {

    /** Exclusive upper bound of the random pause between two consumptions, in milliseconds. */
    private static final int MAX_PAUSE_MILLIS = 1000;

    private final BlockingQueue<Integer> queue;

    private final String name;

    /** Only used by the thread executing {@link #run()}, so one instance is enough. */
    private final Random random = new Random();

    /**
     * @param queue shared blocking buffer
     * @param name  thread name, also used in the log output
     */
    public Consumer(BlockingQueue<Integer> queue, String name) {
        super(name);
        this.queue = queue;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            // Check the *executing* thread: this object may be run as a Runnable inside another Thread.
            while (!Thread.currentThread().isInterrupted()) {
                queue.take();

                // size() is read after take() without a lock, so the printed value is only a snapshot.
                System.out.println("此时队列大小为" + queue.size() + name + "消费者消费了一个元素");

                Thread.sleep(random.nextInt(MAX_PAUSE_MILLIS));
            }
        } catch (InterruptedException e) {
            // Interrupted while blocked in take() or sleep(): restore the flag and finish.
            Thread.currentThread().interrupt();
        }
    }
}

3. 测试类

package com.earthchen.ProducerConsumer.blockingqueue;


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Demo entry point for the {@link BlockingQueue} variant: starts several producers and consumers
 * that share one bounded {@link LinkedBlockingDeque}.
 *
 * @author earthchen
 * @date 2018/9/20
 **/
public class Main {

    private static final int PRODUCER_COUNT = 5;

    private static final int CONSUMER_COUNT = 5;

    /** Capacity of the shared queue; {@code put} blocks once it is reached. */
    private static final int QUEUE_CAPACITY = 5;

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(QUEUE_CAPACITY);

        // Producer and Consumer are Threads themselves; start them directly so the thread that
        // runs the loop carries the name passed to the constructor.
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            new Producer(queue, "生产者" + i).start();
        }

        for (int i = 0; i < CONSUMER_COUNT; i++) {
            new Consumer(queue, "消费者" + i).start();
        }
    }
}

使用 Condition 实现

1. 生产者

package com.earthchen.ProducerConsumer.lock;

import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Producer for the {@link Lock}/{@link Condition} variant.
 *
 * <p>Repeatedly appends an element to a shared queue guarded by {@code lock}. While the queue
 * holds {@code maxSize} or more elements the producer waits on {@code fullCondition}; after adding
 * an element it signals {@code emptyCondition}, the condition consumers of this example wait on.
 * Between two productions it pauses for a random time below one second, outside the lock.
 *
 * <p>The loop ends when the executing thread is interrupted.
 *
 * @author earthchen
 * @date 2018/9/20
 **/
public class Producer extends Thread {

    /** Exclusive upper bound of the random pause between two productions, in milliseconds. */
    private static final int MAX_PAUSE_MILLIS = 1000;

    private final Queue<Integer> queue;

    private final String name;

    private final int maxSize;

    private final Lock lock;

    private final Condition fullCondition;

    private final Condition emptyCondition;

    /** Only used by the thread executing {@link #run()}, so one instance is enough. */
    private final Random random = new Random();

    /**
     * @param queue          shared buffer, accessed only while {@code lock} is held
     * @param name           thread name, also used in the log output
     * @param maxSize        capacity at which the producer starts waiting; must not be {@code null}
     * @param lock           lock guarding {@code queue}; both conditions must belong to it
     * @param fullCondition  condition this producer waits on while the queue is full
     * @param emptyCondition condition signalled after an element has been added
     */
    public Producer(Queue<Integer> queue,String name,Integer maxSize,Lock lock,Condition fullCondition, Condition emptyCondition){
        super(name);
        this.queue=queue;
        this.name=name;
        this.maxSize=maxSize;
        this.lock=lock;
        this.fullCondition=fullCondition;
        this.emptyCondition=emptyCondition;
    }

    @Override
    public void run() {
        // Check the *executing* thread: this object may be run as a Runnable inside another Thread.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                lock.lock();
                try {
                    // Re-check in a loop to guard against spurious wake-ups.
                    while (queue.size() >= maxSize) {
                        System.out.println("队列已满");
                        fullCondition.await();
                    }
                    System.out.println("此时队列大小为" + queue.size() + "生产者" + name + "向队列中添加一个元素");
                    queue.offer(1);
                    // Adding an element can only unblock threads waiting for "not empty";
                    // waking other producers here would just make them re-check and wait again.
                    emptyCondition.signalAll();
                } finally {
                    // Always release, even if await() or the queue throws.
                    lock.unlock();
                }

                Thread.sleep(random.nextInt(MAX_PAUSE_MILLIS));
            } catch (InterruptedException e) {
                // Restore the flag for any caller that inspects it, then stop producing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

2. 消费者

package com.earthchen.ProducerConsumer.lock;

import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Consumer for the {@link Lock}/{@link Condition} variant.
 *
 * <p>Repeatedly removes the head of a shared queue guarded by {@code lock}. While the queue is
 * empty the consumer waits on {@code emptyCondition}; after removing an element it signals
 * {@code fullCondition}, the condition producers of this example wait on. Between two
 * consumptions it pauses for a random time below one second, outside the lock.
 *
 * <p>The loop ends when the executing thread is interrupted.
 *
 * @author earthchen
 * @date 2018/9/20
 **/
public class Consumer extends Thread {

    /** Exclusive upper bound of the random pause between two consumptions, in milliseconds. */
    private static final int MAX_PAUSE_MILLIS = 1000;

    private final Queue<Integer> queue;

    private final String name;

    private final Lock lock;

    private final Condition fullCondition;

    private final Condition emptyCondition;

    /** Only used by the thread executing {@link #run()}, so one instance is enough. */
    private final Random random = new Random();

    /**
     * @param queue          shared buffer, accessed only while {@code lock} is held
     * @param name           thread name, also used in the log output
     * @param lock           lock guarding {@code queue}; both conditions must belong to it
     * @param fullCondition  condition signalled after an element has been removed
     * @param emptyCondition condition this consumer waits on while the queue is empty
     */
    public Consumer(Queue<Integer> queue, String name, Lock lock, Condition fullCondition, Condition emptyCondition) {
        super(name);
        this.queue = queue;
        this.name = name;
        this.lock = lock;
        this.fullCondition = fullCondition;
        this.emptyCondition = emptyCondition;
    }

    @Override
    public void run() {
        // Check the *executing* thread: this object may be run as a Runnable inside another Thread.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                lock.lock();
                try {
                    // Re-check in a loop to guard against spurious wake-ups.
                    while (queue.isEmpty()) {
                        System.out.println("队列为空");
                        emptyCondition.await();
                    }
                    queue.poll();

                    System.out.println("此时队列大小为" + queue.size() + name + "消费者消费了一个元素");

                    // Removing an element can only unblock threads waiting for "not full";
                    // waking other consumers here would just make them re-check and wait again.
                    fullCondition.signalAll();
                } finally {
                    // Always release, even if await() or the queue throws.
                    lock.unlock();
                }

                Thread.sleep(random.nextInt(MAX_PAUSE_MILLIS));
            } catch (InterruptedException e) {
                // Restore the flag for any caller that inspects it, then stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

3. 测试类

package com.earthchen.ProducerConsumer.lock;


import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demo entry point for the {@link Lock}/{@link Condition} variant: starts several producers and
 * consumers that share one {@link LinkedList}-backed queue, one {@link ReentrantLock} and two
 * conditions created from that lock.
 *
 * @author earthchen
 * @date 2018/9/20
 **/
public class Main {

    private static final int PRODUCER_COUNT = 5;

    private static final int CONSUMER_COUNT = 5;

    /** Queue size at which producers start waiting. */
    private static final int QUEUE_CAPACITY = 5;

    public static void main(String[] args) {
        Queue<Integer> queue = new LinkedList<>();
        Lock lock = new ReentrantLock();
        // Both conditions must come from the same lock that guards the queue.
        Condition fullCondition = lock.newCondition();
        Condition emptyCondition = lock.newCondition();

        // Producer and Consumer are Threads themselves; start them directly so the thread that
        // runs the loop carries the name passed to the constructor.
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            new Producer(queue, "生产者" + i, QUEUE_CAPACITY, lock, fullCondition, emptyCondition).start();
        }

        for (int i = 0; i < CONSUMER_COUNT; i++) {
            new Consumer(queue, "消费者" + i, lock, fullCondition, emptyCondition).start();
        }
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-11-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 生产者消费者的实现
    • wait notify方式实现
      • 使用阻塞队列实现
        • 使用condition实现
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档