专栏首页行者常至015.多线程-并发队列

015.多线程-并发队列

版权声明:本文为博主原创文章,允许转载,请标明出处。

在并发队列上JDK提供了两套实现, 一个是以ConcurrentLinkedQueue为代表的高性能队列, 一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。


ConcurrentLinkedQueue

ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列, 通过无锁的方式,实现了高并发状态下的高性能, 通常ConcurrentLinkedQueue性能好于BlockingQueue。 它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。 头是最先加入的,尾是最近加入的,该队列不允许null元素。

  • add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中这俩个方法没有任何区别) public boolean add(E e) { return offer(e); }
  • poll() 和peek() 都是取头元素节点,区别在于前者会删除元素,后者不会。 package cn.qbz.thread; import java.util.concurrent.ConcurrentLinkedQueue; public class Test111904 { public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); for (int i = 0; i < 3; i++) { new ThreadTest111904(queue).start(); } while (true) { if (queue.size() > 0) { System.out.println(queue.poll()); } } } } class ThreadTest111904 extends Thread { private ConcurrentLinkedQueue queue; public ThreadTest111904(ConcurrentLinkedQueue queue) { this.queue = queue; } @Override public void run() { for (int i = 0; i < 3; i++) { queue.offer(getName() + "..." + i); } } }

BlockingQueue

在队列为空时,获取元素的线程会等待队列变为非空。 当队列满时,存储元素的线程会等待队列可用。 阻塞队列常用于生产者和消费者的场景

ArrayBlockingQueue

ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。 有边界的意思是它的容量是有限的, 我们必须在其初始化的时候指定它的容量大小, 容量大小一旦指定就不可改变。

package cn.qbz.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Test111905 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue(2);
        queue.add("test1");
        queue.offer("test2");

        Boolean isOffer = queue.offer("test3");
        System.out.println("添加test3:"+isOffer);

        isOffer = queue.offer("test4", 1, TimeUnit.SECONDS);
        System.out.println("添加test4:"+isOffer);

        for (int i = 0; i < 4; i++) {
            System.out.println(queue.poll());
        }
    }
}

其中,add和offer的区别是: 当超出队列界限时,add会抛出异常,offer只是返回false。


LinkedBlockingQueue

LinkedBlockingQueue阻塞队列大小的配置是可选的, 如果我们初始化时指定一个大小,它就是有边界的, 如果不指定,它就是无边界的。说是无边界, 其实是采用了默认大小为Integer.MAX_VALUE的容量 。 它的内部实现是一个链表。

code of demo:

package cn.qbz.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Test111906 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new LinkedBlockingDeque(3);
        Produce111906 p1 = new Produce111906(queue);
        Consumer111906 c1 = new Consumer111906(queue);
        
        Thread produce1 = new Thread(p1);
        Thread produce2 = new Thread(p1);
        Thread consumer1 = new Thread(c1);

        produce1.start();
        produce2.start();
        consumer1.start();

        Thread.sleep(1000 * 10);

        p1.stop();
        c1.stop();
    }

}

class Produce111906 implements Runnable {
    private BlockingQueue<String> queue;
    private volatile Boolean flag = true;
    private AtomicInteger count = new AtomicInteger();

    public Produce111906(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        while (flag) {
            try {
                int data = count.incrementAndGet();
                System.out.println("开始生产数据:" + data);
                Boolean isOffer = queue.offer(data + "", 2, TimeUnit.SECONDS);
                if (isOffer) {
                    System.out.println("写入数据:" + data + "成功");
                } else {
                    System.out.println("写入数据:" + data + "失败");
                }
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("退出本次生产");
            }
        }

    }

    public void stop() {
        flag = false;
    }
}

class Consumer111906 implements Runnable {
    private BlockingQueue<String> queue;
    private volatile Boolean flag = true;

    public Consumer111906(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        while (flag) {
            System.out.println("开始消费数据");
            try {
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (data != null) {
                    System.out.println("消费成功:" + data);
                } else {
                    System.out.println("消费失败");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    public void stop() {
        flag = false;
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • xml文件中,id、name的属性的区别

    qubianzhong
  • 008.多线程-synchronized锁

    为了解决线程安全问题, 我们的做法是:不要让多个线程同时对一个全局变量作写的操作。

    qubianzhong
  • 03.理解RabbitMQ消息通信中的基本概念

    当提到消息通信时,我们脑海里最先浮现的可能是邮箱和即时通信(IM),不过这些模型并非我们讨论的RabbitMQ消息通信。比如说,虽然AMQP(高级消息队列协议)...

    qubianzhong
  • 如何自己实现一个队列

    队列是一种先进先出的数据结构,也是常见的数据结构之一。日常生活中的排队买东西就是一种典型的队列,而在购票系统也需要一个队列处理用户的购票请求,当然这里的队列就复...

    编程珠玑
  • 【Rust日报】 2019-08-08:q 框架 - 通用队列自动机运行时实现

    这个库的作用是这个,随着数据的增多,想要从各种数据中识别出用户的关键(敏感)信息,就越来越困难,必须使用一定的工具来进行自动化处理。而这个算法就在这个过程中起作...

    MikeLoveRust
  • Redis应用之任务队列

      生产者和消费者无需知道彼此的实现细节,只需要约定好任务的描述格式,这使得生产者和消费者可以由不同的团队使用不同的编程语言编写。

    用户4919348
  • python—多进程的消息队列

    消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息

    py3study
  • 你真的理解生产者/消费者模式吗?

    目光从厕所转到饭馆,一个饭馆里通常都有好多厨师以及好多服务员,这里我们把厨师称为生产者,把服务员称为消费者,厨师和服务员是不直接打交道的,而是在厨师做好菜之后放...

    南风
  • 栈和队列就是这么简单

    一、前言 上一篇已经讲过了链表【Java实现单向链表】了,它跟数组都是线性结构的基础,本文主要讲解线性结构的应用:栈和队列 如果写错的地方希望大家能够多多体谅并...

    Java3y
  • BlockingQueue与Condition原理解析

     我在前段时间写了一篇关于AQS源码解析的文章AbstractQueuedSynchronizer超详细原理解析AbstractQueuedSynchroniz...

    remcarpediem

扫码关注云+社区

领取腾讯云代金券