前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【云原生】Nacos-TaskManager 任务管理的使用

【云原生】Nacos-TaskManager 任务管理的使用

作者头像
石臻臻的杂货铺[同名公众号]
发布2022-06-30 15:46:11
3650
发布2022-06-30 15:46:11
举报
文章被收录于专栏:kafka专栏kafka专栏

任务管理类

因为Nacos中有很多地方使用了这个TaskManager,所以我们得先了解一下这个类是干啥用的,方便后面阅读源码时候不会吃力;

先说结论:

TaskManager 可以看成是一个待执行的任务集合,用于处理一定要执行成功的任务 单线程的方式处理任务,保证任务一定被成功处理; 如果执行失败了,任务会被重新放入集合中等待下一次被消费;

AbstractTask

AbstractTask是个抽象类,所有的需要被执行的任务都继续这个类; 这个类主要提供执行任务所需要的数据和方法;例如

   /* 一个任务两次处理的间隔,单位是毫秒*/
    private long taskInterval;
    /*任务上次被处理的时间,用毫秒表示*/
    private long lastProcessTime;
/* TaskManager 判断当前是否需要处理这个Task,子类可以Override这个函数实现自己的逻辑
     */
    public boolean shouldProcess() {
        return (System.currentTimeMillis() - this.lastProcessTime >= this.taskInterval);
    }

TaskProcessor任务处理器

TaskProcessor 是任务处理器接口,它有个方法

boolean process(String taskType, AbstractTask task);

用于执行对应的AbstractTask任务类; 不同的任务类型,可以实现自己的执行任务逻辑;

TaskManager任务管理类

TaskManager 是个任务管理类;

它里面有两个属性保存了待消费的任务AbstractTask,和任务执行需要的TaskProcessor;

/**待消费的任务AbstractTask**/
private final ConcurrentHashMap<String, AbstractTask> tasks = new ConcurrentHashMap<String, AbstractTask>();
/**任务AbstractTask对应的任务执行器TaskProcessor**/
 private final ConcurrentHashMap<String, TaskProcessor> taskProcessors =new ConcurrentHashMap<String, TaskProcessor>();

如果taskProcessors中没有找到对应的任务执行器,那么它里面有一个默认执行器会执行

 /**默认执行器**/
 private TaskProcessor defaultTaskProcessor;

使用用例

Nacos配置中心模块很重要一个功能就是,在初始化的时候以及每隔一段时间就会去数据库中把所有数据Dump到磁盘中;Dump就是一个任务类AbstractTask; 我们上面说过

AbstractTask就是一个信息承载对象,主要给TaskProcessor提供执行所需要的数据;我们看看DumpTask;

DumpTask

DumpTask定义了自己的一些属性; 再看看其他的例如DumpAllTaskDumpAllBetaTask

这两个任务类只定义了TASK_ID

既然有DumpTask任务类,那肯定就有对应的任务处理器类DumpProcessor;

DumpProcessor

DumpProcessor 是DumpTask任务的执行器;执行器中的方法

public boolean process(String taskType, AbstractTask task)

代码太长就不在这里分析了,它里面主要做的操作就是 保存配置文件到本地磁盘中,并缓存md5

详细可以看文章 【Nacos源码之配置管理 四】DumpService如何将配置文件全部Dump到磁盘中

对应DumpAllTaskDumpAllBetaTask 任务的任务执行器有DumpAllProcessorDumpAllBetaProcessor

DumpAllTask任务触发执行的地方

上面是DumpAllTask的定义和DumpAllTaskProcessor执行器的定义;定义好了之后是怎么被触发的呢?

DumpService初始化Dump配置信息

这个类就是专门Dump配置信息的服务类;上面提及的DumpAll就是在这里被调用的;我们来看下他主要方法;

    @PostConstruct
    public void init() {
        DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
      /**在new这个TaskManager类的时候,专门执行任务的一个线程就已经开始启动了,这不过这个时候还没有任务Task添加进去**/
       dumpAllTaskMgr = new TaskManager( "com.alibaba.nacos.server.DumpAllTaskManager");
dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
      Runnable dumpAll = new Runnable() {
            @Override
            public void run() {
dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
            }
        };
            /**每10分钟执行一次DumpAll操作**/
      TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
                TimeUnit.MINUTES);
    }

DumpService在初始化的时候回调用这个init方法;

1.先new了一个DumpAllProcessor执行器;

2.再new 了一个TaskManager任务管理器;在new这个任务管理器的时候,就会启动一个线程专门去执行所有待执行的任务;只不过这个时候还没有添加任务;

3.将这个任务管理器的默认执行器设置为DumpAllProcessor;

4.每十分钟执行一次往TaskManager中添加一个DumpAllTask的任务;一经添加就会被TaskManager中的线程 processingThread 执行process方法;

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 任务管理类
    • AbstractTask
      • TaskProcessor任务处理器
        • TaskManager任务管理类
        • 使用用例
          • DumpTask
            • DumpProcessor
            • DumpAllTask任务触发执行的地方
              • DumpService初始化Dump配置信息
              相关产品与服务
              对象存储
              对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档