前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >定时器算法

定时器算法

作者头像
leobhao
发布2023-03-11 17:11:23
4930
发布2023-03-11 17:11:23
举报
文章被收录于专栏:涓流涓流涓流

概述

在日常开发中, 定时任务是一个比较关键的功能。 Java 中一般使用 JDK 中 TimerScheduledExecutorService 和调度框架 Quartz等。 通常用于实现延时任务, 周期性任务等, 一般会有两种需求:

  1. 轮询定时任务:给定周期内扫描所有记录,查看是否有满足要求的数据。
  2. 延时消息:如常见的订单业务, 订单创建的时候发送一条 N 分钟到期的信息,一旦消息消费后便可判断订单是否可以取消

轮询的方式在数据量大的时候性能会比较差, 通常我们会选择第二种方式。

Timer

Timer 调度任务有一次性调度和循环调度,循环调度有分为:

固定速率调度(fixRate)

固定时延调度(fixDelay):

Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
    @Override
    public void run() {
        System.out.println(LocalDateTime.now());
    }
};
// 延迟 1s 打印(只打印一次)
timer.schedule(timerTask, 1000);
// 周期性每隔 1s 打印一次
timer.schedule(timerTask, 1000, 1000);
内部原理

Timer 类里包含一个任务队列和一个异步轮训线程。

  1. 任务队列是一个以下次执行时间比较的最小堆
  2. 任务队列里容纳了所有待执行的任务,所有的任务将会在这一个异步线程里执行,任务执行中代码不能抛出异常,否则会导致 Timer 线程挂掉,所有的任务都无法执行了。
  3. 单个任务也不易执行时间太长,否则会影响任务调度在时间上的精准性。比如你一个任务跑了太久,其它等着调度的任务就一直处于饥饿状态得不到调度。所有任务的执行都是这单一的 TimerThread 线程。

Timer 类:

class Timer {
  // 任务队列, 任务通过 Timer.schedule 方法将任务加入 TaskQueue
  // taskQueue 是数组实现的一个最小堆
  TaskQueue queue = new TaskQueue();
  // 执行任务的 timer 线程
  TimerThread thread = new TimerThread(queue);
}

class TaskQueue {
  TimerTask[] queue = new TimerTask[128];
  int size;
}

任务调度过程:

// 调度任务
private void sched(TimerTask task, long time, long period) {
  synchronized(task.lock) {
        if (task.state != TimerTask.VIRGIN)
                 throw new IllegalStateException(
                    "Task already scheduled or cancelled");
         task.nextExecutionTime = time;
         task.period = period;
         task.state = TimerTask.SCHEDULED;
    }
        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
}


// new Timer 的时候 内部的 TimerThread 已经开始 run 调用 mainLoop 了
// 运行任务
private void mainLoop() {
   while (true) {
        try {
            TimerTask task;
            boolean taskFired;
            synchronized(queue) {
                // 队列是空并且还有其他任务, 则 wait
                while (queue.isEmpty() && newTasksMayBeScheduled)
                    queue.wait();
                // 如果队列没有任务, 直接结束
                if (queue.isEmpty())
                    break;
                // taskQueue 按照 nextExecutionTime 进行堆排序, 取出最小的(应该执行的)任务
                task = queue.getMin();
                synchronized(task.lock) {
                    if (task.state == TimerTask.CANCELLED) {
                        queue.removeMin();
                        continue;
                    }
                    currentTime = System.currentTimeMillis();
                    executionTime = task.nextExecutionTime;
                    if (taskFired = (executionTime<=currentTime)) {
                        // 如果没有设置执行间隔, 就移除该任务
                        if (task.period == 0) {
                            queue.removeMin();
                            task.state = TimerTask.EXECUTED;
                        } else {
                            // 否则加入队列等待下一次执行(这里是修改下一次任务的执行时间)
                            // 计算下一次调度时间
                            queue.rescheduleMin(
                              task.period<0 ? currentTime   - task.period
                                            : executionTime + task.period);
                        }
                    }
                }
                if (!taskFired) // Task hasn't yet fired; wait
                    queue.wait(executionTime - currentTime);
            }
            if (taskFired)  // Task fired; run it, holding no locks
                task.run();
        } catch(InterruptedException e) {
   }
 }

注意:

  1. Timer 只能单线程调度
  2. TimerTask 中出现的异常会影响到 Timer 的执行

ScheduledThreadPoolExecutor

  • schedule 提交一个一次性触发的任务, 在给定延时后执行
  • ScheduleAtFixedRate 是基于固定时间间隔进行任务调度
  • ScheduleWithFixedDelay 取决于每次任务执行的时间长短,是基于不固定时间间隔的任务调度

使用:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);

//提交一个一次性触发的Runnable任务,在给定时延后执行。
public ScheduledFuture<?> schedule(
    Runnable command, 
    long delay, 
    TimeUnit unit
);

//提交一个定期重复的Runnable任务,在initialDelay时间后第一次执行,此后每隔period时间执行一次。如果某次执行抛出异常,之后所有任务取消执行。
// 如果一次执行时间过长,完成时已经超过下次执行开始时间,下一次执行会等待上一次执行结束后马上执行
public ScheduledFuture<?> scheduleAtFixedRate(
    Runnable command,
    long initialDelay,
    long period,
    TimeUnit unit
);


// 跟 scheduleAtFixRate 类似
// 区别是在上一次执行结束之后,再等待delay时间,然后再执行下一次任务。
public ScheduledFuture<?> scheduleWithFixedDelay(
    Runnable command,
    long initialDelay,
    long delay,
    TimeUnit unit
);
ScheduledThreadPoolExecutor 执行原理

无论是 scheduleAtFixedRate 还是 scheduleWithFixedDelay 都会把任务包装成 ScheduledFutureTask, 然后调用delayedExecute(RunnableScheduledFuture<?> task 处理, 这里以scheduleAtFixedRate为例:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
     // 包装成 ScheduledFutureTask
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));                               
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 处理任务
    delayedExecute(t);
    return t;
}

delayedExecute 逻辑如下:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        //如果线程池已经关闭,拒绝该任务
       reject(task);
    else {
        //将任务加入队列并再次检查线程池状态,不可运行则取消任务
        super.getQueue().add(task);
        //如果任务刚入队后线程池关闭了且不允许执行任务,
        //从任务队列移除该任务并取消之
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 保证核心线程运行任务
            ensurePrestart();
    }
}
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        //如果worker数量小于corePoolSize,新建Worker
        //但是不设置firstTask而是让Worker从延迟队列获取任务
        addWorker(null, true);
    else if (wc == 0)
        //保证至少有一个Worker在运行
        addWorker(null, false);
}

可以看到以上代码并没有包含任何控制或响应延时的代码,因此这些逻辑应该是由延迟队列本身来控制的,这样就可以直接使用继承自ThreadPoolExecutor的方法完成其他相同的部分, 构造函数显示队列类型是DelayedWorkQueue

那我们回到加入队列的任务 ScheduledFutureTask 的 run 方法:

public void run() {
    //根据periodic是否为0判断是否是周期性任务
    boolean periodic = isPeriodic();
    //判断为非可执行任务状态时,取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //不是周期性任务,直接执行    
    else if (!periodic)
        ScheduledFutureTask.super.run();
    //是周期性任务runAndReset方法会执行在执行结束时将任务的状态重置为NEW,便于下次再次执行    
    else if (ScheduledFutureTask.super.runAndReset()) {
        //设置下周执行的时间
        setNextRunTime();
        //再次执行周期行任务
        reExecutePeriodic(outerTask);
    }
}

private void setNextRunTime() {
    long p = period;
    // 这里就是 fixRate 和 fixDelay 的区别, fixRate preiod > 0, 以固定频率执行(上次任务时间+执行周期)
    if (p > 0)
        time += p;
    // 固定延迟, 则是当前系统时间+执行周期
    else
        time = triggerTime(-p);
}

long triggerTime(long delay) {
    // 返回当前系统时间加执行周期
	return now() +
	    // 这里是为了防止溢出
		((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}


void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        //将任务重新入队
        super.getQueue().add(task);
        //如果线程池已关闭,将任务取消
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();//再次执行任务
    }
}

TimeWheel 时间轮算法

TimeWheel时间轮算法,是一种实现延迟队列的巧妙且高效的算法,被应用在Netty,Zookeeper,Kafka等各种框架中, 应用场景广泛。

比如在Dubbo中,为增强系统的容错能力,会有相应的监听判断处理机制。比如RPC调用的超时机制的实现,消费者判断RPC调用是否超时,如果超时会将超时结果返回给应用层。

在Dubbo最开始的实现中,是将所有的返回结果(DefaultFuture)都放入一个集合中,并且通过一个定时任务,每隔一定时间间隔就扫描所有的future,逐个判断是否超时。

这样的实现方式虽然比较简单,但是存在一个问题就是会有很多无意义的遍历操作开销。比如一个RPC调用的超时时间是10秒,而设置的超时判定的定时任务是2秒执行一次,那么可能会有4次左右无意义的循环检测判断操作。

对于以上问题, 目的就是要减少额外扫描的次数,这样能减少 CPU 的开销, 时间轮可以很好的解决这个问题

时间轮介绍
单时间轮

单时间轮只有一个由bucket串起来的轮子,每个bucket下链接着未来对应时刻到期的节点。

假设相邻bucket到期时间的间隔为slot=1s,从当前时刻0s开始计时,1s时到期的定时器节点挂在bucket[1]下,2s时到期的定时器节点挂在bucket[2]下……

当tick检查到时间过去了1s时,bucket[1]下所有节点执行超时动作,当时间到了2s时,bucket[2]下所有节点执行超时动作……

上图只有 8 个 bucket, 如果按照 slot=expire 来算, 只能挂 8s 的定时任务, 超过 8s 可以使用 slot = expire % N, 这里需要引入 rotation 的概念,定时器中expire表示到期时间,rotation表示节点在时间轮转了几圈后才到期

多时间轮

)

Linux的多时间轮算法,借鉴了日常生活中水表的度量方法,通过低刻度走得快的轮子带动高一级刻度轮子走动的方法,达到了仅使用较少刻度即可表示很大范围度量值的效果

netty HashedWheelTimer 源码分析

HashedWheelTimer 是接口 io.netty.util.Timer 的实现:

public interface Timer {

    // 创建一个定时任务
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    // 停止所有的还没有被执行的定时任务
    Set<Timeout> stop();
}

Timeout 是一个接口类, TimerTask 非常简单,就一个 run() 方法:

public interface TimerTask {
    void run(Timeout timeout) throws Exception;
}

public interface Timeout {
    Timer timer();
    TimerTask task();
    boolean isExpired();
    boolean isCancelled();
    boolean cancel();
}

构造函数太多,摘要一下几个重要的参数:

  • tickDuration 和 timeUnit 定义了一格的时间长度,默认的就是 100ms。
  • ticksPerWheel 定义了一圈有多少格,默认的就是 512;
  • leakDetection:用于追踪内存泄漏
  • maxPendingTimeouts:最大允许等待的 Timeout 实例数,也就是我们可以设置不允许太多的任务等待。如果未执行任务数达到阈值,那么再次提交任务会抛出 RejectedExecutionException 异常。默认不限制。

HashedWheelTimer 提交任务:

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    // 校验等待任务数是否达到阈值 maxPendingTimeouts
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }

    // 如果工作线程没有启动,这里负责启动(如果是第一个任务, 还会 await 在这里)
    start();

    /** 下面的代码,构建 Timeout 实例,将其放到任务队列中。 **/

    // deadline 是一个相对时间,相对于 HashedWheelTimer 的启动时间
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // 防止 deadline 溢出
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // timeout 实例,一个上层依赖 timer,一个下层依赖 task,另一个是任务到期时间
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // 放到 timeouts 队列中
    timeouts.add(timeout);
    return timeout;
}

这里的操作很简单:实例化 Timeout,然后放到任务队列中。需要注意的是,任务队列是 MPSC(Multiple Producer Single Consumer)队列, 刚好适用于这里的多生产线程,单消费线程的场景(这个队列是 JCTools 提供的一个并发数据结构)

Worker 线程工作原理:

private final class Worker implements Runnable {

    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    // tick 过的次数,时针每 100ms tick 一次
    private long tick;

    @Override
    public void run() {
        // 在 HashedWheelTimer 中,用的都是相对时间,所以需要启动时间作为基准,并且要用 volatile 修饰
        // 这里跟 newTimeout 中 start 的部分结合理解一下, start 希望工作线程真正启动之后, 并赋值完 startTime 才返回, 因为后面的逻辑需要一个正确的 startTime
        // 所以在 start 方法中使用  while(startTime ==0 )  startTimeInitialized.awit 这样做保证
        startTime = System.nanoTime();
        if (startTime == 0) {
            startTime = 1;
        }

        // 唤醒工作线程启动的时候提交的任务(在提交任务的start里面 await 的)
        startTimeInitialized.countDown();

        // 执行逻辑的地方
        do {
            // 等待并获取下个 tick 相对于开始时间的纳秒数
            final long deadline = waitForNextTick();

            if (deadline > 0) {
                // 该次 tick,bucket 数组对应的 index
                int idx = (int) (tick & mask);

                // 处理一下已经取消的任务
                processCancelledTasks();

                // 取得当前 bucket
                HashedWheelBucket bucket = wheel[idx];

                // 将队列中所有的任务转移到相应的 buckets 中
                transferTimeoutsToBuckets();

                // 执行进入到这个 bucket 中的任务
                bucket.expireTimeouts(deadline);

                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        /* 到这里,说明这个 timer 要关闭了,做一些清理工作 */

        // 将所有 bucket 中没有执行的任务,添加到 unprocessedTimeouts 这个 HashSet 中,
        // 主要目的是用于 stop() 方法返回
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        // 将任务队列中的任务也添加到 unprocessedTimeouts 中
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }

在细看一下 run 流程中几个重要的方法, waitForNextTick 在每个 tick 到期返回:


/**
 * 前面说过,这里用的都是相对时间,所以:
 *   第一次进来的时候,工作线程会在 100ms 的时候返回,返回值是 100*10^6 (纳秒)
 *   第二次进来的时候,工作线程会在 200ms 的时候返回,依次类推
 * 另外就是注意极端情况,比如第二次进来的时候,由于被前面的任务阻塞,导致进来的时候就已经是 250ms,
 *   那么,一进入这个方法就要立即返回,返回值是 250ms,而不是 200ms
 */
private long waitForNextTick() {
    // 下一个 tick 的毫秒数
    long deadline = tickDuration * (tick + 1);

    // 嵌套在一个死循环里面
    for (;;) {
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

        if (sleepTimeMs <= 0) {
        // 这里可能是 HashedWheelTimer 启动到现在经过了太久,  currentTime 已经溢出了
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                // 这里是出口,所以返回值是当前时间(相对时间)
                return currentTime;
            }
        }

        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (PlatformDependent.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }

        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            // 如果 timer 已经 shutdown,那么返回 Long.MIN_VALUE
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

接着看看转移队列到 bucket 中的方法 transferTimeoutsToBuckets:

private void transferTimeoutsToBuckets() {
    // 这里一个 for 循环,还特地限制了 10 万次, 怕一直往队列里面仍任务, netty 的源码还是很细节的
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // 没有任务了
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // 该任务刚刚被取消了(在transfer之前其实已经做过一次了)
            continue;
        }

        
        // 计算还需要等待几个 tick
        long calculated = timeout.deadline / tickDuration;
        // 计算出轮次
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        // 单个 bucket,是由 HashedWheelTimeout 实例组成的一个链表,
        // 放入 bucket 链表
        bucket.addTimeout(timeout);
    }
}

bucket 中任务的执行:

/**
 * 这里会执行这个 bucket 中,轮次为 0 的任务,也就是到期的任务。
 * 这个方法的入参 deadline 其实没什么用,因为轮次为 0 的都是应该被执行的。
 */
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // 处理链表上的所有 timeout 实例
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                // 这行代码负责执行具体的任务
                timeout.expire();
            } else {
                // 这里的代码注释也说,不可能进入到这个分支
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            // 轮次减 1
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

总的来说 HashedWheelTimer 工作原理如下:

参考资料

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021年6月22日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • Timer
    • 内部原理
    • ScheduledThreadPoolExecutor
      • ScheduledThreadPoolExecutor 执行原理
      • TimeWheel 时间轮算法
        • 时间轮介绍
          • 单时间轮
          • 多时间轮
        • netty HashedWheelTimer 源码分析
        • 参考资料
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档