专栏首页犀牛饲养员的技术笔记你真的了解LinkedBlockingQueue的put,add和offer的区别吗

你真的了解LinkedBlockingQueue的put,add和offer的区别吗

概述

LinkedBlockingQueue的put,add和offer这三个方法功能很相似,都是往队列尾部添加一个元素。既然都是同样的功能,为啥要有有三个方法呢?

这三个方法的区别在于:

  • put方法添加元素,如果队列已满,会阻塞直到有空间可以放
  • add方法在添加元素的时候,若超出了度列的长度会直接抛出异常
  • offer方法添加元素,如果队列已满,直接返回false

索引这三种不同的方法在队列满时,插入失败会有不同的表现形式,我们可以在不同的应用场景中选择合适的方法。

用法示例

先看看add方法,

public class LinkedBlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2);
        fruitQueue.add("apple");
        fruitQueue.add("orange");
        fruitQueue.add("berry");
    }

当我们执行这个方法的时候,会报下面的异常,

Exception in thread "main" java.lang.IllegalStateException: Queue full
    at java.util.AbstractQueue.add(AbstractQueue.java:98)
    at com.pony.app.LinkedBlockingQueueTest.testAdd(LinkedBlockingQueueTest.java:23)
    at com.pony.app.LinkedBlockingQueueTest.main(LinkedBlockingQueueTest.java:16)

然后再来看看put用法,

public class LinkedBlockingQueueTest implements Runnable {

    static LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2);


    public static void main(String[] args) throws InterruptedException {
        new Thread(new LinkedBlockingQueueTest()).start();

        fruitQueue.put("apple");
        fruitQueue.put("orange");
        fruitQueue.put("berry");

        System.out.println(fruitQueue.toString());

    }

    @Override
    public void run() {

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        fruitQueue.poll();
    }
}

运行这段代码,你会发现首先程序会卡住(队列阻塞)3秒左右,然后打印队列的orangeberry两个元素。

因为我在程序的启动的时候顺便启动了一个线程,这个线程会在3秒后从队列头部移除一个元素。

最后看看offer的用法,

public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2);

        System.out.println(fruitQueue.offer("apple"));
        System.out.println(fruitQueue.offer("orange"));
        System.out.println(fruitQueue.offer("berry"));

    }

运行结果:

true
true
false

源码分析

先来看看add方法的实现,

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

所以add其实是包装了一下offer,没什么可以说的。

然后来看看putoffer的实现,两个放在一起说。

put方法源码,

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

offer方法源码,

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

我们重点关注他们的区别,offer方法在插入的时候会等一个超时时间timeout,如果时间到了队列还是满的(count.get() == capacity),就会返回false。

而put方法是无限期等待,

while (count.get() == capacity) {
                notFull.await();
            }

所以我们在应用层使用的时候,如果队列满再插入会阻塞。

实际场景应用

在早期版本的kafka中,生产者端发送消息使用了阻塞队列,代码如下:

private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0  =>
          queue.offer(message)
        case _  =>
          try {
            if (config.queueEnqueueTimeoutMs < 0) {
              queue.put(message)
              true
            } else {
              queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case _: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      }else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }

可以看到,config.queueEnqueueTimeoutMs是0的时候,使用的是offer方法,小于0的时候则使用put方法。

我们在使用kafka的时候,可以通过queue.enqueue.timeout.ms来决定使用哪种方式。比如某些应用场景下,比如监控,物联网等场景,允许丢失一些消息,可以把queue.enqueue.timeout.ms配置成0,这样就kafka底层就不会出现阻塞了。

新版的kafka(我印象中是2.0.0版本开始?)用java重写了,不再使用阻塞队列,所以没有上面说的问题。

本文分享自微信公众号 - 犀牛饲养员的技术笔记(coder_start_up),作者:siwuxie18

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-02-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • ES系列之原来查看文档数量有这么多姿势

    事实上远不止这么简单,比如嵌套文档的情况等。相信你看了我这篇文章之后你会感叹原来统计文档有这么多讲究啊。

    用户7634691
  • maven导入jar包到本地仓库

    很多时候通过maven来远程下载jar包,由于网速或者仓库地址问题导致下载失败或者非常缓慢。还有一种情况是,我们用的一些三方jar包,中央仓库并没有,比如一些开...

    用户7634691
  • 安全框架shiro入门示例

    seq是流水号,每笔订单都不一样。username是固定admin,我用它来辨识身份。HMAC是密钥相关的哈希运算消息认证码,HMAC运算利用哈希算法,以一个密...

    用户7634691
  • C# 基元类型

    C#编程中,初始化一个整数有两种方式: (1)、较繁琐的方法,代码如下: Int32 a = new Int32(); (2)、极简的方法,代码如下: int ...

    郑小超.
  • 算法提高 概率计算

    问题描述   生成n个∈[a,b]的随机整数,输出它们的和为x的概率。 输入格式   一行输入四个整数依次为n,a,b,x,用空格分隔。 输出格...

    AI那点小事
  • 关于面试,你是如何面对的呢?

    面试,对于职场的人来说每个人都必须经历的。那作为职场人,看到很多案例,总结的不太笼统,那么该如何面对经常问的话呢?该如何回答比较好呢?以下是通过多个事例进行汇总...

    用户6367961
  • Maven依赖本地jar包,上传第三方jar包

    前言:maven管理项目,经常用到中央仓库没有的第三方jar包,需要将本地的jar包发布到私有库供项目使用。

    王念博客
  • 更巧妙的表单设计与登陆访问

    以下内容由Mockplus团队翻译整理,仅供学习交流,Mockplus是更快更简单的原型设计工具 你觉得一个普通人每天会使用多少次登陆功能呢?数据显示至少1...

    奔跑的小鹿
  • FLV文件格式官方规范详解

    ——如果要学习一个新的知识点,官方手册可能是最快的途径。查看网上其他人的总结也许入门更快,但是要准确,深入,完整,还是要看官方手册。 以下内容来自对官方文档Vi...

    _gongluck
  • 基于区块链的社交网络Sapien在两小时内筹集到1100万

    Sapien是一个去中心化的社交网络和新闻平台,这个平台能够奖励内容作者。它在2018年1月31日到2018年2月15日的售前展览中以惊人的速度筹集了1100万...

    Hans He

扫码关注云+社区

领取腾讯云代金券