前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DataX源码解析-调度流程

DataX源码解析-调度流程

作者头像
tyrantlucifer
发布2022-08-30 14:25:02
1.1K0
发布2022-08-30 14:25:02
举报
文章被收录于专栏:Tyrant LuciferTyrant Lucifer

前言

书接上回,继续来聊一聊DataX源码,在上篇文章中我们已经对DataX的整体架构以及运行流程有了一个比较细致的了解,这篇文章我们将更深层次的研究DataX在调度方面的细节。

调度流程解析

确认最终任务需要的channel数量

「注:channel是子任务数据传输的内存模型,后续文章将详细剖析,在这里可以暂且认为就是任务分片数量」

在任务周期中含有一个split()阶段,在这个阶段做了两件事情:

  1. 通过配置项计算出建议的并发channel数量
  2. 执行reader插件中的的实际切片逻辑,并根据数量切分configuration,请注意,这一步计算出的数量可能小于第一步配置的并发数

所以在真正调度阶段,需要根据split()阶段中计算的两个值,计算出最终的channel数量

image-20220427120240736

通过channel数量分配taskGroup

在计算出真正需要的channel数量之后,根据每个TaskGroup应该被分配任务的个数,计算TaskGroup的个数:

image-20220427144829662

由上图可知,任务的分配是由JobAssignUtil去进行,而且从方法名称assignFairly也可以知晓,分配的逻辑是公平分配,使用的Round Robin算法,轮询分配到每个TaskGroup中,在此就简单贴一下分配的核心源码:

代码语言:javascript
复制
    /**
     * <pre>
     * 需要实现的效果通过例子来说是:
     * a 库上有表:0, 1, 2
     * a 库上有表:3, 4
     * c 库上有表:5, 6, 7
     *
     * 如果有 4个 taskGroup
     * 则 assign 后的结果为:
     * taskGroup-0: 0,  4,
     * taskGroup-1: 3,  6,
     * taskGroup-2: 5,  2,
     * taskGroup-3: 1,  7
     * </pre>
     * @param resourceMarkAndTaskIdMap 对所有任务进行编号的map
     * @param jobConfiguration job的配置
     * @param taskGroupNumber 一共需要的taskGroup的数量
     * @return taskGroup的配置
     */
    private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) {

        // 获取job配置
        List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);

        // 初始化一份taskGroup配置
        Configuration taskGroupTemplate = jobConfiguration.clone();

        // 将配置中的content配置项去掉
        taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT);

        // 初始化taskGroup配置
        List<Configuration> result = new LinkedList<Configuration>();
        List<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber);
        for (int i = 0; i < taskGroupNumber; i++) {
            taskGroupConfigList.add(new LinkedList<Configuration>());
        }

        int mapValueMaxLength = -1;

        // 获取任务编号,并给每个任务一个标记,每个任务的标记都一致,用一个map保存,标记 -> 编号
        List<String> resourceMarks = new ArrayList<String>();
        for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) {
            resourceMarks.add(entry.getKey());
            if (entry.getValue().size() > mapValueMaxLength) {
                mapValueMaxLength = entry.getValue().size();
            }
        }

        // 核心Round Robin分配逻辑
        int taskGroupIndex = 0;
        for (int i = 0; i < mapValueMaxLength; i++) {
            for (String resourceMark : resourceMarks) {
                if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {
                    int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
                    taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
                    taskGroupIndex++;

                    resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
                }
            }
        }

        Configuration tempTaskGroupConfig;
        for (int i = 0; i < taskGroupNumber; i++) {
            tempTaskGroupConfig = taskGroupTemplate.clone();
            tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
            tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);

            result.add(tempTaskGroupConfig);
        }

        return result;
    }

启动调度

ok,已经确定完了taskGroup的个数以及每个taskGroup的channel数,接下来就到了真正启动任务的环节:

代码语言:javascript
复制
        // 初始化运行模式
        ExecuteMode executeMode = null;

        // 初始化调度器
        AbstractScheduler scheduler;
        try {
            // 在这里可以看到,DataX官方写死了运行模式,只支持STANDALONE模式,不支持分布式
         executeMode = ExecuteMode.STANDALONE;

            // 初始化调度器
            scheduler = initStandaloneScheduler(this.configuration);

            // 设置运行模式
            for (Configuration taskGroupConfig : taskGroupConfigs) {
                taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
            }

            // 看到这里我彻底笑了 ^_^ ^_^ ^_^
            if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
                if (this.jobId <= 0) {
                    throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                            "在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
                }
            }

            LOG.info("Running by {} Mode.", executeMode);

            // 记录任务开始时间
            this.startTransferTimeStamp = System.currentTimeMillis();

            // 开始调度
            scheduler.schedule(taskGroupConfigs);

            // 记录任务结束时间
            this.endTransferTimeStamp = System.currentTimeMillis();
        } catch (Exception e) {
            LOG.error("运行scheduler 模式[{}]出错.", executeMode);
            this.endTransferTimeStamp = System.currentTimeMillis();
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }

        // 检查任务执行情况
        this.checkLimit();

整个任务从scheduler.schedule(taskGroupConfigs)处被启动,在这个方法中,又调用了startAllTaskGroup(configurations)

image-20220427180441170

代码语言:javascript
复制
    @Override
    public void startAllTaskGroup(List<Configuration> configurations) {
        // 生成线程池,对于线程池的个数是由taskGroup的数量进行控制
        this.taskGroupContainerExecutorService = Executors
                .newFixedThreadPool(configurations.size());

        // 注册taskGroup任务到线程池中并启动
        for (Configuration taskGroupConfiguration : configurations) {
            TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
            this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
        }
        // 关闭线程池提交线程的入口
        this.taskGroupContainerExecutorService.shutdown();
    }

实际上DataX底层对于每个taskGroup都启动了一个线程TaskGroupContainerRunner,采用线程池的方式实现并发操作

调度子单位解析

TaskGroupContainerRunner

TaskGroupContainerRunner是线程子单位,作为最上层的封装,直接提交到线程池中执行:

image-20220427182500027

代码语言:javascript
复制
public class TaskGroupContainerRunner implements Runnable {

 private TaskGroupContainer taskGroupContainer;

 private State state;

 public TaskGroupContainerRunner(TaskGroupContainer taskGroup) {
  this.taskGroupContainer = taskGroup;
  this.state = State.SUCCEEDED;
 }

 @Override
 public void run() {
  try {
            Thread.currentThread().setName(
                    String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
            // 启动真正的并发子单位
            this.taskGroupContainer.start();
   this.state = State.SUCCEEDED;
  } catch (Throwable e) {
   this.state = State.FAILED;
   throw DataXException.asDataXException(
     FrameworkErrorCode.RUNTIME_ERROR, e);
  }
 }

 public TaskGroupContainer getTaskGroupContainer() {
  return taskGroupContainer;
 }

 public State getState() {
  return state;
 }

 public void setState(State state) {
  this.state = state;
 }
}

从代码上看,真正核心的并发子单位是TaskGroupContainer

TaskContainer

来到真正核心的TaskContainer中,这里真正启动了任务,TaskContainer主要做了以下事情:

  1. 注册task

image-20220427183425530

  1. 启动task,每个子任务也就是最小的并发单位的执行器是TaskExecutor

image-20220427183838995

  1. 循环监控task状态,如果出现失败会进行重试

image-20220427183937709

TaskExecutor

image-20220427185430963

从这个类里面可以看见,做核心传输逻辑的变量是两个线程,一个写的线程,一个读的线程,在doStart方法中,启动了这两个线程:

image-20220427191129968

至此,整个任务被完全启动了起来。

总结

DataX整个调度依赖于java底层线程池,它对任务进行分片后并将子任务使用Round Robin算法划分到各个任务组,以一个任务组为基本线程放进线程池并启动;同时一个子任务也包含两个线程去实现写读的流程,DataX能实现精准的流控在于它底层对分片的控制,至此,DataX的全部调度流程概括如下:

  1. 根据流控、并发配置确定分片数量
  2. 根据分片数量确定TaskGroup数量
  3. 通过Round Robin算法分配task至TaskGroup
  4. 启动TaskGroup
  5. 每个TaskGroup启动多个TaskExecutor
  6. TaskExecutor启动ReaderThread和WriterThread

下篇文章我们将聊一聊DataX的单个数据分片之间的数据传输原理,也就是TaskExecutor中的ReaderThread和WriterThread之间如何交换数据,敬请期待,我们下期再见!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-04-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Tyrant Lucifer 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 调度流程解析
    • 确认最终任务需要的channel数量
      • 通过channel数量分配taskGroup
        • 启动调度
        • 调度子单位解析
          • TaskGroupContainerRunner
            • TaskContainer
              • TaskExecutor
              • 总结
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档