SynchronousQueue使用实例

本文主要讲一下SynchronousQueue。

定义

SynchronousQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。

如果以洗盘子的比喻为例,那么这就相当于没有盘架,而是将洗好的盘子直接放入下一个空闲的烘干机中。这种实现队列的方式看似很奇怪,但由于可以直接交付工作,从而降低了将数据从生产者移动到消费者的延迟。(在传统的队列中,在一个工作单元可以交付之前,必须通过串行方式首先完成入列[Enqueue]或者出列[Dequeue]等操作。)

直接交付方式还会将更多关于任务状态的信息反馈给生产者。当交付被接受时,它就知道消费者已经得到了任务,而不是简单地把任务放入一个队列——这种区别就好比将文件直接交给同事,还是将文件放到她的邮箱中并希望她能尽快拿到文件。

因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

实例

public class SynchronousQueueExample {

    static class SynchronousQueueProducer implements Runnable {

        protected BlockingQueue<String> blockingQueue;
        final Random random = new Random();

        public SynchronousQueueProducer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.println("Put: " + data);
                    blockingQueue.put(data);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    static class SynchronousQueueConsumer implements Runnable {

        protected BlockingQueue<String> blockingQueue;

        public SynchronousQueueConsumer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()
                            + " take(): " + data);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) {
        final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();

        SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
                synchronousQueue);
        new Thread(queueProducer).start();

        SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer1).start();

        SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer2).start();

    }
}

插入数据的线程和获取数据的线程,交替执行

应用场景

Executors.newCachedThreadPool()

 /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

由于ThreadPoolExecutor内部实现任务提交的时候调用的是工作队列(BlockingQueue接口的实现类)的非阻塞式入队列方法(offer方法),因此,在使用SynchronousQueue作为工作队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。

所以,使用SynchronousQueue作为工作队列,工作队列本身并不限制待执行的任务的数量。但此时需要限定线程池的最大大小为一个合理的有限值,而不是Integer.MAX_VALUE,否则可能导致线程池中的工作者线程的数量一直增加到系统资源所无法承受为止。

如果应用程序确实需要比较大的工作队列容量,而又想避免无界工作队列可能导致的问题,不妨考虑SynchronousQueue。SynchronousQueue实现上并不使用缓存空间。 使用SynchronousQueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。

doc

  • A Guide to Java SynchronousQueue
  • SynchronousQueue Example in Java - Produer Consumer Solution
  • Java SynchronousQueue

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2017-09-17

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏AI科技评论

开发 | Python高级技巧:用一行代码减少一半内存占用

我想与大家分享一些我和我的团队在一个项目中经历的一些问题。在这个项目中,我们必须要存储和处理一个相当大的动态列表。测试人员在测试过程中,抱怨内存不足。下面介绍一...

1264
来自专栏听Allen瞎扯淡

为什么java.util.concurrent 包里没有并发的ArrayList实现?

问:JDK 5在 java.util.concurrent 里引入了 ConcurrentHashMap,在需要支持高并发的场景,我们可以使用它代替 HashM...

1002
来自专栏微信终端开发团队的专栏

新年新语言,WCDB Swift

WCDB 作为微信的开发者们的需求,并不断优化性能。而这其中,呼声最高的莫过于对 Swift 的支持。

7798
来自专栏信安之路

【作者投稿】一道反序列化CTF引起的思考

刚开始看到这道题目,我是懵逼的。因为整篇代码没有数据输入口,然后怀疑有其它机关,抓包、扫目录无果之后,找到了一篇writeup如下:

1320
来自专栏编程

新年新语言,WCDB Swift

WCDB ObjC 版本的实现中,由于引入了 C++ 代码,并不能直接 bridge 到 Swift。因此,我们从 9 月份开始就着手使用原生的 Swift,重...

2159
来自专栏Ldpe2G的个人博客

ScalaMP ---- 模仿 OpenMp 的一个简单并行计算框架

这个项目是一次课程作业,要求是写一个并行计算框架,本人本身对openmp比较熟,

2146
来自专栏美团技术团队

【技术博客】Android自定义Lint实践

概述 Android Lint是Google提供给Android开发者的静态代码检查工具。使用Lint对Android工程代码进行扫描和检查,可以发现代码潜在的...

4426
来自专栏超然的博客

JSON 和 JSONP 两兄弟

  但说到AJAX就会不可避免的面临两个问题,第一个是AJAX以何种格式来交换数据?第二个是跨域的需求如何解决?

1423
来自专栏技术专栏

慕课网高并发实战(七)- J.U.C之AQS

3.不断重新尝试获取锁(当前结点为head的直接后继才会 尝试),如果获取失败,则会阻塞自己,直到被唤醒

1682
来自专栏娱乐心理测试

Invalid prop: type check failed for prop "price". Expected String, got Number.

在谷歌浏览器上写Vue项目时,总会有很多警告,关键是红色的,非常刺眼,一片红好像是严重的错误,在有强迫症的程序员眼里非常之别扭,准备清除警告!

1822

扫码关注云+社区

领取腾讯云代金券