首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

并发编程之DelayQueue

在使用Redis时候,存放的元素有过期一说,当过期后,就不能被取出来了。当然实现思路上比较容易理解,设定一个过期时间即可。当然在Java语言中,也有这样的类似的过期功能,它就是DelayQueue。

主要有以下几方面用途:

关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。

缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。

任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

What is DelayQueue

DelayQueue里面实际上用的是PriorityBlockingQueue来实现,具体可以看:

Java并发学习(二十四)-PriorityBlockingQueue分析

DelayQueue有以下使用特点:

DelayQueue是一个支持延时获取元素的无界阻塞队列。

里面存储的元素,都是有超时时间的(expired time)

如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说,你只能拿到已过期(expired)的元素。

而里面的实现呢,可以理解为下面的:

DelayQueue = PriorityQueue + Delayed

先看看它的定义:

/**

* 没有界限的。

* 有延迟的元素,只有当元素过期时候,才能被获取到。

* 不允许null,

* take和poll操作,对没有过期的元素不起作用,

* size操作既统计expired也统计unexpired元素。

* Iterator不保证顺序性。

*/

public class DelayQueue extends AbstractQueue

implements BlockingQueue {

private final transient ReentrantLock lock = new ReentrantLock(); //使用reentrantlock。

private final PriorityQueue q = new PriorityQueue(); //用PriorityBlockingQueue。

/**

* 用来表示当前,等待头节点的这个线程。

*

* 当一个thread编程leader时候,她只等待下一个即将expired的节点。 但是其他线程等待时间长短并不知道。

* 这个leader一定要在poll或者take之前signal

*/

private Thread leader = null;

//等待队列。也就是大家排队去,分别获取这个元素。

private final Condition available = lock.newCondition();

...

}

上文定义中,首先需要注意的是DelayQueue里面装的东西,这个后文分析。其次,这里面并没有很多东西,

有一个leader变量,它用来标识这个leader线程正在等待下一个即将expired的节点。

但是其他的处于等待中的线程等待时间长短并不知道。

Delayed

在DelayQueue中,所放入的东西必须是实现了Delayed接口的类,看看它的定义:

public interface Delayed extends Comparable {

long getDelay(TimeUnit unit);

}

Delayed接口很简单,就一个getDelay方法,这个方法会返回一个long类型的值,我们可以通过这个long类型

值来判定其是否过期(expired)。

下面结合几个具体方法看:

add操作

看add操作:

public boolean add(E e) {

return offer(e);

}

再看offer方法:

public boolean offer(E e) {

final ReentrantLock lock = this.lock;

lock.lock(); //加锁。

try {

q.offer(e); //调用priorityBlockingqueue进行添加。

if (q.peek() == e) { //判断e是不是队头元素。

//如果是,刚放进去,肯定没人等,所以leader设为null

leader = null;

available.signal();

}

//否则直接加入即可

return true;

} finally {

lock.unlock(); //解锁。

}

}

add操作还是比较简单的,调用PriorityBlockingQueue进行操作,并且对leader变量进行相应判断即可。

poll操作

里面出队操作有两个,一个可能返回null,另一个就能够保证一定会有东西才能返回。

下面先看poll操作:

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock(); //加锁

try {

E first = q.peek(); //获取第一个

if (first == null || first.getDelay(NANOSECONDS) > 0) //判断是否超时。

//如果first为null,或者也并未超时,那么返回null

return null;

else

return q.poll(); //正常返回,说明超时可以拿出

} finally {

lock.unlock(); //解锁。

}

}

思路还是比较好理解的,通过getDelay判断是否超时。

take操作

take操作,它就一定保证能够拿出来,否则会一直阻塞下去,下面看代码:

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly(); //可以中断的加锁

try {

for (;;) { //自旋。只有拿到了,才返回。

E first = q.peek();

if (first == null)

available.await(); //first为null,则等待。

else {

long delay = first.getDelay(NANOSECONDS);

if (delay

//因为加锁了,所以一定有值

return q.poll(); //第一个已经过期,则可以弹出。

first = null; // don't retain ref while waiting //不可以弹出。

if (leader != null)

//leader不为null,那么就继续等待吧,因为有别的线程leader在等待了

available.await();

else { //说明没人等,就我先来我先说我在等待,把当前线程设为leader。

Thread thisThread = Thread.currentThread();

leader = thisThread;

try {

available.awaitNanos(delay); //把当前线程放入available队列里面等待。

} finally {

if (leader == thisThread) //最后拿到了,将leader置null

leader = null;

}

}

}

}

} finally {

//return之前,走这一步,signal一个

if (leader == null && q.peek() != null) //检测leader,如果leader为null,则signal一个。

available.signal();

lock.unlock();

}

}

由于里面的数据结构主要是使用PriorityBlockingQueue的,使用了装饰器模式,所以一些Iterator等,

和PriorityBlockingQueue也大体相似。

文章开头提到了redis,但是事实上,DelayQueue和Redis里面的超时并不痛,这里面只是超时了你才可以拿出来(当然可以自己设计getDelay方法)。另外,里面元素也只有拿出来后才会被删除,否则超时并不会被删除。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190210G0BCMQ00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券