专栏首页WriteOnReadJDK源码分析-DelayQueue

JDK源码分析-DelayQueue

概述

DelayQueue 也是一种队列,它内部的元素有“延迟”,也就是当从队列中获取元素时,如果它的延迟时间未到,则无法取出。

DelayQueue 的类签名和继承结构如下:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {}

下面分析其代码实现。

代码分析

相关接口

DelayQueue 中的元素要实现 Delayed 接口,该接口定义如下:

public interface Delayed extends Comparable<Delayed> {
    /**     
     * 以给定的时间单位,返回该对象的剩余延迟
     * 若为零或者负数表示延时已经过去    
     */    
     long getDelay(TimeUnit unit);
 }

Delayed 接口继承自 Comparable 接口,而它本身只定义了一个 getDelay 方法,该方法的作用是获取对象的剩余延迟时间。

Comparable 接口也只有一个 compareTo 方法:

public interface Comparable<T> {
    public int compareTo(T o);
}

这里不再详述。

构造器

DelayQueue 有两个构造器,如下:

// 无参构造器
public DelayQueue() {}

// 指定集合的构造器
public DelayQueue(Collection<? extends E> c) {
    // 该方法最后是通过 add 方法实现的,后文进行分析    
    this.addAll(c);
}

成员变量

// 锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();

// 优先队列,实际存储元素的地方
private final PriorityQueue<E> q = new PriorityQueue<E>();

// 线程等待的标识
private Thread leader = null;

// 触发条件,表示是否可以从队列中读取元素
private final Condition available = lock.newCondition();

入队方法

DelayQueue 也是一个队列,它的入队方法有:add(E), offer(E), put(E) 等,它们的定义如下:

public boolean add(E e) {
    return offer(e);
}
public void put(E e) {
    offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

这几个方法都是通过 offer(E) 方法实现的,它的代码如下:

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 入队        
        q.offer(e);       
        // 若该元素为队列头部元素,唤醒等待的线程        
        // (表示可以从队列中读取数据了)        
        if (q.peek() == e) {
            leader = null;           
            available.signal();        
        }        
        return true;    
    } 
    finally {
        lock.unlock();    
    }
}

出队方法

有入队自然也有出队,主要方法有:poll(), take(), poll(timeout, unit), 如下:

public E poll() {
    final ReentrantLock lock = this.lock;    
    lock.lock();    
    try {    
        // 获取队列头部元素        
        E first = q.peek();        
        // 头部元素为空,或者延时未到,则返回空        
        if (first == null || first.getDelay(NANOSECONDS) > 0)            
        return null;        
        // 否则返回头部元素        
        else            
        return q.poll();    
    } finally {    
        lock.unlock();    
    }
}

poll 方法是非阻塞的,即调用之后无论元素是否存在都会立即返回。下面看下阻塞的 take 方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 以可中断方式获取锁
    lock.lockInterruptibly();
    try {
        // 无限循环
        for (;;) {
            // 获取队列头部元素
            E first = q.peek();
            // 若为空,则等待
            if (first == null)
                available.await();
            // 若不为空
            else {
                // 获取延迟的纳秒数,若小于等于零(即过期),则获取并删除头部元素
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                // 执行到这里,表示 delay>0,也就是延时未过期
                first = null; // don't retain ref while waiting
                // leader 不为空表示有其他线程在读取数据,当前线程等待
                if (leader != null)
                    available.await();
                else {
                    // 将当前线程设置为 leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待延迟时间过期
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 唤醒该条件下的其他线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

1. 获取队列头部元素;

1.1 若该元素为空(队列为空),则当前线程等待;

1.2 若该元素不为空,且已经过期,则取出该元素(并移除);

1.2.1 若未过期,且有其他线程在操作(leader 不为空),当前线程等待;

1.2.2 若未过期,且没有其他线程操作,则占有“操作权”(将 leader 设置为当前线程),并等待延迟过期。

以上操作循环执行。

take 方法是阻塞操作,当条件不满足时会一直等待。另一个 poll(timeout, unit) 方法和它有些类似,只不过带有延时,如下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 以可中断方式获取锁
        lock.lockInterruptibly();
        try {
            // 无限循环
            for (;;) {
                // 获取队列的头部元素
                E first = q.peek();
                // 若头部元素为空(即队列为空),当超时时间大于零则等待相应的时间;
                //   否则(即超时时间小于等于零)返回空
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    // 执行到这里表示队列头部元素不为空
                    // 获取剩余延时
                    long delay = first.getDelay(NANOSECONDS);
                    // 延时已过期,返回队列头部元素
                    if (delay <= 0)
                        return q.poll();
                    // 延时未过期且等待超时,返回空
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    // 延时未过期且等待未超时,且等待超时<延迟时间
                    // 表示有其他线程在取数据,则当前线程进入等待
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        // 没有其他线程等待,将当前线程设置为 leader,类似于“独占”操作
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // 计算剩余延迟时间
                            nanos -= delay - timeLeft;
                        } finally {
                            // 该线程操作完毕,把 leader 置空
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 唤醒 available 条件下的一个其他线程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

take 和 poll 方法还有一个区别:当延迟未过期时,take 方法会一直等待,而 poll 方法则会返回空。

此外还有一个 peek 方法,该方法虽然也能获取队列头部的元素,但与以上出队方法不同的是,peek 方法只是读取队列头部元素,并不会将其删除:

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 返回队列的头部元素(不删除)
        return q.peek();
    } finally {
        lock.unlock();
    }
}

以上就是 DelayQueue 的主要方法的代码分析,为便于理解,下面简要举例分析。

用法举例

示例代码:

自定义一个实现了 Delayed 接口的 Task 类,并将它的几个对象添加到一个延迟队列中,代码如下:

public class TestDelayedQueue {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Task> delayQueue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        delayQueue.put(new Task("c", now + 6000));        
        delayQueue.put(new Task("d", now + 10000));
        delayQueue.put(new Task("a", now + 3000));
        delayQueue.put(new Task("b", now + 4000));
        
        while (true) {
            System.out.println(delayQueue.take());
            TimeUnit.SECONDS.sleep(1);
        }
    }

    private static class Task implements Delayed {
        private String taskName;
        private long endTime;

        public Task(String taskName, long endTime) {
            this.taskName = taskName;
            this.endTime = endTime;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override
        public String toString() {
            return "taskName-->" + taskName;
        }
    }
}

结果会以延迟时间的顺序取出各个元素。

小结

1. DelayQueue 是一种队列,同时实现了 BlockingQueue 接口;

2. 它内部的元素有延迟时间的概念,出队时,若延时未到,则无法读取到队列头部的元素;

3. 它是线程安全的。

本文分享自微信公众号 - WriteOnRead(WriteOnRead),作者:jaxer

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • JDK源码分析-ArrayBlockingQueue

    前文「JDK源码分析-BlockingQueue」简要分析了 BlockingQueue 接口的主要方法,ArrayBlockingQueue 就是该接口的一个...

    WriteOnRead
  • JDK源码分析-CopyOnWriteArrayList

    CopyOnWriteArrayList,从名字可以看出它跟 ArrayList 有点关系,可以理解为线程安全的 ArrayList。它的类签名和继承结构如下:

    WriteOnRead
  • JDK源码分析-ThreadPoolExecutor

    ThreadPoolExecutor 是 JDK 中线程池的实现类,它的继承结构如下:

    WriteOnRead
  • Android多线程之同步锁的使用

    对于Java来说,最常用的同步机制就是synchronized关键字,他是一种基于语言的粗略锁,能够作用于对象、函数、class。每个对象都只有一个锁,谁能够拿...

    砸漏
  • 学习Lock中Condition的使用

    有java开发经验的朋友肯定知道Lock,不管是培训班还是自学,大家知道Lock代替的是synchronized关键字。那么Condition又是在哪里使用呢?

    zhangheng
  • Java并发编程(06):Lock机制下API用法详解

    Lock加锁相关结构中涉及两个使用广泛的基础API:ReentrantLock类和Condition接口,基本关系如下:

    知了一笑
  • 吃透这份pdf,面试阿里、腾讯、百度等一线大厂,顺利拿下心仪offer!

    最近一位年前裸辞的朋友来找我诉苦,说因为疫情原因现在都在家吃老本。本想着年后就来找工作的,但是现在这个情况也不好找,而且很多公司也随着这次疫情面临着资金紧缺导致...

    程序员追风
  • C语言---头文件

    今天说一个我在工作时候发现的一个细节,可能大家都已经清楚的知道了,我就在这里记录一下吧。 不想看过程的直接去文末看结论吧。

    HeaiKun
  • 装饰者模式(动态组合)

    如果您是第一次阅读我的设计模式系列文章,建议先阅读设计模式开篇,希望能得到您宝贵的建议。

    幺鹿
  • “J.U.C”:Semaphore

    信号量Semaphore是一个控制访问多个共享资源的计数器,它本质上是一个“共享锁”。 Java并发提供了两种加锁模式:共享锁和独占锁。前面LZ介绍的Reent...

    企鹅号小编

扫码关注云+社区

领取腾讯云代金券