前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >自己动手系列-延迟队列

自己动手系列-延迟队列

作者头像
ImportSource
发布2018-04-03 11:55:18
2.8K0
发布2018-04-03 11:55:18
举报
文章被收录于专栏:ImportSourceImportSource

1.什么是延迟队列

在java的并发包中有有关定时调度的api。 里边其中一个重要实现就是延迟队列,通过延时队列来实现定时调度。

那么如果让你实现一个延时队列,你会怎么做呢?

2.自己实现一个延迟队列

2.1.定义一个Delayed接口。

2.2.定义一个DelayQueue。

2.2.1.继承AbstractQueue

2.2.2.实现BlockingQueue

2.2.2.使用PriorityQueue来装载任务

2.2.3.使用重入锁ReentrantLock来存取操作的线程安全

2.2.4.创建condition,用来唤醒和挂起线程

2.2.5.核心方法take的实现

2.2.5.1.通过getDelay来判断是否到期

2.2.5.2.Leader-Follower模式的使用

2.1.定义一个Delayed接口

新建一个Delayed接口。这个接口主要是为了限制让自定义的task都去实现这个接口。因为我们要通过这个方法来获取

还有多久过期。

代码语言:javascript
复制
/**
 * 延时接口
 * @author hezhuofan
 */
public interface Delayed extends Comparable<Delayed> {
    /**
     * 返回剩余的延迟时间
     * @param unit 时间单位
     * @return 剩余的时间; 零值或负值表示早已过期
     */
    long getDelay(TimeUnit unit);
}

2.2.定义一个DelayQueue。

2.2.1.继承AbstractQueue

代码语言:javascript
复制
public class CDelayQueue<E extends Delayed> extends AbstractQueue<E>  {
代码语言:javascript
复制
}
代码语言:javascript
复制

AbstractQueue

这个类提供了一些Queue操作的骨架实现。 当你需要实现一个元素不能为null的队列时,就可以继承这个类。 方法add,remove和element都是基于offer,poll和peek,但是会抛出异常而不是通过返回false或null来表示失败。

扩展此类的Queue实现,必须最小化的去定义Queue的offer方法,该方法不允许插入null元素。另外这几个方法Queue.peek,Queue.poll,Collection.size和Collection.iterator也是一样的。 通常情况下,其他方法也将被覆盖。 如果你要实现的队列不能满足这些限制,那么请考虑更上一层的AbstractCollection来实现你的队列。

该类是Java Collections Framework的成员。

另外值得注意的是,这里我们让动态类型必须是实现了Delayed接口的,因为只有实现了Delayed接口的任务才能获取到定时的剩余时间,才能判断什么时候把到期的任务弹出。

2.2.2.实现BlockingQueue

为了让我们的队列是一个阻塞队列,我们需要实现接口BlockingQueue。

代码语言:javascript
复制

代码语言:javascript
复制
public class CDelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
代码语言:javascript
复制
}

BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

1. 当队列满了的时候进行入队列操作

2. 当队列空了的时候进行出队列操作

因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空队列进行出队列操作时,它将会被阻塞,除非有另一个线程进行了入队列操作。

在Java中,BlockingQueue的接口位于java.util.concurrent 包中(在Java5版本开始提供),由上面介绍的阻塞队列的特性可知,阻塞队列是线程安全的。

阻塞队列主要用在生产者/消费者的场景,下面这幅图展示了一个线程生产、一个线程消费的场景:

2.2.2.使用PriorityQueue来装载任务

代码语言:javascript
复制
public class CDelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
代码语言:javascript
复制
private PriorityQueue<E> q=new PriorityQueue<E>();
代码语言:javascript
复制
}

我们知道队列是遵循先进先出(First-In-First-Out)模式的,但有些时候需要在队列中基于优先级处理对象。举个例子,比方说我们有一个每日交易时段生成股票报告的应用程序,需要处理大量数据并且花费很多处理时间。客户向这个应用程序发送请求时,实际上就进入了队列。我们需要首先处理优先客户再处理普通用户。在这种情况下,Java的PriorityQueue(优先队列)会很有帮助。

PriorityQueue类在Java1.5中引入并作为 Java Collections Framework 的一部分。PriorityQueue是基于优先堆的一个无界队列,这个优先队列中的元素可以默认自然排序或者通过提供的Comparator(比较器)在队列实例化的时排序。

2.2.3.使用重入锁ReentrantLock来保证存取操作的线程安全

代码语言:javascript
复制
public class CDelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {

    private final transient ReentrantLock lock=new ReentrantLock();

    private PriorityQueue<E> q=new PriorityQueue<E>();
代码语言:javascript
复制
}

重入锁不多说了,这是一个用的最广泛的锁之一了。

2.2.4.创建condition,用来唤醒和挂起线程

代码语言:javascript
复制
public class CDelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {

    private final transient ReentrantLock lock=new ReentrantLock();

    private PriorityQueue<E> q=new PriorityQueue<E>();

    /**
     *用于线程的等待以及唤醒的触发,稍后会详细介绍Leader-Follower模式
    */
    private Thread leader = null;

    private final Condition available = lock.newCondition();
代码语言:javascript
复制
}
代码语言:javascript
复制

这里我们使用Condition来辅助一些后面将要介绍到的Leader-Follower模式。

Condition的基本动作就是负责唤醒和挂起线程。

2.2.5.核心方法take的实现

队列的put offer等方法都比较简单,就是赤裸裸的往队列放入任务。延时队列最核心的实现只有一个方法就是take方法,接下来就来看看吧,请边看代码,边看注释:

@Override public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //每个方法都持有一个lock lock.lockInterruptibly();//上锁,如果当前线程被中断,那么抛出异常无法获取锁 try { for (;;) {//循环检测 E first = q.peek(); //去获取优先队列中index为0的对象(在优先队列中是通过数组实现) if (first == null) {//如果没有拿到,那么就让该线程等待 System.out.println("awaiting"); available.await();//该线程一直等待,直到被signal唤醒或interrupted }else {//如果第一个元素拿到了 /** * 那么就去获取到第一个元素所设置的延迟时间。 * 我们队列中的每个元素都是实现了Delayed接口的,所以是可以拿到元素自定义的getDelay的延迟时间的 */ long delay = first.getDelay(NANOSECONDS); /** * 如果延迟为小于等于0,那么就意味着这个任务到点了,要被执行了,于是就弹出该任务(元素)。 */ if (delay <= 0) //弹出该元素,然后返回(得到,然后从队列中删除掉),本次take执行完成。 return q.poll(); /** * 如果delay大于0,说明该任务还没到时间点 */ first = null; //那么就把这个first给置为null /** * 以下是有关leader的逻辑,以下的逻辑借鉴了Leader-Follower模式。 * * 基本逻辑是: * * 1、如果已经有leader了,那么就让当前线程挂起。 * 2、如果还没有leader,那么就把当前线程选为leader。 * 3、然后让当前线程等待delay等长时间。 */ if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread();//当前线程 leader = thisThread; //把当前线程设置为leader try { available.awaitNanos(delay);//等待指定delay时间 } finally { if (leader == thisThread)//如果当前线程就是leader leader = null;//那么就把leader置为null } } } } } finally { if (leader == null && q.peek() != null){//如果现在没有leader,并且队列中已经有任务了 System.out.println("激活线程"); available.signal();}//那么就唤醒睡眠的线程 lock.unlock(); } }

2.2.5.1.通过getDelay来判断是否到期

代码语言:javascript
复制
 /**
  * 那么就去获取到第一个元素所设置的延迟时间。
  * 我们队列中的每个元素都是实现了Delayed接口的,所以是可以拿到元素自定义的getDelay的延迟时间的
  */
  long delay = first.getDelay(NANOSECONDS);
  /**
  * 如果延迟为小于等于0,那么就意味着这个任务到点了,要被执行了,于是就弹出该任务(元素)。
  */
  if (delay <= 0)
  //弹出该元素,然后返回(得到,然后从队列中删除掉),本次take执行完成。
  return q.poll();
代码语言:javascript
复制

2.2.5.2.Leader-Follower模式的使用

上面的 take方法中就借鉴了Leader-Follower模式,通过该模式来实现线程之间的切换。以下是有关该模式的一些介绍。

Leader/Follower相比较于普通的ThreadPool的优点:

1、无需context switch,减少了线程间数据copy。

2、无需维护一个队列,占用而外的内存空间。

代码语言:javascript
复制
代码语言:javascript
复制

它们通过以下两个方法来实现切换:

waitToLeader

promoteNewLeader

利用了java线程间通信的特性, 实现leader/follower线程的切换。

目前java实现线程间通信,有两种方法:

1、Object.wait, Object.notify

2、Condition.await, Condition.signal

正如你可能已经读过的,该模式由4个组件组成:ThreadPool,HandleSet,Handle,ConcreteEventHandler(实现EventHandler接口)。我们的延时队列的take操作正是借鉴了这种模式。怎么理解Leader-Follower呢?

你可以把它想象成一个夜晚的出租车站,所有的司机都在睡觉,除了一个领导。 ThreadPool是一个管理多个线程的工作站 - 出租车(线程)。

领导者正在等待HandleSet上的IO事件,就像司机等待客户一样。

当一个客户到达(以识别IO事件的句柄的形式)时,领导司机唤醒另一个司机成为下一个领导,然后就去服务他的乘客的请求了,也就是去送那个客户到指定的地方。

当他把客户送到给定的地址(呼叫ConcreteEventHandler并交给Handle)时,下一个领导可以同时为另一个乘客服务。

当司机结束时,他把计程车开回车站,如果车站不空,就睡着了。否则他会成为领导者。

每个线程就好比出租车司机,threadpool好比出租车总站,event好比出租车客户(thead - cab driver, threadpool - station, event - client)。

3.演示

3.1.实现一个基于Delayed的任务

代码语言:javascript
复制
/**
 * <p>
 * [任务调度系统]
 * <br>
 * [队列中要执行的任务]
 * </p>
 */
public class Task<T extends Runnable> implements Delayed {//实现Delayed接口
    /**
     * 到期时间
     */
    private final long time;

    /**
     * 问题对象
     */
    private final T task;
    private static final AtomicLong atomic = new AtomicLong(0);

    private final long n;

    public Task(long timeout, T t) {
        this.time = System.nanoTime() + timeout;
        this.task = t;
        this.n = atomic.getAndIncrement();
    }

    /**
     * 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
     *
     * 这个方法会在CDelayQueue中被调用,通过对该方法的返回值的判断,来决定一个定时任务是否到时
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * 实现此方法的目的是,该任务最终会被装载入CDelayQueue里的优先队列(PriorityQueue)中,而优先队列
     * 之所以叫优先队列,就是通过此方法来比对同等条件下哪些任务被优先弹出。
     * @param other
     * @return
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof Task) {
            Task x = (Task) other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (n < x.n)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    public T getTask() {
        return this.task;
    }

    @Override
    public int hashCode() {
        return task.hashCode();
    }

    @Override
    public boolean equals(Object object) {
        if (object instanceof Task) {
            return object.hashCode() == hashCode() ? true : false;
        }
        return false;
    }


}

核心实现

public long getDelay(TimeUnit unit);

代码语言:javascript
复制
/**
     * 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
     *
     * 这个方法会在CDelayQueue中被调用,通过对该方法的返回值的判断,来决定一个定时任务是否到时
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

public int compareTo(Delayed other);

代码语言:javascript
复制
/**
     * 实现此方法的目的是,该任务最终会被装载入CDelayQueue里的优先队列(PriorityQueue)中,而优先队列
     * 之所以叫优先队列,就是通过此方法来比对同等条件下哪些任务被优先弹出。
     * @param other
     * @return
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof Task) {
            Task x = (Task) other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (n < x.n)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

3.2.实现一个线程工具类

代码语言:javascript
复制
/**
 * <p>
 * [任务调度系统]
 * <br>
 * [后台守护线程不断的执行检测工作]
 * </p>
 */
public class TaskQueueDaemonThread {

    private TaskQueueDaemonThread() {
    }

    private static class LazyHolder {
        private static TaskQueueDaemonThread taskQueueDaemonThread = new TaskQueueDaemonThread();
    }

    public static TaskQueueDaemonThread getInstance() {
        return LazyHolder.taskQueueDaemonThread;
    }

    protected Executor executor = Executors.newFixedThreadPool(20);
    /**
     * 守护线程
     */
    private Thread daemonThread;

    /**
     * 初始化守护线程
     */
    public void init() {
        daemonThread = new Thread(() -> execute());
        daemonThread.setDaemon(true);
        daemonThread.setName("Task Queue Daemon Thread");
        daemonThread.start();
    }

    private void execute() {
        System.out.println("start:" + System.currentTimeMillis());
        while (true) {
            try {
                //从延迟队列中取值,如果没有对象过期则队列一直等待,
                Task t1 = t.take();
                if (t1 != null) {
                    //修改问题的状态
                    Runnable task = t1.getTask();
                    if (task == null) {
                        continue;
                    }
                    executor.execute(task);
                }
            } catch (Exception e) {
                e.printStackTrace();
                break;
            }
        }
    }

    /**
     * 创建一个最初为空的新 DelayQueue
     */
    private CDelayQueue<Task> t = new CDelayQueue<>();

    /**
     * 添加任务,
     * time 延迟时间
     * task 任务
     * 用户为问题设置延迟时间
     */
    public void put(long time, Runnable task) {
        //转换成ns
        long nanoTime = TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS);
        //创建一个任务
        Task k = new Task(nanoTime, task);
        //将任务放在延迟的队列中
        try {
            t.put(k);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void start(){
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("started!!");
            }
        });
    }

    /**
     * 结束订单
     * @param task
     */
    public boolean endTask(Task<Runnable> task){
        return t.remove(task);
    }
}

3.3.Demo 类

代码语言:javascript
复制
public class Demo {
    public static void main(String[] args) {
        final TaskQueueDaemonThread instance = TaskQueueDaemonThread.getInstance();
        instance.init();
        instance.put(3000, new Runnable() {
            @Override
            public void run() {
                System.out.println("三秒后执行这一行");
            }
        });
        instance.put(6000, new Runnable() {
            @Override
            public void run() {
                System.out.println("六秒后执行这一行");
            }
        });
        instance.put(9000, new Runnable() {
            @Override
            public void run() {
                System.out.println("九秒后执行这一行");
            }
        });
        instance.put(20000, new Runnable() {
            @Override
            public void run() {
                System.out.println("20秒后执行这一行");
            }
        });
       instance.start();
    }
}

运行效果:

awaiting started!! 激活线程 三秒后执行这一行 激活线程 六秒后执行这一行 激活线程 九秒后执行这一行 awaiting 20秒后执行这一行

总结

我们通过扩展AbstractQueue,然后实现BlockingQueue,使用优先队列来承载任务,通过使用Leader-Follower模式来实现线程之间的切换。延时队列的核心就是一个阻塞的队列+Leader-Follower模式+Delayed接口。

完整代码请点击“阅读原文”查看!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-12-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 ImportSource 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档