A Look at OtterController

Author: code4it
Last modified: 2020-06-11 10:15:55
Column: 码匠的流水账

This article examines OtterController, a class in otter's node/etl module.

OtterController

otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java

public class OtterController implements NodeTaskListener, OtterControllerMBean {

    private static final Logger                   logger      = LoggerFactory.getLogger(OtterController.class);

    // first level is keyed by pipelineId, second level by S.E.T.L stage
    private Map<Long, Map<StageType, GlobalTask>> controllers = OtterMigrateMap.makeComputingMap(new Function<Long, Map<StageType, GlobalTask>>() {

                                                                  public Map<StageType, GlobalTask> apply(Long pipelineId) {
                                                                      return new MapMaker().makeMap();
                                                                  }
                                                              });
    private ConfigClientService                   configClientService;
    private ArbitrateManageService                arbitrateManageService;
    private NodeTaskService                       nodeTaskService;
    // resource management
    private DataSourceService                     dataSourceService;                                           // connection pool resources
    private DbDialectFactory                      dbDialectFactory;                                            // database information resources
    private ArbitrateEventService                 arbitrateEventService;                                       // arbitrator resources
    private ExecutorService                       executorService;

    private StageAggregationCollector             stageAggregationCollector;

    public void start() throws Throwable {
        // initialize the node
        initNid();
        nodeTaskService.addListener(this); // register this controller as a NodeTask listener
    }

    public void stop() throws Throwable {
        for (Map<StageType, GlobalTask> tasks : controllers.values()) {
            for (GlobalTask task : tasks.values()) {
                try {
                    task.shutdown();
                } catch (Exception e) {
                    logger.error("##shutdown task error!", e);
                }
            }
        }

        try {
            Long nid = configClientService.currentNode().getId();
            arbitrateManageService.nodeEvent().destory(Long.valueOf(nid));
        } catch (Exception e) {
            logger.error("##destory node error!", e);
        }

        try {
            arbitrateEventService.toolEvent().release();
        } catch (Exception e) {
            logger.error("##destory arbitrate error!", e);
        }

        try {
            nodeTaskService.stopNode(); // notify the manager to stop the current node
        } catch (Exception e) {
            logger.error("##stop node error!", e);
        }

        try {
            OtterContextLocator.close();
        } catch (Exception e) {
            logger.error("##cloes spring error!", e);
        }

        ZooKeeperClient.destory();// close zookeeper
    }

    //......

}
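  • OtterController implements the NodeTaskListener interface and provides the start, stop, and process methods. start calls initNid and then nodeTaskService.addListener(this). stop iterates over every GlobalTask in controllers and calls shutdown on each, then calls arbitrateManageService.nodeEvent().destory(), arbitrateEventService.toolEvent().release(), nodeTaskService.stopNode(), OtterContextLocator.close(), and ZooKeeperClient.destory() in that order; every step except the last is wrapped in its own try/catch, so a failure is logged and the remaining steps still run. process (elided in the listing above) iterates over nodeTasks and calls stopPipeline or startPipeline for each.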
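Since process is elided from the listing, the dispatch it is described as performing can be illustrated with a minimal, self-contained sketch. Everything here is a hypothetical stand-in rather than otter's code: MiniNodeTask, MiniController, and the shutdown flag are invented for illustration, the assumption being that each node task says whether its pipeline should be stopped. ConcurrentHashMap.computeIfAbsent is swapped in for OtterMigrateMap.makeComputingMap to get the same create-on-first-access behaviour for the two-level map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a node task: a pipeline id plus a shutdown flag.
final class MiniNodeTask {
    final long pipelineId;
    final boolean shutdown;

    MiniNodeTask(long pipelineId, boolean shutdown) {
        this.pipelineId = pipelineId;
        this.shutdown = shutdown;
    }
}

// Hypothetical, simplified controller; not otter's OtterController.
final class MiniController {
    // First level: pipelineId. Second level: stage name -> running flag.
    private final Map<Long, Map<String, Boolean>> controllers = new ConcurrentHashMap<>();
    // Records what the dispatch decided, so the behaviour can be inspected.
    final List<String> log = new ArrayList<>();

    void process(List<MiniNodeTask> nodeTasks) {
        if (nodeTasks == null || nodeTasks.isEmpty()) {
            return;
        }
        for (MiniNodeTask task : nodeTasks) {
            if (task.shutdown) {
                // Detach the whole pipeline entry before stopping its stages.
                Map<String, Boolean> tasks = controllers.remove(task.pipelineId);
                log.add((tasks != null ? "stop " : "skip ") + task.pipelineId);
            } else {
                // The inner map is created lazily on first access.
                controllers.computeIfAbsent(task.pipelineId, id -> new ConcurrentHashMap<>())
                           .put("SELECT", Boolean.TRUE);
                log.add("start " + task.pipelineId);
            }
        }
    }

    boolean isRegistered(long pipelineId) {
        return controllers.containsKey(pipelineId);
    }
}
```

Removing the pipeline's entry from the outer map before stopping it means a later start request for the same pipeline gets a fresh inner map instead of one that is being torn down.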

initNid

otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java

    private void initNid() {
        // read the nid system property
        String nid = System.getProperty(OtterConstants.NID_NAME);
        if (StringUtils.isEmpty(nid)) {
            throw new ConfigException("nid is not set!");
        }
        logger.info("INFO ## the nodeId = {}", nid);
        checkNidVaild(nid);
        arbitrateManageService.nodeEvent().init(Long.valueOf(nid));
        // register a handler for session expiry
        NodeSessionExpired sessionExpired = new NodeSessionExpired();
        sessionExpired.setNodeEvent(arbitrateManageService.nodeEvent());
        ZooKeeperClient.registerNotification(sessionExpired);
    }
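  • initNid reads the nid from the system property OtterConstants.NID_NAME and throws a ConfigException if it is empty. It then calls checkNidVaild(nid), initializes the node with arbitrateManageService.nodeEvent().init(Long.valueOf(nid)), and registers a NodeSessionExpired handler through ZooKeeperClient.registerNotification(sessionExpired).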
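The first step of initNid, reading a required id from a JVM system property, can be sketched in isolation as follows. This is an illustration under assumptions: NidReader and the property key "nid" are hypothetical (the source reads OtterConstants.NID_NAME, whose value is not shown), IllegalStateException stands in for otter's ConfigException, and the numeric check is an addition of this sketch. It does not reproduce checkNidVaild, whose body is not part of the listing.

```java
// Hypothetical helper; not part of otter.
final class NidReader {
    // Assumed property key for illustration only.
    static final String NID_PROPERTY = "nid";

    // Reads the node id from a system property and fails fast when it is missing or malformed.
    static long readNid() {
        String nid = System.getProperty(NID_PROPERTY);
        if (nid == null || nid.trim().isEmpty()) {
            throw new IllegalStateException("nid is not set!");
        }
        try {
            return Long.parseLong(nid.trim());
        } catch (NumberFormatException e) {
            // Added by this sketch: report a non-numeric value with a clear message.
            throw new IllegalStateException("nid is not a number: " + nid, e);
        }
    }
}
```

With this sketch, the value would be supplied at launch through a standard JVM option such as -Dnid=1.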

startPipeline

otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java

    public void startPipeline(NodeTask nodeTask) {
        Long pipelineId = nodeTask.getPipeline().getId();
        releasePipeline(pipelineId);
        Map<StageType, GlobalTask> tasks = controllers.get(pipelineId);
        // handle the individual task commands
        List<StageType> stage = nodeTask.getStage();
        List<TaskEvent> event = nodeTask.getEvent();
        for (int i = 0; i < stage.size(); i++) {
            StageType stageType = stage.get(i);
            TaskEvent taskEvent = event.get(i);
            if (taskEvent.isCreate()) {
                startTask(nodeTask.getPipeline(), tasks, stageType);
            } else {
                stopTask(tasks, stageType);
            }
        }
    }

    private void startTask(Pipeline pipeline, Map<StageType, GlobalTask> tasks, StageType taskType) {
        if (tasks.get(taskType) != null && tasks.get(taskType).isAlive()) {
            logger.warn("WARN ## this task = {} has started", taskType);
        }

        GlobalTask task = null;
        if (taskType.isSelect()) {
            task = new SelectTask(pipeline.getId());
        } else if (taskType.isExtract()) {
            task = new ExtractTask(pipeline.getId());
        } else if (taskType.isTransform()) {
            task = new TransformTask(pipeline.getId());
        } else if (taskType.isLoad()) {
            task = new LoadTask(pipeline.getId());
        }

        if (task != null) {
            OtterContextLocator.autowire(task); // inject spring resources
            task.start();
            tasks.put(taskType, task);
            logger.info("INFO ## start this task = {} success", taskType.toString());
        }
    }

    private void stopTask(Map<StageType, GlobalTask> tasks, StageType taskType) {
        GlobalTask task = tasks.remove(taskType);
        if (task != null) {
            task.shutdown();
            logger.info("INFO ## taskName = {} has shutdown", taskType);
        } else {
            logger.info("INFo ## taskName = {} is not started", taskType);
        }

    }
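  • startPipeline first calls releasePipeline, then walks the nodeTask's stage and event lists in parallel by index: for each stage it calls startTask when the matching event is a create event, and stopTask otherwise. startTask creates a SelectTask, ExtractTask, TransformTask, or LoadTask according to taskType, autowires it, calls task.start(), and puts it into tasks. Note that when a task of that type is already alive, startTask only logs a warning and still creates and starts a new one. stopTask removes the GlobalTask for the given taskType from tasks and calls its shutdown method.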
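The index-paired dispatch described above can be sketched with simplified types. All names here (MiniStage, MiniStageTask, StageDispatcher) are hypothetical and exist only for this illustration; a list of booleans stands in for the TaskEvent list. The sketch makes two deliberate departures from the listing, both marked in comments: it checks that the two lists have the same length, and it returns early when a live task already exists instead of replacing it.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical stage enum standing in for StageType.
enum MiniStage { SELECT, EXTRACT, TRANSFORM, LOAD }

// Hypothetical task standing in for GlobalTask.
final class MiniStageTask {
    final MiniStage stage;
    private boolean alive;

    MiniStageTask(MiniStage stage) { this.stage = stage; }

    void start() { alive = true; }
    void shutdown() { alive = false; }
    boolean isAlive() { return alive; }
}

final class StageDispatcher {
    final Map<MiniStage, MiniStageTask> tasks = new EnumMap<>(MiniStage.class);

    // stages.get(i) is paired with create.get(i): true starts the stage, false stops it.
    void apply(List<MiniStage> stages, List<Boolean> create) {
        // Departure from the listing: reject mismatched lists up front.
        if (stages.size() != create.size()) {
            throw new IllegalArgumentException("stage and event lists differ in length");
        }
        for (int i = 0; i < stages.size(); i++) {
            if (create.get(i)) {
                startTask(stages.get(i));
            } else {
                stopTask(stages.get(i));
            }
        }
    }

    void startTask(MiniStage stage) {
        MiniStageTask existing = tasks.get(stage);
        if (existing != null && existing.isAlive()) {
            return; // Departure from the listing: keep the live task instead of replacing it.
        }
        MiniStageTask task = new MiniStageTask(stage);
        task.start();
        tasks.put(stage, task);
    }

    void stopTask(MiniStage stage) {
        MiniStageTask task = tasks.remove(stage);
        if (task != null) {
            task.shutdown();
        }
    }
}
```

Whether replacing or keeping an already running task is the right behaviour depends on how otter expects restarts to work, so the early return is a design choice of this sketch rather than a correction of the original.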

stopPipeline

otter/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java

    private void stopPipeline(Long pipelineId, Map<StageType, GlobalTask> tasks) {
        for (GlobalTask task : tasks.values()) {
            try {
                task.shutdown();
            } catch (Exception e) {
                logger.error("## stop s/e/t/l task error!", e);
            } finally {
                tasks.remove(task);
            }
        }
        // close other resources.
        try {
            Thread.sleep(1 * 1000); // wait for S.E.T.L to release threads (the original comment says 5s, but the value is 1s)
        } catch (InterruptedException e) {
            logger.error("ERROR ## ", e);
        }

        // release resources
        releasePipeline(pipelineId);
        arbitrateEventService.toolEvent().release(pipelineId);
    }

    private void releasePipeline(Long pipelineId) {
        dataSourceService.destroy(pipelineId);
        dbDialectFactory.destory(pipelineId);
    }
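  • stopPipeline iterates over tasks, calling task.shutdown() and then tasks.remove(task) for each, sleeps for one second so the S.E.T.L threads can finish, and finally calls releasePipeline and arbitrateEventService.toolEvent().release(pipelineId). Because tasks is keyed by StageType, tasks.remove(task) is given a value rather than a key and does not appear to remove any entry. releasePipeline calls dataSourceService.destroy(pipelineId) and dbDialectFactory.destory(pipelineId).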
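If the intent of the loop is to shut down every task and empty the map, one way to express that is to remove entries through the iterator. The following is a minimal sketch under that assumption, not otter's implementation: StopAllSketch and its Stoppable interface are hypothetical, and string keys stand in for StageType.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical helper; not part of otter.
final class StopAllSketch {

    // Hypothetical stand-in for GlobalTask.
    interface Stoppable {
        void shutdown();
    }

    // Shuts down every task and removes its entry, even when shutdown throws.
    // Returns the number of tasks that shut down without an exception.
    static int stopAll(Map<String, Stoppable> tasks) {
        int stopped = 0;
        Iterator<Map.Entry<String, Stoppable>> it = tasks.entrySet().iterator();
        while (it.hasNext()) {
            Stoppable task = it.next().getValue();
            try {
                task.shutdown();
                stopped++;
            } catch (RuntimeException e) {
                // A failing task must not prevent the remaining tasks from stopping.
            } finally {
                // Removing through the iterator deletes the current entry by key.
                it.remove();
            }
        }
        return stopped;
    }
}
```

Iterator.remove works for both HashMap and ConcurrentHashMap entry sets, so the sketch does not depend on which map implementation holds the tasks.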

Summary

OtterController implements the NodeTaskListener interface and provides the start, stop, and process methods. start initializes the node through initNid and registers the controller with nodeTaskService.addListener(this). stop shuts down every GlobalTask in controllers, then calls arbitrateManageService.nodeEvent().destory(), arbitrateEventService.toolEvent().release(), nodeTaskService.stopNode(), OtterContextLocator.close(), and ZooKeeperClient.destory(). process iterates over nodeTasks and calls stopPipeline or startPipeline for each.

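Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to have it removed.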
