1.什么是延迟队列
在java的并发包中有有关定时调度的api。 里边其中一个重要实现就是延迟队列,通过延时队列来实现定时调度。
那么如果让你实现一个延时队列,你会怎么做呢?
2.自己实现一个延迟队列
2.1.定义一个Delayed接口。
2.2.定义一个DelayQueue。
2.2.1.继承AbstractQueue
2.2.2.实现BlockingQueue
2.2.2.使用PriorityQueue来装载任务
2.2.3.使用重入锁ReentrantLock来存取操作的线程安全
2.2.4.创建condition,用来唤醒和挂起线程
2.2.5.核心方法take的实现
2.2.5.1.通过getDelay来判断是否到期
2.2.5.2.Leader-Follower模式的使用
2.1.定义一个Delayed接口
新建一个Delayed接口。这个接口主要是为了限制让自定义的task都去实现这个接口。因为我们要通过这个方法来获取
还有多久过期。
/**
* 延时接口
* @author hezhuofan
*/
public interface Delayed extends Comparable<Delayed> {
/**
* 返回剩余的延迟时间
* @param unit 时间单位
* @return 剩余的时间; 零值或负值表示早已过期
*/
long getDelay(TimeUnit unit);
}
2.2.定义一个DelayQueue。
2.2.1.继承AbstractQueue
public class CDelayQueue<E extends Delayed> extends AbstractQueue<E> {
}
AbstractQueue
这个类提供了一些Queue操作的骨架实现。 当你需要实现一个元素不能为null的队列时,就可以继承这个类。 方法add,remove和element都是基于offer,poll和peek,但是会抛出异常而不是通过返回false或null来表示失败。
扩展此类的Queue实现,必须最小化的去定义Queue的offer方法,该方法不允许插入null元素。另外这几个方法Queue.peek,Queue.poll,Collection.size和Collection.iterator也是一样的。 通常情况下,其他方法也将被覆盖。 如果你要实现的队列不能满足这些限制,那么请考虑更上一层的AbstractCollection来实现你的队列。
该类是Java Collections Framework的成员。
另外值得注意的是,这里我们让动态类型必须是实现了Delayed接口的,因为只有实现了Delayed接口的任务才能获取到定时的剩余时间,才能判断什么时候把到期的任务弹出。
2.2.2.实现BlockingQueue
为了让我们的队列是一个阻塞队列,我们需要实现接口BlockingQueue。
public class CDelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
}
BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:
1. 当队列满了的时候进行入队列操作
2. 当队列空了的时候进行出队列操作
因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空队列进行出队列操作时,它将会被阻塞,除非有另一个线程进行了入队列操作。
在Java中,BlockingQueue的接口位于java.util.concurrent 包中(在Java5版本开始提供),由上面介绍的阻塞队列的特性可知,阻塞队列是线程安全的。
阻塞队列主要用在生产者/消费者的场景,下面这幅图展示了一个线程生产、一个线程消费的场景:
2.2.2.使用PriorityQueue来装载任务
public class CDelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private PriorityQueue<E> q=new PriorityQueue<E>();
}
我们知道队列是遵循先进先出(First-In-First-Out)模式的,但有些时候需要在队列中基于优先级处理对象。举个例子,比方说我们有一个每日交易时段生成股票报告的应用程序,需要处理大量数据并且花费很多处理时间。客户向这个应用程序发送请求时,实际上就进入了队列。我们需要首先处理优先客户再处理普通用户。在这种情况下,Java的PriorityQueue(优先队列)会很有帮助。
PriorityQueue类在Java1.5中引入并作为 Java Collections Framework 的一部分。PriorityQueue是基于优先堆的一个无界队列,这个优先队列中的元素可以默认自然排序或者通过提供的Comparator(比较器)在队列实例化的时排序。
2.2.3.使用重入锁ReentrantLock来保证存取操作的线程安全
public class CDelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final transient ReentrantLock lock=new ReentrantLock();
private PriorityQueue<E> q=new PriorityQueue<E>();
}
重入锁不多说了,这是一个用的最广泛的锁之一了。
2.2.4.创建condition,用来唤醒和挂起线程
public class CDelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final transient ReentrantLock lock=new ReentrantLock();
private PriorityQueue<E> q=new PriorityQueue<E>();
/**
*用于线程的等待以及唤醒的触发,稍后会详细介绍Leader-Follower模式
*/
private Thread leader = null;
private final Condition available = lock.newCondition();
}
这里我们使用Condition来辅助一些后面将要介绍到的Leader-Follower模式。
Condition的基本动作就是负责唤醒和挂起线程。
2.2.5.核心方法take的实现
队列的put offer等方法都比较简单,就是赤裸裸的往队列放入任务。延时队列最核心的实现只有一个方法就是take方法,接下来就来看看吧,请边看代码,边看注释:
@Override public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //每个方法都持有一个lock lock.lockInterruptibly();//上锁,如果当前线程被中断,那么抛出异常无法获取锁 try { for (;;) {//循环检测 E first = q.peek(); //去获取优先队列中index为0的对象(在优先队列中是通过数组实现) if (first == null) {//如果没有拿到,那么就让该线程等待 System.out.println("awaiting"); available.await();//该线程一直等待,直到被signal唤醒或interrupted }else {//如果第一个元素拿到了 /** * 那么就去获取到第一个元素所设置的延迟时间。 * 我们队列中的每个元素都是实现了Delayed接口的,所以是可以拿到元素自定义的getDelay的延迟时间的 */ long delay = first.getDelay(NANOSECONDS); /** * 如果延迟为小于等于0,那么就意味着这个任务到点了,要被执行了,于是就弹出该任务(元素)。 */ if (delay <= 0) //弹出该元素,然后返回(得到,然后从队列中删除掉),本次take执行完成。 return q.poll(); /** * 如果delay大于0,说明该任务还没到时间点 */ first = null; //那么就把这个first给置为null /** * 以下是有关leader的逻辑,以下的逻辑借鉴了Leader-Follower模式。 * * 基本逻辑是: * * 1、如果已经有leader了,那么就让当前线程挂起。 * 2、如果还没有leader,那么就把当前线程选为leader。 * 3、然后让当前线程等待delay等长时间。 */ if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread();//当前线程 leader = thisThread; //把当前线程设置为leader try { available.awaitNanos(delay);//等待指定delay时间 } finally { if (leader == thisThread)//如果当前线程就是leader leader = null;//那么就把leader置为null } } } } } finally { if (leader == null && q.peek() != null){//如果现在没有leader,并且队列中已经有任务了 System.out.println("激活线程"); available.signal();}//那么就唤醒睡眠的线程 lock.unlock(); } }
2.2.5.1.通过getDelay来判断是否到期
/**
* 那么就去获取到第一个元素所设置的延迟时间。
* 我们队列中的每个元素都是实现了Delayed接口的,所以是可以拿到元素自定义的getDelay的延迟时间的
*/
long delay = first.getDelay(NANOSECONDS);
/**
* 如果延迟为小于等于0,那么就意味着这个任务到点了,要被执行了,于是就弹出该任务(元素)。
*/
if (delay <= 0)
//弹出该元素,然后返回(得到,然后从队列中删除掉),本次take执行完成。
return q.poll();
2.2.5.2.Leader-Follower模式的使用
上面的 take方法中就借鉴了Leader-Follower模式,通过该模式来实现线程之间的切换。以下是有关该模式的一些介绍。
Leader/Follower相比较于普通的ThreadPool的优点:
1、无需context switch,减少了线程间数据copy。
2、无需维护一个队列,占用而外的内存空间。
它们通过以下两个方法来实现切换:
waitToLeader
promoteNewLeader
利用了java线程间通信的特性, 实现leader/follower线程的切换。
目前java实现线程间通信,有两种方法:
1、Object.wait, Object.notify
2、Condition.await, Condition.signal
正如你可能已经读过的,该模式由4个组件组成:ThreadPool,HandleSet,Handle,ConcreteEventHandler(实现EventHandler接口)。我们的延时队列的take操作正是借鉴了这种模式。怎么理解Leader-Follower呢?
你可以把它想象成一个夜晚的出租车站,所有的司机都在睡觉,除了一个领导。 ThreadPool是一个管理多个线程的工作站 - 出租车(线程)。
领导者正在等待HandleSet上的IO事件,就像司机等待客户一样。
当一个客户到达(以识别IO事件的句柄的形式)时,领导司机唤醒另一个司机成为下一个领导,然后就去服务他的乘客的请求了,也就是去送那个客户到指定的地方。
当他把客户送到给定的地址(呼叫ConcreteEventHandler并交给Handle)时,下一个领导可以同时为另一个乘客服务。
当司机结束时,他把计程车开回车站,如果车站不空,就睡着了。否则他会成为领导者。
每个线程就好比出租车司机,threadpool好比出租车总站,event好比出租车客户(thead - cab driver, threadpool - station, event - client)。
3.演示
3.1.实现一个基于Delayed的任务
/**
* <p>
* [任务调度系统]
* <br>
* [队列中要执行的任务]
* </p>
*/
public class Task<T extends Runnable> implements Delayed {//实现Delayed接口
/**
* 到期时间
*/
private final long time;
/**
* 问题对象
*/
private final T task;
private static final AtomicLong atomic = new AtomicLong(0);
private final long n;
public Task(long timeout, T t) {
this.time = System.nanoTime() + timeout;
this.task = t;
this.n = atomic.getAndIncrement();
}
/**
* 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
*
* 这个方法会在CDelayQueue中被调用,通过对该方法的返回值的判断,来决定一个定时任务是否到时
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
}
/**
* 实现此方法的目的是,该任务最终会被装载入CDelayQueue里的优先队列(PriorityQueue)中,而优先队列
* 之所以叫优先队列,就是通过此方法来比对同等条件下哪些任务被优先弹出。
* @param other
* @return
*/
@Override
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof Task) {
Task x = (Task) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (n < x.n)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
public T getTask() {
return this.task;
}
@Override
public int hashCode() {
return task.hashCode();
}
@Override
public boolean equals(Object object) {
if (object instanceof Task) {
return object.hashCode() == hashCode() ? true : false;
}
return false;
}
}
核心实现
public long getDelay(TimeUnit unit);
/**
* 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
*
* 这个方法会在CDelayQueue中被调用,通过对该方法的返回值的判断,来决定一个定时任务是否到时
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
}
public int compareTo(Delayed other);
/**
* 实现此方法的目的是,该任务最终会被装载入CDelayQueue里的优先队列(PriorityQueue)中,而优先队列
* 之所以叫优先队列,就是通过此方法来比对同等条件下哪些任务被优先弹出。
* @param other
* @return
*/
@Override
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof Task) {
Task x = (Task) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (n < x.n)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
3.2.实现一个线程工具类
/**
* <p>
* [任务调度系统]
* <br>
* [后台守护线程不断的执行检测工作]
* </p>
*/
public class TaskQueueDaemonThread {
private TaskQueueDaemonThread() {
}
private static class LazyHolder {
private static TaskQueueDaemonThread taskQueueDaemonThread = new TaskQueueDaemonThread();
}
public static TaskQueueDaemonThread getInstance() {
return LazyHolder.taskQueueDaemonThread;
}
protected Executor executor = Executors.newFixedThreadPool(20);
/**
* 守护线程
*/
private Thread daemonThread;
/**
* 初始化守护线程
*/
public void init() {
daemonThread = new Thread(() -> execute());
daemonThread.setDaemon(true);
daemonThread.setName("Task Queue Daemon Thread");
daemonThread.start();
}
private void execute() {
System.out.println("start:" + System.currentTimeMillis());
while (true) {
try {
//从延迟队列中取值,如果没有对象过期则队列一直等待,
Task t1 = t.take();
if (t1 != null) {
//修改问题的状态
Runnable task = t1.getTask();
if (task == null) {
continue;
}
executor.execute(task);
}
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
/**
* 创建一个最初为空的新 DelayQueue
*/
private CDelayQueue<Task> t = new CDelayQueue<>();
/**
* 添加任务,
* time 延迟时间
* task 任务
* 用户为问题设置延迟时间
*/
public void put(long time, Runnable task) {
//转换成ns
long nanoTime = TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS);
//创建一个任务
Task k = new Task(nanoTime, task);
//将任务放在延迟的队列中
try {
t.put(k);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void start(){
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("started!!");
}
});
}
/**
* 结束订单
* @param task
*/
public boolean endTask(Task<Runnable> task){
return t.remove(task);
}
}
3.3.Demo 类
public class Demo {
public static void main(String[] args) {
final TaskQueueDaemonThread instance = TaskQueueDaemonThread.getInstance();
instance.init();
instance.put(3000, new Runnable() {
@Override
public void run() {
System.out.println("三秒后执行这一行");
}
});
instance.put(6000, new Runnable() {
@Override
public void run() {
System.out.println("六秒后执行这一行");
}
});
instance.put(9000, new Runnable() {
@Override
public void run() {
System.out.println("九秒后执行这一行");
}
});
instance.put(20000, new Runnable() {
@Override
public void run() {
System.out.println("20秒后执行这一行");
}
});
instance.start();
}
}
运行效果:
awaiting started!! 激活线程 三秒后执行这一行 激活线程 六秒后执行这一行 激活线程 九秒后执行这一行 awaiting 20秒后执行这一行
总结
我们通过扩展AbstractQueue,然后实现BlockingQueue,使用优先队列来承载任务,通过使用Leader-Follower模式来实现线程之间的切换。延时队列的核心就是一个阻塞的队列+Leader-Follower模式+Delayed接口。
完整代码请点击“阅读原文”查看!
本文分享自 ImportSource 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!