前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty中的时间轮(v3.10.7)

Netty中的时间轮(v3.10.7)

作者头像
书唐瑞
发布2022-06-02 14:04:19
3270
发布2022-06-02 14:04:19
举报
文章被收录于专栏:Netty历险记

在上一篇Netty中的时间轮(v3.2.5)中,讲解的版本是v3.2.5,它在MAVEN仓库中是可以找到的.这篇文章讲解的是3.x系列中目前最高的版本v3.10.7,它在MAVEN仓库中不存在,这个版本只在Netty源码中可以找到.讲解这个v3.10.7版本的目的是要和v3.2.5版本做个对比,看它们各自在时间轮上的实现差异.

在时间轮创建的底层,v3.10.7使用的是普通对象数组,而v3.2.5使用的是集合数组.

代码语言:javascript
复制
// v3.2.5版本中的时间轮底层存放任务的结构
final Set<HashedWheelTimeout>[] wheel;
// v3.10.7版本中的时间轮底层存放任务的结构
private final HashedWheelBucket[] wheel;      //Bucket表示桶的意思

在v3.2.5版本中,当提交一个任务的时候,任务是直接放入到时间轮上面去的.每个格子都指向一个HashMap,HashMap中存放着提交的任务.如下图

在v3.10.7版本中,当提交一个任务的时候,并不是立刻就放到时间轮上面,而是先放到一个队列中,之后每次执行任务的时候,从队列中最多取出100000个任务放到时间轮上.

接下来从源码的角度看下v3.10.7版本的实现.代码做了删减,只体现重点.

代码语言:javascript
复制
public HashedWheelTimer(
            ThreadFactory threadFactory,
            ThreadNameDeterminer determiner,
            long tickDuration, TimeUnit unit, int ticksPerWheel) {

  // 创建时间轮底层存储任务的数据结构
  wheel = createWheel(ticksPerWheel);
  mask = wheel.length - 1;

  // 每个格子的时间
  this.tickDuration = unit.toNanos(tickDuration);
  
  // 时间轮处理任务的线程
  workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
          worker, "Hashed wheel timer #" + id.incrementAndGet(),
          determiner));

}
// 时间轮
private final HashedWheelBucket[] wheel;
// 存放任务的队列
private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<HashedWheelTimeout>();

在构造方法中,会创建时间轮,它的底层就是一个对象数组.有一个timeouts的队列,用于存储外界提交的任务.

代码语言:javascript
复制
private static final class HashedWheelBucket {

    // 时间轮上每个格子都是通过链表的方式,将每个任务'链'起来
        private HashedWheelTimeout head;
        private HashedWheelTimeout tail;
    
    ...
}

外界提交任务的时候,代码如下

代码语言:javascript
复制
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {

  long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
  HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
  // 将任务放入到队列中,并没有一开始就放到时间轮上
  timeouts.add(timeout);
  return timeout;
}

时间轮执行任务,代码如下

代码语言:javascript
复制
public void run() {
  
  startTime = System.nanoTime();

  do {
    final long deadline = waitForNextTick();
    if (deadline > 0) {
      // 将队列中的任务最多取100000放到时间轮上
      transferTimeoutsToBuckets();
      HashedWheelBucket bucket = wheel[(int) (tick & mask)];
      // 执行时间轮上当前格子上的任务
      bucket.expireTimeouts(deadline);
      tick++;
    }
  } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

}

上一篇文章介绍过关于startTime,deadline的含义.这里再说一下

当时间轮启动的时候,虽然startTime = System.nanoTime(). 其实我们可以换个角度,在时间轮的世界里,startTime=0,因为世界才刚刚开始启动.

当一个任务要延时delay执行的时候,它在时间轮的世界里就已经被固定好位置了.随着时间轮的'转动',当时间'走'到相应的位置,就会执行符合条件的任务.

再看下任务提交时的关于deadline的概念

代码语言:javascript
复制
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {

  // 每个任务的deadline都是相对时间,相对startTime而言的
  long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
  HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
  timeouts.add(timeout);
  return timeout;
}

long deadline = System.nanoTime() + unit.toNanos(delay) - startTime 这行代码,它使用当前时间减去startTime时间,再加上延时时间delay. 表示的含义就是任务相对于起点(startTime那个时刻点)的位置.

关于时间轮,大家可以想象下卷尺.

时间轮在执行任务的时候,上面的代码有个waitForNextTick方法

代码语言:javascript
复制
private long waitForNextTick() {
  long deadline = tickDuration * (tick + 1);

  for (;;) {
    final long currentTime = System.nanoTime() - startTime;
    // 当当前时间等于deadline的时候,就会跳出循环
    long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

    if (sleepTimeMs <= 0) {
      if (currentTime == Long.MIN_VALUE) {
        return -Long.MAX_VALUE;
      } else {
        return currentTime;
      }
    }

    try {
      Thread.sleep(sleepTimeMs);
    } catch (InterruptedException e) {
    }
  }
}

用图形表示下上面这段代码的含义

只有当前时间currentTime等于deadline的时候,才会跳出循环.

跳出循环之后,接下来就会从任务队列中取出任务放到时间轮上.

代码语言:javascript
复制
private void transferTimeoutsToBuckets() {
  for (int i = 0; i < 100000; i++) {
      // 从队列中取出任务
    HashedWheelTimeout timeout = timeouts.poll();
    if (timeout == null) {
      break;
    }
    if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
        || !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {
      timeout.remove();
      continue;
    }
    long calculated = timeout.deadline / tickDuration;
    long remainingRounds = (calculated - tick) / wheel.length;
    timeout.remainingRounds = remainingRounds;

    final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
    int stopIndex = (int) (ticks & mask);
    
    // 将任务放到时间轮的当前格子中
    HashedWheelBucket bucket = wheel[stopIndex];
    bucket.addTimeout(timeout);
  }
}

最多取出100000个任务放到时间轮上.

接下来就是执行过期的任务

代码语言:javascript
复制
public void expireTimeouts(long deadline) {
  HashedWheelTimeout timeout = head;

  while (timeout != null) {
    boolean remove = false;
    if (timeout.remainingRounds <= 0) {
        // 如果时间已经到了,则执行任务
      if (timeout.deadline <= deadline) {
        timeout.expire();
      }
      remove = true;
    } else if (timeout.isCancelled()) {
      remove = true;
    } else {
      timeout.remainingRounds --;
    }
    HashedWheelTimeout next = timeout.next;
    if (remove) {
      remove(timeout);
    }
    timeout = next;
  }
}

会遍历每个格子中的任务,如果任务超期了,则执行它.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Netty历险记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档