前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty时间轮延时任务

Netty时间轮延时任务

原创
作者头像
时光_赌徒
发布2022-07-25 16:28:57
9910
发布2022-07-25 16:28:57
举报
文章被收录于专栏:记录记录
  • HashedWheelTimer概念

这个类用来计划执行非精准的I/O超时。可以通过指定每一格的时间间隔来改变执行时间的精确度。在大多数网络应用中,I/O超时不需要十分准确,因此,默认的时间间隔是100 毫秒,这个值适用于大多数场合。HashedWheelTimer内部结构可以看做是个车轮,简单来说,就是TimerTask的hashTable的车轮。车轮的size默认是512,可以通过构造函数自己设置这个值。注意,当HashedWheelTimer被实例化启动后,会创建一个新的线程,因此,你的项目里应该只创建它的唯一一个实例。

  • 参数解释

首先创建时间轮,因为项目中只能出现一个实例所以直接用final修饰;

代码语言:javascript
复制
public  final HashedWheelTimer timer_wheel = new HashedWheelTimer(1L, TimeUnit.SECONDS, 60);

参数解释:

代码语言:javascript
复制
/**long tickDuration:滴答时间,刻度之间的持续时间;
    TimeUnit unit: 滴答时间单位
    int ticksPerWheel:  多少个刻度一圈
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}

理解起来就像钟表,tickDuration 1s滴答一下,ticksPerWheel 刻度盘为60 ;

连起来就是创建一个时间论,一秒滴答一次,刻度盘为60,也就是60S 后重新开始

代码语言:javascript
复制
/**
TimerTask task:延时执行的任务,需要实现接口TimerTask
long delay:延时时间的时间
TimeUnit unit:延时的单位
*/
timer_wheel.newTimeout(TimerTask task, long delay, TimeUnit unit);

代码语言:java
复制

//设置10S后执行myTimerTask
Object object = new Object;
MyDelayTask myDelayTask = new MyDelayTask(object);
timer_wheel.newTimeout(myTimerTask, 10, TimeUnit.SECONDS);
代码语言:javascript
复制
//实现TimerTask
 class MyDelayTask implements TimerTask {
    private Object object;

    public MyDelayTask(Object object) {
        this.object = object;
    }

    @Override
    public void run(Timeout timeout) {
    //使用异步线程
        CompletableFuture.runAsync(() -> {
            //Do SomeThing
        }, executorService);

    }

}

附带上线程池的类

代码语言:javascript
复制
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.log.Log;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.util.concurrent.*;


@Import({SpringUtil.class})
@Configuration
@Slf4j
public class ThreadPoolConfig {
   

    @Bean(value = "executorService")
    public ThreadPoolExecutor threadPoolExecutor() {
        int nThreads = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new TraceThreadPoolExecutor(
                nThreads,
                nThreads * 2 + 1,
                30L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue(100000),
                Executors.defaultThreadFactory(),
                new BusinessAbortPolicy()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                log.info("Thread ready to execute:{}", t.getName());
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                if (t == null) {
                    log.info("Thread execution complete:{}", new Thread(r).getName());
                } else {
                    log.error("Thread execution exception{}--->{}", new Thread(r).getName(), t.getMessage());
                }
            }

            @Override
            protected void terminated() {
                log.info("Thread pool exit");
            }
        };
        return executor;
    }
}

class BusinessAbortPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        Log log = Log.get();
        String message = "任务 " + r.toString() + " 被 " + executor.toString() + "拒绝!!";
        log.error("The_thread_pool_is_full and cannot continue processing tasks>>>>{}", message);
    }
}

class TraceThreadPoolExecutor extends ThreadPoolExecutor {

    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, clientTrace(), Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    private Exception clientTrace() {
        return new Exception("============Client_thread_exception=================");
    }

    private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Exception ex) {
                    clientStack.printStackTrace();
                    throw ex;
                }
            }
        };
    }

}

当然HashedWheelTimer这个类属于全内存任务计算,通常在我们真正的业务中,是不会把这些任务直接放到jvm内存中的,要不然重启之后任务不都会消失了么,这样我们需要重写HashedWheelTimer,只需要对它任务的添加和获取进行重写到相应的持久化中间件中即可;

代码语言:javascript
复制
@Slf4j
@Component
public class DelayTimeTask {
    @Autowired
    private RaceMatchPeopleService raceMatchPeopleService;

    @Autowired
    private ExecutorService executorService;

    public void initTask(MatchTeamRedis matchTeamRedis, int timeWheelTime) {
        MatchDelayTask matchTimerTask = new MatchDelayTask(matchTeamRedis);
        MatchConfig.TIMER_WHEEL.newTimeout(matchTimerTask, timeWheelTime, TimeUnit.SECONDS);
    }

    private class MatchDelayTask implements TimerTask {
        private MatchTeamRedis matchTeamRedis;

        public MatchDelayTask(MatchTeamRedis matchTeamRedis) {
            this.matchTeamRedis = matchTeamRedis;
        }

        @Override
        public void run(Timeout timeout) {
            CompletableFuture.runAsync(() -> {
                raceMatchPeopleService.matchPeople(this.matchTeamRedis, Boolean.TRUE);
            }, executorService);

        }

    }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档