前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DelayQueue 源码分析

DelayQueue 源码分析

作者头像
itliusir
发布2020-01-06 11:29:58
5820
发布2020-01-06 11:29:58
举报
文章被收录于专栏:刘君君刘君君

摘要:

  1. DelayQueue 的应用场景
  2. DelayQueue 实现原理

TOP 带着问题看源码

  1. DelayQueue 的应用场景
  2. DelayQueue 实现原理

1. 基本介绍

我们先来看一下它的实现类图,它实现了 Delayed、BlockingQueue 接口和 AbstractQueue 基础类,从实现的功能上看,它首先是一个阻塞队列,然后 Delayed 接口是标记给定延迟后执行的对象,结合类名也可以大致的分析出:DelayQueue 是一个 延时阻塞 队列

2. 成员变量分析

代码语言:javascript
复制
// 保证线程安全的锁
private final transient ReentrantLock lock = new ReentrantLock();
// 优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 标记取元素时是否有线程在排队
private Thread leader = null;
// 是否可取的条件变量
private final Condition available = lock.newCondition();

3. 核心方法分析

3.1 入队操作

3.1.1 offer(E e)

入队逻辑很简单:

  1. 把数据加入到优先队列里
  2. 如果添加的元素是堆顶元素 2.1 leader 置空 2.2 唤醒 “可取” 条件队列的线程
代码语言:javascript
复制
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

3.2 出队操作

3.2.1 poll()

出队非阻塞API,核心逻辑就是:

  1. 检查堆顶元素,如果为空或者还没到期呢,返回 null
  2. 否则返回取出堆顶元素
代码语言:javascript
复制
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}
3.2.2 take()

出队阻塞API,核心逻辑就是:

  1. 检查堆顶元素,如果为空就等待添加元素时候被唤醒重试
  2. 如果不为空,并且已经过期就直接取出来,没过期并且前面没有线程等待,就等待超时时间后唤醒重试
  3. 每次取完都会唤醒 “可取” 条件队列的线程
代码语言:javascript
复制
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
          	// 堆顶元素
            E first = q.peek();
          	// 为空,等待被唤醒
            if (first == null)
                available.await();
          	// 堆顶不为空
            else {
              	// 获取元素的超时时间
                long delay = first.getDelay(NANOSECONDS);
              	// 已过期就取走
                if (delay <= 0)
                    return q.poll();
              	// 没过期往下走
              	// 这里设置为 null 是放在还有别的线程持有没有释放导致内存泄漏
                first = null; // don't retain ref while waiting
              	// 校验是否有等待线程,有就等待leader线程取完或有新加入的元素唤醒它
                if (leader != null)
                    available.await();
              	// 没有等待线程,就把自己设置为等待线程,然后等待超时时间唤醒重试
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                      	// 唤醒后就把 leader 置空
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
      	// 没有等待线程并且队列还有数据,就唤醒下一个线程来取
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

回到问题 TOP 2 ,通过对入队列和出队列的分析,其实现原理想必已经明白,就是在队列的基础上增加了时间维度的优先级,然后通过锁和条件变量来控制取/放流程。

4. 总结

和我们开头分析的一样,DelayQueue是一个 阻塞延时且无界 的队列,它使用的是优先级队列+时间维度来实现。回到问题 TOP 1 延时队列场景主要适用于定时任务,但是对于内存中的延时队列往往不能用于重要的业务场景(毕竟还是内存队列,宕机了就没咯),所以可以应用于一些基础类库,不太重要的业务定时清理和处理等。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-01-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • TOP 带着问题看源码
  • 1. 基本介绍
  • 2. 成员变量分析
  • 3. 核心方法分析
    • 3.1 入队操作
      • 3.1.1 offer(E e)
    • 3.2 出队操作
      • 3.2.1 poll()
      • 3.2.2 take()
  • 4. 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档