前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程同步之生产者消费者

线程同步之生产者消费者

作者头像
云海谷天
发布2022-08-09 14:03:47
2040
发布2022-08-09 14:03:47
举报
文章被收录于专栏:技术一点点成长

前言:

  前面因时间关系,未将“生产者消费者问题”实例的介绍发布在博客随笔中,故本文作为对之前“多线程”一文的补充。 概念:   生产者消费者问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。这个案例中主要实现的是两个角色协同对同一资源进行访问。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据

设计:本博客前面关于多线程的文章中已讲述过一个实例。本文的思路跟之前差不多,不过这次引入了一个存储资源的队列,并且需要设定2个用于信号传递的条件(Condition)。其实现过程见如下简图:

代码实现

代码语言:javascript
复制
package com.gdufe.thread.consumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConsumerProducer {
  private static Buffer buffer = new Buffer();

  public static void main(String[] args) {
    // Create a thread pool with two threads
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.execute(new ProducerTask());
    executor.execute(new ConsumerTask());
    executor.shutdown();
  }

  // A task for adding an int to the buffer
  private static class ProducerTask implements Runnable {
    public void run() {
      try {
        int i = 1;
        while (true) {
          System.out.println("Producer writes " + i);
          buffer.write(i++); // Add a value to the buffer
          // Put the thread into sleep
          Thread.sleep((int)(Math.random() * 10000));
        }
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
    }
  }

  // A task for reading and deleting an int from the buffer
  private static class ConsumerTask implements Runnable {
    public void run() {
      try {
        while (true) {
          System.out.println("\t\t\tConsumer reads " + buffer.read());
          // Put the thread into sleep
          Thread.sleep((int)(Math.random() * 10000));
        }
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
    }
  }

  // An inner class for buffer
  private static class Buffer {
    private static final int CAPACITY = 1; // buffer size
    private java.util.LinkedList<Integer> queue =
      new java.util.LinkedList<Integer>();	//queue for storing data

    // Create a new lock
    private static Lock lock = new ReentrantLock();

    // Create two conditions
    private static Condition notEmpty = lock.newCondition();
    private static Condition notFull = lock.newCondition();

    public void write(int value) {
      lock.lock(); // Acquire the lock
      try {
        while (queue.size() == CAPACITY) {
          System.out.println("Wait for notFull condition");
          notFull.await();
        }

        queue.offer(value);
        notEmpty.signal(); // Signal notEmpty condition
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      } finally {
        lock.unlock(); // Release the lock
      }
    }

    public int read() {
      int value = 0;
      lock.lock(); // Acquire the lock
      try {
        while (queue.isEmpty()) {
          System.out.println("\t\t\tWait for notEmpty condition");
          notEmpty.await();
        }

        value = queue.remove();
        notFull.signal(); // Signal notFull condition
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      } finally {
        lock.unlock(); // Release the lock
        return value;
      }
    }
  }
}

测试结果

补充

在Java集合框架中,其实有“阻塞队列”这一概念。其特点具有同步方法,也就是说套用阻塞队列,我们可以通过简化上面的代码同样实现生产者消费者的线程同步问题。

参考代码如下:

代码语言:javascript
复制
 1 package com.gdufe.thread.consumer;
 2 
 3 import java.util.concurrent.ArrayBlockingQueue;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 public class ConsumerProducerUsingBlockingQueue {
 8   private static ArrayBlockingQueue<Integer> buffer =
 9     new ArrayBlockingQueue<Integer>(2);
10 
11   public static void main(String[] args) {
12     // Create a thread pool with two threads
13     ExecutorService executor = Executors.newFixedThreadPool(2);
14     executor.execute(new ProducerTask());
15     executor.execute(new ConsumerTask());
16     executor.shutdown();
17   }
18 
19   // A task for adding an int to the buffer
20   private static class ProducerTask implements Runnable {
21     public void run() {
22       try {
23         int i = 1;
24         while (true) {
25           System.out.println("Producer writes " + i);
26           buffer.put(i++); // Add any value to the buffer, say, 1
27           // Put the thread into sleep
28           Thread.sleep((int)(Math.random() * 10000));
29         }
30       } catch (InterruptedException ex) {
31         ex.printStackTrace();
32       }
33     }
34   }
35 
36   // A task for reading and deleting an int from the buffer
37   private static class ConsumerTask implements Runnable {
38     public void run() {
39       try {
40         while (true) {
41           System.out.println("\t\t\tConsumer reads " + buffer.take());
42           // Put the thread into sleep
43           Thread.sleep((int)(Math.random() * 10000));
44         }
45       } catch (InterruptedException ex) {
46         ex.printStackTrace();
47       }
48     }
49   }
50 }
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2015-08-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档