前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用BlockingQueue的生产者消费者模式

使用BlockingQueue的生产者消费者模式

作者头像
用户3003813
发布2018-09-06 14:04:57
1.3K0
发布2018-09-06 14:04:57
举报
文章被收录于专栏:个人分享

BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。使用场景。

首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享。强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程。

BlockingQueue的核心方法:

放入数据:

  offer(anObject) 如果BlockingQueue可以容纳,返回为true,否则返回false.

  offer(E o,long timeout,TimeUnit unit),设置等待时间,如果指定时间内,还不能往队列中加入BlockingQueue,则返回失败。

  put(anObject)把anObject加到BlockingQueue中,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。

获取数据:   poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,     取不到时返回null;   poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,     队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。   take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到     BlockingQueue有新的数据被加入;    drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),      通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

测试代码:

代码语言:javascript
复制
package BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class BlockingQueueTest {
    public static void main(String args[]) throws InterruptedException{
        BlockingQueue<String> queue = new ArrayBlockingQueue(10);
        
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        
        ExecutorService service = Executors.newCachedThreadPool();
        
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);
        
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();
        
        Thread.sleep(2000);
        // 退出Executor
        service.shutdown();
    }
}

生产者:

代码语言:javascript
复制
package BlockingQueue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


public class Producer implements Runnable{
    
    private volatile boolean      isRunning               = true;
    private BlockingQueue<String> queue;
    private static AtomicInteger  count                   = new AtomicInteger();
    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
        
    public Producer(BlockingQueue queue){
        this.queue = queue;
    }
    
    public void run(){
        String data = null;
        Random r = new Random();
        System.out.println("启动生产者线程");
        try{
            while(isRunning){
                System.out.println("正在生产数据.....");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                
                data = "data:" + count.incrementAndGet();
                System.out.println("将数据:" + data + "放入队列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("放入数据失败:" + data);
                }
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        finally{
            System.out.println("退出生产者线程!");
        }
    }
    
    public void stop(){
        isRunning = false;    
    }
    
    
}

消费者:

代码语言:javascript
复制
package BlockingQueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Consumer implements Runnable{
     private BlockingQueue<String> queue;
     private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
    
    public Consumer(BlockingQueue<String> queue){
        this.queue = queue;
    }
    
    public void run(){
        System.out.println("启动消费者线程:");
        Random r = new Random();
        boolean isRunning = true;
        try{
            while(isRunning){
                System.out.println("正从队列获取数据...");
                String data = queue.poll(2,TimeUnit.SECONDS);
                if(null != data){
                     System.out.println("拿到数据:" + data);
                     System.out.println("正在消费数据:" + data);
                     Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                }else{
                    isRunning = false;
                }
            }
        }catch(InterruptedException e){
             e.printStackTrace();
             Thread.currentThread().interrupt();
        }finally{
            System.out.println("退出消费者线程!");
        }
    }
}

参考:http://wsmajunfeng.iteye.com/blog/1629354

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016-04-24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档