多线程设计模式解读4—Producer-Consumer模式

Producer-Consumer模式可以说是多线程设计模式之王,后期我们要讲的许多模式像Thread-Pool模式,Active Object模式等都是Producer-Consumer模式的变种。Producer-Consumer模式中的生产者和消费者阻塞唤醒机制可以通过Guarded Suspension模式实现。

为什么要有Producer-Consumer模式呢?

1、消除了生产者与消费者之间的代码依赖。

2、实现线程间的协调运行。生产者与消费者之间的运行速率不同,直接调用,数据处理会产生延迟,导致程序响应性下降。

这种模式我们平时应该经常接触到,小到单体应用中ThreadPoolExecutor的编码,大到架构实现中Kafka,RabbitMQ的使用。它由以下角色组成:

Producer:负责生成Product,传入Channel。

Product:由Producer生成,供Consumer使用。

Channel:Producer和Consumer之间的缓冲区,用于传递Product。

Consumer:从Channel获取Product使用。

这里我们可以使用java.util.concurrent中的BlockingQueue阻塞队列实现Channel。它可以极大地简化编程,take操作会一直阻塞直到有可用数据,put在channel满时也会阻塞直到有数据被消费。它有如下实现类:

1、LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,前者基于链表实现,如果不特别指定,元素个数没有最大限制,后者基于数组实现,元素个数有最大限制。

2、PriorityBlockingQueue是按优先级排序的队列,DelayQueue是一定时间后才可以take的队列。

3、SynchronousQueue,没有存储功能,直接传递,因此put和take会一直阻塞,直到另一个线程准备好参与。

该模式的示例代码如下:

public class PCTest {

  public static void main(String[] args) {
    //channel,有界阻塞队列,容量100
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(50);
    Producer producer = new Producer(queue);
    Consumer consumer = new Consumer(queue);

    new Thread(producer).start();
    new Thread(consumer).start();
    System.out.println("生产者,消费者开始运行");
  }
}


class Consumer implements Runnable{

  private BlockingQueue<String> queue;

  public Consumer(BlockingQueue<String> q){
    this.queue=q;
  }

  @Override
  public void run() {
    try{
      String data;
      //获取消息,收到exit后退出
      while((data = queue.take()) !="exit"){
        Thread.sleep(20);
        System.out.println("Consume: "+data);
      }
    }catch(InterruptedException e) {
      e.printStackTrace();
    }
  }
}


class Producer implements Runnable {

  private BlockingQueue<String> queue;

  public Producer(BlockingQueue<String> q){
    this.queue=q;
  }
  @Override
  public void run() {
    //生产消息
    for(int i=0; i<100; i++){
      String data = new String(""+i);
      try {
        Thread.sleep(i);
        queue.put(data);
        System.out.println("Produce:"+data);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    //毒丸对象
    String data = "exit";
    try {
      queue.put(data);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

1、Channel剩余空间问题:

如果消费者消费比较慢,这就会导致Channel中的Product逐渐积压,对此,我们可以使用有界阻塞队列,当队列满时,会阻塞直到消费者消费才继续生产Product。如果使用无界阻塞队列,就要考虑使用一段时间后,内存不足的情况,可以采用Semaphore信号量来控制。

2、只有一个共享队列时的锁的竞争

如果多个消费者同时消费同一个队列的时候,就会导致锁的竞争,不过BlockingQueue阻塞队列已经帮我们实现了相应的机制,使用Lock,Condition等控制多线程运行,其实就是对Guarded Suspension模式的应用。我们可以通过工作密取算法降低锁的竞争,提高可伸缩性。即每个消费者都有自己的双端队列(Deque,具体实现有ArrayDeque和LinkedBlockingDeque),一个消费者处理完自己队列的Product时,可以从其他消费者双端队列的末尾秘密获取Product。它非常适用于既是生产者又是消费者的问题,比如爬虫,当处理一个页面后,发现有更多页面需要处理,把这些新任务放到自己队列的末尾,当自己的双端队列为空时,则从其他队列尾部获取新任务。

3、线程停止

消费者线程和生产者线程哪个先停止,一般是先停止生产者,等Channel剩余Product备份后,或者被消费者处理完后,再停止消费者。至于具体实现,我们可以采用Two-phase termination 模式,设置停止标志并且使用中断;如果你使用线程池管理,则可以调用shutdown方法,它会等队列中的所有任务完成再关闭(shuwdownNow则可能在任务执行到一半时强行关闭);如果生产者和消费者数量不大,可以采用如上面示例中的毒丸对象,来关闭服务。

原文发布于微信公众号 - java达人(drjava)

原文发表时间:2018-08-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java面试通关手册

面试必备之深入理解自旋锁

分享一个我自己总结的Java学习的系统知识点以及面试问题,目前已经开源,会一直完善下去,欢迎建议和指导欢迎Star: https://github.com/Sn...

65730
来自专栏Star先生的专栏

从源码中分析 Hadoop 的 RPC 机制

RPC是Remote Procedure Call(远程过程调用)的简称,这一机制都要面对两个问题:对象调用方式余与序列/反序列化机制。本文给大家介绍从源码中分...

75200
来自专栏Java学习之路

Java中的阻塞队列

一丶什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个可以进行阻塞插入和阻塞移除的附加方法的队列。 1)阻塞插入:当队列满后,队列会阻塞(...

37560
来自专栏乐沙弥的世界

mongoDB 文档插入

db.collection.insertOne(obj, ) 插入单个文档到一个集合(3.2版本有效),可选参数为w, wtimeou...

9430
来自专栏信安之路

HCTF2017的三个WriteUp

解决方法就是先 undefine 掉函数,再右键选择 Code,最后 Create function 就可以正常反编译了。

11800
来自专栏熊二哥

快速入门系列--WebAPI--04在老版本MVC4下的调整

WebAPI是建立在MVC和WCF的基础上的,原来微软老是喜欢封装的很多,这次终于愿意将http编程模型的相关细节暴露给我们了。在之前的介绍中,基本上都基于.N...

24760
来自专栏祝威廉

Kafka Zero-Copy 使用分析

Kafka 我个人感觉是性能优化的典范。而且使用Scala开发,代码写的也很漂亮的。重点我觉得有四个

22220
来自专栏木子墨的前端日常

easyUI datagrid 清空

最近在做一个管理系统,出于一些需要,经常要将一些datagrid清空。然后easyUI本身并没有自带的方法,然后自己动手丰衣足食吧。

11730
来自专栏大数据学习笔记

Saltstack 快速入门教程

1.介绍 Saltstack 比 Puppet 出来晚几年,是基于Python 开发的,也是基于 C/S 架构,服务端 master 和客户端 minions ...

1.4K80
来自专栏Spark学习技巧

Hbase源码系列之scan源码解析及调优

一,hbase的scan基本使用问题介绍 Hbase的Scan方法是基于Rowkey进行数据扫描的,过程中client会将我们的请求,转化为向服务端的RPC请求...

45480

扫码关注云+社区

领取腾讯云代金券