前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DataX使用中的一个坑(BUG)

DataX使用中的一个坑(BUG)

作者头像
用户5252199
发布2022-04-18 18:38:42
3.6K0
发布2022-04-18 18:38:42
举报
文章被收录于专栏:大数据技术博文

使用Datax进行两个集群间的数据同步,在读取HDFS数据时,会出现数据丢失问题,本文针对数据丢失问题做出了分析以及对应解决方案,希望帮助大家在使用Datax过程中避免该问题的出现!。

01

问题描述

最近在使用Datax进行两个集群间的数据同步,将老集群(Hive)数据同步到新集群(Phoenix)中,由于两个集群的大数据节点IP不同,老集群有三个IP段(10、20、30)采用的是30段(大数据专用网断),而新集群是采用的20网断,所以无法通过Datax链接老集群HDFS路径,当然这个并不是问题重点。

第一次同步的时候数据是全部同步到了新集群,然而,因为业务关系某个表需要添加三个字段,之后表数据又重新构建了一遍,我们称之为info表吧,然后再次执行脚本将info表同步一下,本来是5000W+的数据,同步完之后缺失了700W+

请忽略读写失败总数,这个不是问题关键,这个是由于字段长度超出了Phoenix表配置的长度,并非Datax问题。

注意看读出记录总数:4442W,而我实际数据表中是有5152W,少了700W+

之后重试了两次,问题依旧!

02

问题分析

导入成功了4442w,剩余700w没有导入,可以先查看这未成功导入的700w数据是否有数据问题,之后进行了以下操作,最终定位到问题所在

03

步骤一

将新集群的HDFS数据,导入到hive中查看数据量是否缺少,发现将数据同步到hive之后,数据量与老集群是保持一致,这里基本可以断定数据本身是没有问题的

04

步骤二

查询未成功导入到phoenix的数据信息,先根据某一个字段的groupby数据量查看那个条件的数据量少且数据丢失了,从这个字段条件入手,然后找到了100+条数据未成功导入到phoenix,更加神奇的是这100+条数据,在HDFS中属于同一个文件块000676_0,同时这100+条数据在块中是连续的(这也是一个问题)

然而这100条数据的上一条数据是在phoenix中可以查询到的,

所以将这100条数据单独抽取出来放在HDFS块中,然后单独的进行同步,在启动同步之后,发现日志中的异常如下:

提示,读取的列越界,源文件改行有36列,您尝试读取第37列

将该条数据查出来然后在本地代码split一下,发现列数果然不对(在datax中的json文件中配置的是39列,实际也是39列),然后将第一行数据删掉,再次同步一下,发现数据同步进去了。那么在这里发现了一个问题就是当某一个文件块中其中一条数据读取解析异常了,那么读取到的这批数据就都会异常(代码里面是批量读取,批量解析的)

04

步骤三

将Datax代码clone下来研究了一下,这里要提及一句我们phoenix使用的是5.0对应hbase2.0版本,datax也是在我们同步数据前的16天提交支持了hbase20xsqlreaderhbase20xsqlwrite

先从hbase20xsqlwrite 包代码研究,发现是从RecordReceiver接收器中获取到的数据,而最后读出记录总数:44426102则是记录了一共从RecordReceiver接收到的数据条数。所以问题没有出在write这一块,基本判断应该是出在了hdfsreader

转而研究一下hdfsreader的代码,类结构很简单,只需要直接看HdfsReader类即可。里面的逻辑也很简单,在这里就不过多的解释了。

从上面warn信息里面看,是UnstructuredStorageReaderUtil这个类发出的警告,具体的包名为:

代码语言:javascript
复制
com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil ;

发现是该类的transportOneRecord(xx) 函数里面报的异常,然后向上追溯到了doReadFromStream函数

代码语言:javascript
复制
doReadFromStream(BufferedReader reader, String context,Configuration readerSliceConfig, RecordSender recordSender,TaskPluginCollector taskPluginCollector)

这里面可以看到不管读取的文件是TEXT类型还是CSV类型的,都是按照CSV进行解析的。

05

问题定位

数据解析之后列的个数的确与实际的不符合,之后查看数据,发现出问题的数据中有几串连续的空的,所以数据在解析的时候将空的给过滤了,比如[1,2,,,,6]解析得到的是[1,2,6],所以才会出现列越界的问题。

这里需要修改一下代码:

源代码:

代码语言:javascript
复制
csvReader = new CsvReader(reader);
csvReader.setDelimiter(fieldDelimiter);
setCsvReaderConfig(csvReader);
String[] parseRows;
while ((parseRows = UnstructuredStorageReaderUtil
        .splitBufferedReader(csvReader)) != null) {
    UnstructuredStorageReaderUtil.transportOneRecord(recordSender,
            column, parseRows, nullFormat, taskPluginCollector);
}

修改为:

代码语言:javascript
复制
String line = null;
while ((line = reader.readLine()) != null) {
String[] parseRows = line.split(delimiterInStr, -1);
UnstructuredStorageReaderUtil.transportOneRecord(recordSender,
                        column, parseRows, nullFormat, taskPluginCollector);
   }

同时要在finally中将CsvReader.close 去掉,试图设置csvReader的参数实现空值保留,但是均无效,如果您有更好的办法可以修改csvreader的话也可以。这样尽量保证源代码的味道!但是不知道是否能保证批次数据里面一条失败是否会导致后面的不成功。

将代码改成上面这样同时也避免了这样的一个问题。一行一行的读取,然后发送给write,由write去决定接受多少条进行写入!

至此,再次打包编译,再次执行同步脚本,数据完全同步过去,没有异常的数据!

06

问题浮现

  • 在读取HDFS数据时,没有对空串进行处理,导致读到的列出与配置的列数不一致
  • 当读到的批次数据通过csvreader进行解析时,有一条失败其他条也并没有发送给write接收器

07

问题引申

Datax到同步数据的时候,有一个脏数据的概念,比如这次在同步数据时候,会有一些脏数据的问题发生,plugin的处理方式时,如果批次里面有一条出现了问题,那么就会将这批次数据进行循环操作,找出出问题的那一条,加入脏数据处理任务里面,然后脏数据任务是将任务里面的数据重试三次,如果三次都失败就丢掉了!

Datax本身框架是可以将脏数据本地输出或者集中式汇报的,只是plugin在write的时候直接将异常抛给了脏数据任务,而没有单独做处理,所以这块也需要做一些修改,将脏数据统一写入指定文件中!需要在json里面加一个配置

此问题列为第三个问题!

08

问题总结

DataX本身是一个很好的支持异构数据同步的框架,而且具备很好的灵活扩展性,如果不修改代码的情况下,从数据角度出发,通过采取空值转为NULL或者其它字符也可以解决该问题。

这里通过修改源代码,将字符串分割处保留空值内容,最终解决问题。

希望该文对您有帮助!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-04-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术博文 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档