专栏首页Java编程技术并发队列-无界阻塞延迟队列DelayQueue原理探究

并发队列-无界阻塞延迟队列DelayQueue原理探究

一、前言

DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。

二、 DelayQueue类图结构

image.png

如图DelayQueue中内部使用的是PriorityQueue存放数据,使用ReentrantLock实现线程同步,可知是阻塞队列。另外队列里面的元素要实现Delayed接口,一个是获取当前剩余时间的接口,一个是元素比较的接口,因为这个是有优先级的队列。

三、offer操作

插入元素到队列,主要插入元素要实现Delayed接口。

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {(2)
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

首先获取独占锁,然后添加元素到优先级队列,由于q是优先级队列,所以添加元素后,peek并不一定是当前添加的元素,如果(2)为true,说明当前元素e的优先级最小也就即将过期的,这时候激活avaliable变量条件队列里面的线程,通知他们队列里面有元素了。

四、take操作

获取并移除队列首元素,如果队列没有过期元素则等待。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                //获取但不移除队首元素(1)
                E first = q.peek();
                if (first == null)
                    available.await();//(2)
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)//(3)
                        return q.poll();
                    else if (leader != null)//(4)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;//(5)
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)//(6)
                available.signal();
            lock.unlock();
        }
    }

第一次调用take时候由于队列空,所以调用(2)把当前线程放入available的条件队列等待,当执行offer并且添加的元素就是队首元素时候就会通知最先等待的线程激活,循环重新获取队首元素,这时候first假如不空,则调用getdelay方法看该元素海剩下多少时间就过期了,如果delay<=0则说明已经过期,则直接出队返回。否者看leader是否为null,不为null则说明是其他线程也在执行take则把该线程放入条件队列,否者是当前线程执行的take方法,则调用(5)await直到剩余过期时间到(这期间该线程会释放锁,所以其他线程可以offer添加元素,也可以take阻塞自己),剩余过期时间到后,该线程会重新竞争得到锁,重新进入循环。

(6)说明当前take返回了元素,如果当前队列还有元素则调用singal激活条件队列里面可能有的等待线程。leader那么为null,那么是第一次调用take获取过期元素的线程,第一次调用的线程调用设置等待时间的await方法等待数据过期,后面调用take的线程则调用await直到signal。

五、poll操作

获取并移除队头过期元素,否者返回null

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
           //如果队列为空,或者不为空但是队头元素没有过期则返回null
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

六、一个例子

class DelayedEle implements Delayed {

    private final long delayTime; //延迟时间
    private final long expire;  //到期时间
    private String data;   //数据

    public DelayedEle(long delay, String data) {
        delayTime = delay;
        this.data = data;
        expire = System.currentTimeMillis() + delay; 
    }

    /**
     * 剩余时间=到期时间-当前时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    /**
     * 优先队列里面优先级规则
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("DelayedElement{");
        sb.append("delay=").append(delayTime);
        sb.append(", expire=").append(expire);
        sb.append(", data='").append(data).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
public static void main(String[] args) {
       

        DelayQueue<DelayedEle> delayQueue = new DelayQueue<DelayedEle>();

    DelayedEle element1 = new DelayedEle(1000,"zlx");
    DelayedEle element2 = new DelayedEle(1000,"gh");

    delayQueue.offer(element1);
    delayQueue.offer(element2);

    element1 =  delayQueue.take();
    System.out.println(element1);


}

七、使用场景

  • 一个典型场景是,重试机制实现,比如当调用接口失败后,把当前调用信息放入delay=10s的元素,然后把元素放入队列,那么这个队列就是一个重试队列,一个线程通过take方法获取需要重试的接口,take返回则接口进行重试,失败则再次放入队列,同时也可以在元素加上重试次数。
  • TimerQueue的内部实现
  • ScheduledThreadPoolExecutor中DelayedWorkQueue是对其的优化使用

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 并发队列-有界阻塞队列ArrayBlockingQueue原理探究

    上节介绍了无界链表方式的阻塞队列LinkedBlockingQueue,本节来研究下有界使用数组方式实现的阻塞队列ArrayBlockingQueue

    加多
  • 一个有关定时生产与消费的问题

    按照上面的逻辑看的话,每个队列里面最多有一个元素。其实不然,因为在多线程模型中每个线程占用cpu执行的时间是按照时间片来划分的,每个线程执行完自己的时间片后会被...

    加多
  • 并发队列-无界阻塞优先级队列PriorityBlockingQueue原理探究

    PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现,研究过数组方式存放最小堆节点的都知道,...

    加多
  • 聊聊 JDK 阻塞队列源码(ReentrantLock实现)

    项目中用到了一个叫做 Disruptor 的队列,今天楼主并不是要介绍 Disruptor 而是想巩固一下基础扒一下 JDK 中的阻塞队列,听到队列相信大家对其...

    haifeiWu
  • ArrayBlockingQueue 源码分析

    其实现了阻塞队列 BlockingQueue 接口和基本队列操作 AbstractQueue 接口

    itliusir
  • Bagging与随机森林算法原理小结

        在集成学习原理小结中,我们讲到了集成学习有两个流派,一个是boosting派系,它的特点是各个弱学习器之间有依赖关系。另一种是bagging流派,它的特...

    刘建平Pinard
  • 机器学习(24)之Bagging与随机森林

    关键字全网搜索最新排名 【机器学习算法】:排名第一 【机器学习】:排名第一 【Python】:排名第三 【算法】:排名第四 前言 在(机器学习(17)之集成学习...

    昱良
  • Bagging

    用户1332428
  • Java并发之高级自旋锁CLH锁和MCS锁

    自旋锁(spin lock)是一个典型的对临界资源的互斥手段,自旋锁是基于CAS原语的,所以它是轻量级的同步操作,它的名称来源于它的特性。自旋锁是指当一个线程尝...

    我是攻城师
  • mysql导入hive的NULL值处理方案

    目前提供两种方法解决数据库中的字段值为NULl导入到HIVE中后变成空字符串的方法,使用以下方法可以保障在mysql中存储的是NULL,导入到HIVE表后也是N...

    袁宋

扫码关注云+社区

领取腾讯云代金券