前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DataX源码解析-数据传输

DataX源码解析-数据传输

作者头像
tyrantlucifer
发布2022-08-30 14:29:21
1.2K1
发布2022-08-30 14:29:21
举报
文章被收录于专栏:Tyrant LuciferTyrant Lucifer

前言

书接上回,继续来聊一聊DataX源码,在上篇文章中我们已经对于DataX的调度流程进行了细致的剖析,这篇文章我们将更深层次的研究DataX在数据传输与交换方面的细节。

简单回顾

上文提到,DataX核心运行子单位是TaskExecutor,一个TaskExecutor中会拥有两个线程,分别是WriterThreadReaderThread,这两个线程承担着整个数据传输的重任,所以今天整篇文章的重点将围绕这两个线程展开,如果读者阅读至此觉得概念晦涩难懂,请移步我之前的两篇文章去先了解一下整个DataX的原理和架构:

  1. DataX整体架构:DataX源码解析-整体架构
  2. DataX调度流程:DataX源码解析-调度流程

线程的创建

来到TaskGroupContainer源码中,找到TaskExecutor新建WriterThreadReaderThread的地方:

代码语言:javascript
复制
// 生成WriterThread
writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
this.writerThread = new Thread(writerRunner,
                               String.format("%d-%d-%d-writer",
                                             jobId, taskGroupId, this.taskId));

// 生成ReaderThread
readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
this.readerThread = new Thread(readerRunner,
                               String.format("%d-%d-%d-reader",
                                             jobId, taskGroupId, this.taskId));

承载线程执行的Runner都是由generateRunner这个方法生成:

代码语言:javascript
复制
// 根据不同的插件类型生成对应的插件线程Runner
private AbstractRunner generateRunner(PluginType pluginType, List<TransformerExecution> transformerInfoExecs) {
    
    AbstractRunner newRunner = null;
    
    TaskPluginCollector pluginCollector;

    switch (pluginType) {
        case READER:
            // 加载插件
            newRunner = LoadUtil.loadPluginRunner(pluginType,
                                                  this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
            // 为插件加载配置
            newRunner.setJobConf(this.taskConfig.getConfiguration(
                CoreConstant.JOB_READER_PARAMETER));

            pluginCollector = ClassUtil.instantiate(
                taskCollectorClass, AbstractTaskPluginCollector.class,
                configuration, this.taskCommunication,
                PluginType.READER);
   
            // 内存交换子模型
            RecordSender recordSender;
            
            if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
                recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel, this.taskCommunication, pluginCollector, transformerInfoExecs);
            } else {
                recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
            }
   
            // 设置内存交换子模型
            ((ReaderRunner) newRunner).setRecordSender(recordSender);

            // 设置taskPlugin的collector,用来处理脏数据和job/task通信
            newRunner.setTaskPluginCollector(pluginCollector);
            break;
        case WRITER:
            // 加载插件
            newRunner = LoadUtil.loadPluginRunner(pluginType,
                                                  this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
            // 为插件加载配置
            newRunner.setJobConf(this.taskConfig
                                 .getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));

            pluginCollector = ClassUtil.instantiate(
                taskCollectorClass, AbstractTaskPluginCollector.class,
                configuration, this.taskCommunication,
                PluginType.WRITER);
            
            // 设置内存交换子模型
            ((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
                this.channel, pluginCollector));

            // 设置taskPlugin的collector,用来处理脏数据和job/task通信
            newRunner.setTaskPluginCollector(pluginCollector);
            break;
        default:
            throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, "Cant generateRunner for:" + pluginType);
    }

    newRunner.setTaskGroupId(taskGroupId);
    newRunner.setTaskId(this.taskId);
    newRunner.setRunnerCommunication(this.taskCommunication);

    return newRunner;
}

代码虽有些冗余,但是我还是全部贴了出来,我认为这里对于整个流程的理解很重要,如果读者不愿意仔细研读,我在这里简单概括一下就是DataX使用自己定义的类加载器去加载对应插件防止出现jar包冲突的情况,同时为不同类型(Reader或Writer)的插件去初始化对应的内存交换模型,但这里还没有出现数据交换的相关信息,好消息是内存交换模型出现了,接下来我们将逐渐揭开数据传输的真正面纱。

WriterRunner与ReaderRunner

run方法

WriterRunner
代码语言:javascript
复制
@Override
public void run() {
    Validate.isTrue(this.recordReceiver != null);

    Writer.Task taskWriter = (Writer.Task) this.getPlugin();
    //统计waitReadTime,并且在finally end
    PerfRecord channelWaitRead = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_READ_TIME);
    try {
        channelWaitRead.start();
        LOG.debug("task writer starts to do init ...");
        PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_INIT);
        initPerfRecord.start();
        taskWriter.init();
        initPerfRecord.end();

        LOG.debug("task writer starts to do prepare ...");
        PerfRecord preparePerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_PREPARE);
        preparePerfRecord.start();
        taskWriter.prepare();
        preparePerfRecord.end();
        LOG.debug("task writer starts to write ...");

        PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_DATA);
        dataPerfRecord.start();
        // 这里很重要!!!
        taskWriter.startWrite(recordReceiver);

        dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
        dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
        dataPerfRecord.end();

        LOG.debug("task writer starts to do post ...");
        PerfRecord postPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_POST);
        postPerfRecord.start();
        taskWriter.post();
        postPerfRecord.end();

        super.markSuccess();
    } catch (Throwable e) {
        LOG.error("Writer Runner Received Exceptions:", e);
        super.markFail(e);
    } finally {
        LOG.debug("task writer starts to do destroy ...");
        PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_DESTROY);
        desPerfRecord.start();
        super.destroy();
        desPerfRecord.end();
        channelWaitRead.end(super.getRunnerCommunication().getLongCounter(CommunicationTool.WAIT_READER_TIME));
    }
}

在WriterRunner核心run方法中,主要进行了对Writer插件各个生命周期的调用和每个阶段的耗时统计,但最重要的是我们发现了WriterRunner开始写数据的入口:

代码语言:javascript
复制
taskWriter.startWrite(recordReceiver);

对于WriterThread取数据然后再写数据的媒介是这个神秘的recordReceiver,在上面创建线程的同时我们也发现了有代码会设置recordReceiver

代码语言:javascript
复制
((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
                            this.channel, pluginCollector));

综上所述,写线程的写操作核心依赖RecordReceiver

ReaderRunner
代码语言:javascript
复制
@Override
public void run() {
    assert null != this.recordSender;

    Reader.Task taskReader = (Reader.Task) this.getPlugin();

    //统计waitWriterTime,并且在finally才end。
    PerfRecord channelWaitWrite = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_WRITE_TIME);
    try {
        channelWaitWrite.start();

        LOG.debug("task reader starts to do init ...");
        PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_INIT);
        initPerfRecord.start();
        taskReader.init();
        initPerfRecord.end();

        LOG.debug("task reader starts to do prepare ...");
        PerfRecord preparePerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_PREPARE);
        preparePerfRecord.start();
        taskReader.prepare();
        preparePerfRecord.end();

        LOG.debug("task reader starts to read ...");
        PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DATA);
        dataPerfRecord.start();
        // 这里很重要!!!
        taskReader.startRead(recordSender);
        recordSender.terminate();

        dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
        dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
        dataPerfRecord.end();

        LOG.debug("task reader starts to do post ...");
        PerfRecord postPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_POST);
        postPerfRecord.start();
        taskReader.post();
        postPerfRecord.end();
        // automatic flush
        // super.markSuccess(); 这里不能标记为成功,成功的标志由 writerRunner 来标志(否则可能导致 reader 先结束,而 writer 还没有结束的严重 bug)
    } catch (Throwable e) {
        LOG.error("Reader runner Received Exceptions:", e);
        super.markFail(e);
    } finally {
        LOG.debug("task reader starts to do destroy ...");
        PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DESTROY);
        desPerfRecord.start();
        super.destroy();
        desPerfRecord.end();

        channelWaitWrite.end(super.getRunnerCommunication().getLongCounter(CommunicationTool.WAIT_WRITER_TIME));

        long transformerUsedTime = super.getRunnerCommunication().getLongCounter(CommunicationTool.TRANSFORMER_USED_TIME);
        if (transformerUsedTime > 0) {
            PerfRecord transformerRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.TRANSFORMER_TIME);
            transformerRecord.start();
            transformerRecord.end(transformerUsedTime);
        }
    }
}

在ReaderRunner核心run方法中,主要进行了对Reader插件各个生命周期的调用和每个阶段的耗时统计,但最重要的是我们发现了ReaderRunner开始读数据的入口:

代码语言:javascript
复制
taskReader.startRead(recordSender);

对于ReaderThread写数据的媒介是这个神秘的recordSender,在上面创建线程的同时我们也发现了有代码会设置recordSender

代码语言:javascript
复制
((ReaderRunner) newRunner).setRecordSender(recordSender);

综上所述,读线程的读操作核心依赖RecordSender

WriterRunner类图

image-20220530152815594

ReaderRunner类图

image-20220530153008337

综上所述,读线程和写线程各自拥有着对应的内存交换模型去交换数据,所以接下来的研究核心将转向RecorderReceiverRecordSender

RecordReceiver

image-20220606144047349

打开RecordReceiver的源码,发现它是个接口,实际上实现形式有三种,从字面命名可以看出,有1对1交换实现,还有1对多缓存交换实现,在实际DataX代码中为提高性能使用的是BufferedRecordExchanger

image-20220606145022684

RecordSender

image-20220606144631726

和RecordReceiver一致,同样RecordSender也是一个接口,实际上实现形式和RecordSender一致,在实际DataX代码中为提高性能使用的是BufferedRecordExchanger

image-20220606145159828

BufferedRecordExchanger

image-20220606145401740

BufferedRecordExchanger实现了对应两个接口,而且在类中我们发现了之前提过的Channel内存模型对象,通过Channel内存模型对象在RecordSenderRecordReceiver之间交换数据,来仔细看一下对应的getFromReader()sendToWriter(Record)方法:

代码语言:javascript
复制
@Override
public void sendToWriter(Record record) {
    if(shutdown){
        throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
    }

    Validate.notNull(record, "record不能为空.");

    if (record.getMemorySize() > this.byteCapacity) {
        this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
        return;
    }

    boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
    if (isFull) {
        // 缓存满了清空缓存写入
        flush();
    }

    this.buffer.add(record);
    this.bufferIndex++;
    memoryBytes.addAndGet(record.getMemorySize());
}
代码语言:javascript
复制
@Override
public void flush() {
    if(shutdown){
        throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
    }
    this.channel.pushAll(this.buffer);
    this.buffer.clear();
    this.bufferIndex = 0;
    this.memoryBytes.set(0);
}

发送过程逻辑很简单,一个很一般的buffer思路,生成数据先写入buffer,buffer满了统一写入到channel

代码语言:javascript
复制
@Override
public Record getFromReader() {
    if(shutdown){
        throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
    }
    boolean isEmpty = (this.bufferIndex >= this.buffer.size());
    if (isEmpty) {
        // 缓存空了从再次读取
        receive();
    }

    Record record = this.buffer.get(this.bufferIndex++);
    if (record instanceof TerminateRecord) {
        record = null;
    }
    return record;
}
代码语言:javascript
复制
private void receive() {
    this.channel.pullAll(this.buffer);
    this.bufferIndex = 0;
    this.bufferSize = this.buffer.size();
}

读取过程逻辑同样很简单,先从buffer读,buffer空了从channel中再次读取

Channel

概述

由上文可知,Channel是数据存储的基本单位,用户可以根据不同需求去自定义实现这个规范:

image-20220606150923488

内存模型里定义了统计限速行为以及数据推拉行为,定义了核心的消费者生产者模型,在DataX源码中,目前开源了的只有一种Channel的模型实现:

image-20220606151316718

MemoryChannel

接下来我们来看一下内存模型的具体实现:

image-20220606173553881

比较核心的两个方法是doPushdoPull

代码语言:javascript
复制
@Override
protected void doPush(Record r) {
    try {
        long startTime = System.nanoTime();
        this.queue.put(r);
        waitWriterTime += System.nanoTime() - startTime;
        memoryBytes.addAndGet(r.getMemorySize());
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}
代码语言:javascript
复制
@Override
protected Record doPull() {
    try {
        long startTime = System.nanoTime();
        Record r = this.queue.take();
        waitReaderTime += System.nanoTime() - startTime;
        memoryBytes.addAndGet(-r.getMemorySize());
        return r;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
}

由源码可知,doPull和doPush方法主要是通过queue对象进行数据的交换,实际上queue底层的实现是ArrayBlockQueue,push数据是调用queuetake方法

,pull方法调用queuetake方法,至此,整个DataX数据交换流程结束。

总结

本篇文章我们从更细致的角度分析了Reader和Writer插件之间的数据交换流程和原理,总体概括一下,DataX实现并发数据传输和交换的特点如下:

  1. 抽象统一数据内存模型,清晰明确的表达出一个保存数据的内存模型需要哪些功能
  2. 抽象统一数据交换模型,清晰明确的表达出生产者消费者模型
  3. 利用同一个抽象内存模型协调生产者和消费者之间的关系
  4. 使用多线程实现读写异步执行
  5. 合理利用缓存理论提高数据传输的性能

下篇文章将对DataX的插件开发流程做一个详细的剖析,敬请期待,我们下期再见!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-06-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Tyrant Lucifer 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 简单回顾
  • 线程的创建
  • WriterRunner与ReaderRunner
    • run方法
      • WriterRunner
      • ReaderRunner
    • WriterRunner类图
      • ReaderRunner类图
        • RecordReceiver
          • RecordSender
            • BufferedRecordExchanger
            • Channel
              • 概述
                • MemoryChannel
                • 总结
                相关产品与服务
                数据保险箱
                数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档