专栏首页大数据-数据人生ImportTsv-HBase数据导入工具

ImportTsv-HBase数据导入工具

ImportTsv-HBase数据导入工具

作者:幽鸿  

一、概述

HBase官方提供了基于Mapreduce的批量数据导入工具:Bulk load和ImportTsv。关于Bulk load大家可以看下我另一篇博文

通常HBase用户会使用HBase API导数,但是如果一次性导入大批量数据,可能占用大量Regionserver资源,影响存储在该Regionserver上其他表的查询,本文将会从源码上解析ImportTsv数据导入工具,探究如何高效导入数据到HBase。

二、ImportTsv介绍

ImportTsv是Hbase提供的一个命令行工具,可以将存储在HDFS上的自定义分隔符(默认\t)的数据文件,通过一条命令方便的导入到HBase表中,对于大数据量导入非常实用,其中包含两种方式将数据导入到HBase表中:

第一种是使用TableOutputformat在reduce中插入数据;

第二种是先生成HFile格式的文件,再执行一个叫做CompleteBulkLoad的命令,将文件move到HBase表空间目录下,同时提供给client查询。

三、源码解析

本文基于CDH5 HBase0.98.1,ImportTsv的入口类是org.apache.hadoop.hbase.mapreduce.ImportTsv

[java] view plaincopyprint?

  1. String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);  
  2. String columns[] = conf.getStrings(COLUMNS_CONF_KEY);  
  3. if (hfileOutPath != null) {  
  4.   if (!admin.tableExists(tableName)) {  
  5.     LOG.warn(format("Table '%s' does not exist.", tableName));  
  6.     // TODO: this is backwards. Instead of depending on the existence of a table,  
  7.     // create a sane splits file for HFileOutputFormat based on data sampling.  
  8.     createTable(admin, tableName, columns);  
  9.   }  
  10.   HTable table = new HTable(conf, tableName);  
  11.   job.setReducerClass(PutSortReducer.class);  
  12.   Path outputDir = new Path(hfileOutPath);  
  13.   FileOutputFormat.setOutputPath(job, outputDir);  
  14.   job.setMapOutputKeyClass(ImmutableBytesWritable.class);  
  15.   if (mapperClass.equals(TsvImporterTextMapper.class)) {  
  16.     job.setMapOutputValueClass(Text.class);  
  17.     job.setReducerClass(TextSortReducer.class);  
  18.   } else {  
  19.     job.setMapOutputValueClass(Put.class);  
  20.     job.setCombinerClass(PutCombiner.class);  
  21.   }  
  22.   HFileOutputFormat.configureIncrementalLoad(job, table);  
  23. } else {  
  24.   if (mapperClass.equals(TsvImporterTextMapper.class)) {  
  25.     usage(TsvImporterTextMapper.class.toString()  
  26.         + " should not be used for non bulkloading case. use "  
  27.         + TsvImporterMapper.class.toString()  
  28.         + " or custom mapper whose value type is Put.");  
  29.     System.exit(-1);  
  30.   }  
  31.   // No reducers. Just write straight to table. Call initTableReducerJob  
  32.   // to set up the TableOutputFormat.  
  33.   TableMapReduceUtil.initTableReducerJob(tableName, null, job);  
  34.   job.setNumReduceTasks(0);  
  35. }  
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
if (hfileOutPath != null) {
  if (!admin.tableExists(tableName)) {
    LOG.warn(format("Table '%s' does not exist.", tableName));
    // TODO: this is backwards. Instead of depending on the existence of a table,
    // create a sane splits file for HFileOutputFormat based on data sampling.
    createTable(admin, tableName, columns);
  }
  HTable table = new HTable(conf, tableName);
  job.setReducerClass(PutSortReducer.class);
  Path outputDir = new Path(hfileOutPath);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  if (mapperClass.equals(TsvImporterTextMapper.class)) {
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(TextSortReducer.class);
  } else {
    job.setMapOutputValueClass(Put.class);
    job.setCombinerClass(PutCombiner.class);
  }
  HFileOutputFormat.configureIncrementalLoad(job, table);
} else {
  if (mapperClass.equals(TsvImporterTextMapper.class)) {
    usage(TsvImporterTextMapper.class.toString()
        + " should not be used for non bulkloading case. use "
        + TsvImporterMapper.class.toString()
        + " or custom mapper whose value type is Put.");
    System.exit(-1);
  }
  // No reducers. Just write straight to table. Call initTableReducerJob
  // to set up the TableOutputFormat.
  TableMapReduceUtil.initTableReducerJob(tableName, null, job);
  job.setNumReduceTasks(0);
}

从ImportTsv.createSubmittableJob方法中判断参数BULK_OUTPUT_CONF_KEY开始,这步直接影响ImportTsv的Mapreduce作业最终以哪种方式入HBase库

如果不为空并且用户没有自定义Mapper实现类(参数importtsv.mapper.class)时,则使用PutSortReducer,其中会对Put排序,如果每行记录有很多column,则会占用Reducer大量的内存资源进行排序。

[java] view plaincopyprint?

  1. Configuration conf = job.getConfiguration();  
  2. HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));  
  3. job.setOutputFormatClass(TableOutputFormat.class);  
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
job.setOutputFormatClass(TableOutputFormat.class);

如果为空,调用TableMapReduceUtil.initTableReducerJob初始化TableOutputformat的Reducer输出,此方式不需要使用Reducer,因为直接在mapper的Outputformat中会批量的调用Put API将数据提交到Regionserver上(相当于并行的执行HBase Put API)

四、实战

1、使用TableOutputformat的Put API上传数据,非bulk-loading

[java] view plaincopyprint?

  1. $ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c 
$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c  

2、使用bulk-loading生成StoreFiles(HFile)

step1、生成Hfile

[java] view plaincopyprint?

  1. $ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c -Dimporttsv.bulk.output=hdfs://storefile-outputdir 
$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c -Dimporttsv.bulk.output=hdfs://storefile-outputdir  

step2、完成导入

[java] view plaincopyprint?

  1. $ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 
$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles  

五、总结

在使用ImportTsv时,一定要注意参数importtsv.bulk.output的配置,通常来说使用Bulk output的方式对Regionserver来说更加友好一些,这种方式加载数据几乎不占用Regionserver的计算资源,因为只是在HDFS上移动了HFile文件,然后通知HMaster将该Regionserver的一个或多个region上线。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Python eggs异常

    最近在使用Python的时候发现有异常,异常内容为python-eggs is writable ……,详细异常如下:

    幽鸿
  • 2.2 搜索就是所有

    刚刚情人节过去,也想来一篇数据分析的实例。别黑我,别恶心我。从网上找了一份2000W的数据,粗略处理后,导入MySql,试着看看MySql这2000W数据的效果...

    幽鸿
  • Python源码安装MySQLdb

    1、#    yum install Python-devel MySQL-devel zlib-devel openssl-devel

    幽鸿
  • 用MapReduce分析Hbase将结果插入mysql中

    从HBASE读取清洗过的数据,写入到mysql的表中 NewInstallUserRunner.java 计算新增用户入口类 ? NewInstallUser...

    Albert陈凯
  • Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

      前面一篇博文写的是Combiner优化MapReduce执行,也就是使用Combiner在map端执行减少reduce端的计算量。

    大道七哥
  • Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

    前言   前面一篇博文写的是Combiner优化MapReduce执行,也就是使用Combiner在map端执行减少reduce端的计算量。 一、作业的默认配置...

    用户1195962
  • SharePoint CAML In Action——Part II

    在SharePoint中,相对于Linq to SharePoint而言,CAML是轻量化的。当然缺点也是显而易见的,"Hard Code"有时会让你抓狂。在实...

    用户1161731
  • 【IoT迷你赛】走廊照明助手

    首先非常感谢腾讯云IoT团队给到的体验资格,没想到毕业以后,我还以有机会玩单片机。最近刚搬家不久,刚好新的出租屋有一个辅助照明的需求,而我拿到了E53_SC1模...

    jegbrother
  • SAP Cloud for Customer的inscreen_dataflow处理方式

    从上图可以看出这个navigation是从Lead TI页面screen内部,通过PublicOutportECLeadProduct到达Embedded Co...

    Jerry Wang
  • 2019-05-15 7个对初学者非常有用调试和故障排除技巧

    https://www.javacodegeeks.com/2019/05/useful-debugging-troubleshooting-tips.html...

    Albert陈凯

扫码关注云+社区

领取腾讯云代金券