前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程设计模式解读4—Producer-Consumer模式

多线程设计模式解读4—Producer-Consumer模式

作者头像
java达人
发布2018-10-08 11:21:31
1K0
发布2018-10-08 11:21:31
举报
文章被收录于专栏:java达人

Producer-Consumer模式可以说是多线程设计模式之王,后期我们要讲的许多模式像Thread-Pool模式,Active Object模式等都是Producer-Consumer模式的变种。Producer-Consumer模式中的生产者和消费者阻塞唤醒机制可以通过Guarded Suspension模式实现。

为什么要有Producer-Consumer模式呢?

1、消除了生产者与消费者之间的代码依赖。

2、实现线程间的协调运行。生产者与消费者之间的运行速率不同,直接调用,数据处理会产生延迟,导致程序响应性下降。

这种模式我们平时应该经常接触到,小到单体应用中ThreadPoolExecutor的编码,大到架构实现中Kafka,RabbitMQ的使用。它由以下角色组成:

Producer:负责生成Product,传入Channel。

Product:由Producer生成,供Consumer使用。

Channel:Producer和Consumer之间的缓冲区,用于传递Product。

Consumer:从Channel获取Product使用。

这里我们可以使用java.util.concurrent中的BlockingQueue阻塞队列实现Channel。它可以极大地简化编程,take操作会一直阻塞直到有可用数据,put在channel满时也会阻塞直到有数据被消费。它有如下实现类:

1、LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,前者基于链表实现,如果不特别指定,元素个数没有最大限制,后者基于数组实现,元素个数有最大限制。

2、PriorityBlockingQueue是按优先级排序的队列,DelayQueue是一定时间后才可以take的队列。

3、SynchronousQueue,没有存储功能,直接传递,因此put和take会一直阻塞,直到另一个线程准备好参与。

该模式的示例代码如下:

代码语言:javascript
复制
public class PCTest {

  public static void main(String[] args) {
    //channel,有界阻塞队列,容量100
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(50);
    Producer producer = new Producer(queue);
    Consumer consumer = new Consumer(queue);

    new Thread(producer).start();
    new Thread(consumer).start();
    System.out.println("生产者,消费者开始运行");
  }
}


class Consumer implements Runnable{

  private BlockingQueue<String> queue;

  public Consumer(BlockingQueue<String> q){
    this.queue=q;
  }

  @Override
  public void run() {
    try{
      String data;
      //获取消息,收到exit后退出
      while((data = queue.take()) !="exit"){
        Thread.sleep(20);
        System.out.println("Consume: "+data);
      }
    }catch(InterruptedException e) {
      e.printStackTrace();
    }
  }
}


class Producer implements Runnable {

  private BlockingQueue<String> queue;

  public Producer(BlockingQueue<String> q){
    this.queue=q;
  }
  @Override
  public void run() {
    //生产消息
    for(int i=0; i<100; i++){
      String data = new String(""+i);
      try {
        Thread.sleep(i);
        queue.put(data);
        System.out.println("Produce:"+data);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    //毒丸对象
    String data = "exit";
    try {
      queue.put(data);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

1、Channel剩余空间问题:

如果消费者消费比较慢,这就会导致Channel中的Product逐渐积压,对此,我们可以使用有界阻塞队列,当队列满时,会阻塞直到消费者消费才继续生产Product。如果使用无界阻塞队列,就要考虑使用一段时间后,内存不足的情况,可以采用Semaphore信号量来控制。

2、只有一个共享队列时的锁的竞争

如果多个消费者同时消费同一个队列的时候,就会导致锁的竞争,不过BlockingQueue阻塞队列已经帮我们实现了相应的机制,使用Lock,Condition等控制多线程运行,其实就是对Guarded Suspension模式的应用。我们可以通过工作密取算法降低锁的竞争,提高可伸缩性。即每个消费者都有自己的双端队列(Deque,具体实现有ArrayDeque和LinkedBlockingDeque),一个消费者处理完自己队列的Product时,可以从其他消费者双端队列的末尾秘密获取Product。它非常适用于既是生产者又是消费者的问题,比如爬虫,当处理一个页面后,发现有更多页面需要处理,把这些新任务放到自己队列的末尾,当自己的双端队列为空时,则从其他队列尾部获取新任务。

3、线程停止

消费者线程和生产者线程哪个先停止,一般是先停止生产者,等Channel剩余Product备份后,或者被消费者处理完后,再停止消费者。至于具体实现,我们可以采用Two-phase termination 模式,设置停止标志并且使用中断;如果你使用线程池管理,则可以调用shutdown方法,它会等队列中的所有任务完成再关闭(shuwdownNow则可能在任务执行到一半时强行关闭);如果生产者和消费者数量不大,可以采用如上面示例中的毒丸对象,来关闭服务。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-08-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java达人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档