在使用Redis时候,存放的元素有过期一说,当过期后,就不能被取出来了。当然实现思路上比较容易理解,设定一个过期时间即可。当然在Java语言中,也有这样的类似的过期功能,它就是DelayQueue。
主要有以下几方面用途:
关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
What is DelayQueue
DelayQueue里面实际上用的是PriorityBlockingQueue来实现,具体可以看:
Java并发学习(二十四)-PriorityBlockingQueue分析
DelayQueue有以下使用特点:
DelayQueue是一个支持延时获取元素的无界阻塞队列。
里面存储的元素,都是有超时时间的(expired time)
如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说,你只能拿到已过期(expired)的元素。
而里面的实现呢,可以理解为下面的:
DelayQueue = PriorityQueue + Delayed
先看看它的定义:
/**
* 没有界限的。
* 有延迟的元素,只有当元素过期时候,才能被获取到。
* 不允许null,
* take和poll操作,对没有过期的元素不起作用,
* size操作既统计expired也统计unexpired元素。
* Iterator不保证顺序性。
*/
public class DelayQueue extends AbstractQueue
implements BlockingQueue {
private final transient ReentrantLock lock = new ReentrantLock(); //使用reentrantlock。
private final PriorityQueue q = new PriorityQueue(); //用PriorityBlockingQueue。
/**
* 用来表示当前,等待头节点的这个线程。
*
* 当一个thread编程leader时候,她只等待下一个即将expired的节点。 但是其他线程等待时间长短并不知道。
* 这个leader一定要在poll或者take之前signal
*/
private Thread leader = null;
//等待队列。也就是大家排队去,分别获取这个元素。
private final Condition available = lock.newCondition();
...
}
上文定义中,首先需要注意的是DelayQueue里面装的东西,这个后文分析。其次,这里面并没有很多东西,
有一个leader变量,它用来标识这个leader线程正在等待下一个即将expired的节点。
但是其他的处于等待中的线程等待时间长短并不知道。
Delayed
在DelayQueue中,所放入的东西必须是实现了Delayed接口的类,看看它的定义:
public interface Delayed extends Comparable {
long getDelay(TimeUnit unit);
}
Delayed接口很简单,就一个getDelay方法,这个方法会返回一个long类型的值,我们可以通过这个long类型
值来判定其是否过期(expired)。
下面结合几个具体方法看:
add操作
看add操作:
public boolean add(E e) {
return offer(e);
}
再看offer方法:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); //加锁。
try {
q.offer(e); //调用priorityBlockingqueue进行添加。
if (q.peek() == e) { //判断e是不是队头元素。
//如果是,刚放进去,肯定没人等,所以leader设为null
leader = null;
available.signal();
}
//否则直接加入即可
return true;
} finally {
lock.unlock(); //解锁。
}
}
add操作还是比较简单的,调用PriorityBlockingQueue进行操作,并且对leader变量进行相应判断即可。
poll操作
里面出队操作有两个,一个可能返回null,另一个就能够保证一定会有东西才能返回。
下面先看poll操作:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock(); //加锁
try {
E first = q.peek(); //获取第一个
if (first == null || first.getDelay(NANOSECONDS) > 0) //判断是否超时。
//如果first为null,或者也并未超时,那么返回null
return null;
else
return q.poll(); //正常返回,说明超时可以拿出
} finally {
lock.unlock(); //解锁。
}
}
思路还是比较好理解的,通过getDelay判断是否超时。
take操作
take操作,它就一定保证能够拿出来,否则会一直阻塞下去,下面看代码:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //可以中断的加锁
try {
for (;;) { //自旋。只有拿到了,才返回。
E first = q.peek();
if (first == null)
available.await(); //first为null,则等待。
else {
long delay = first.getDelay(NANOSECONDS);
if (delay
//因为加锁了,所以一定有值
return q.poll(); //第一个已经过期,则可以弹出。
first = null; // don't retain ref while waiting //不可以弹出。
if (leader != null)
//leader不为null,那么就继续等待吧,因为有别的线程leader在等待了
available.await();
else { //说明没人等,就我先来我先说我在等待,把当前线程设为leader。
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); //把当前线程放入available队列里面等待。
} finally {
if (leader == thisThread) //最后拿到了,将leader置null
leader = null;
}
}
}
}
} finally {
//return之前,走这一步,signal一个
if (leader == null && q.peek() != null) //检测leader,如果leader为null,则signal一个。
available.signal();
lock.unlock();
}
}
由于里面的数据结构主要是使用PriorityBlockingQueue的,使用了装饰器模式,所以一些Iterator等,
和PriorityBlockingQueue也大体相似。
文章开头提到了redis,但是事实上,DelayQueue和Redis里面的超时并不痛,这里面只是超时了你才可以拿出来(当然可以自己设计getDelay方法)。另外,里面元素也只有拿出来后才会被删除,否则超时并不会被删除。
领取专属 10元无门槛券
私享最新 技术干货