前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >HBase BulkLoad 原理及批量写入数据实战

HBase BulkLoad 原理及批量写入数据实战

作者头像
大数据真好玩
发布2022-12-05 09:11:30
1.2K0
发布2022-12-05 09:11:30
举报
文章被收录于专栏:暴走大数据暴走大数据

目录

一、HBase BulkLoad介绍

  1. 前言
  2. 为什么要用bulkload方式导入?
  3. bulkload的实现原理

二、HBase BulkLoad批量写入数据实战

  1. 开发生成HFile文件的代码
  2. 打成jar包提交到集群中运行
  3. 观察HDFS上输出的结果
  4. 加载HFile文件到hbase表中
  5. 总结

一、HBase BulkLoad介绍

1. 前言

之前我们介绍了HBASE的存储机制,HBASE存储数据其底层使用的是HDFS来作为存储介质,HBASE的每一张表对应的HDFS目录上的一个文件夹,文件夹名是以HBASE表的名字来命名(如果没有使用命名空间,那么默认是在default目录下)。在表文件夹下存放着若干个region命名的文件夹,而region文件夹中的每个列族也是用文件夹进行存储的,每个列族中存储的就是实际的数据,以HFile的形式存在。

路径格式: /hbase/data/default/<tbl_name>/<region_id>//<hfile_id>

2. 为什么要用bulkload方式导入?

在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过程中,如果数据量过大,可能耗时会比较严重或者占用HBase集群资源较多(如磁盘IO、HBase Handler数等)。

3. bulkload的实现原理

按照HBase存储数据按照HFile格式存储在HDFS的原理,使用MapReduce直接生成HFile格式的数据文件,然后再通过RegionServer将HFile数据文件移动到相应的Region上去。

  • HBase数据正常写流程
  • bulkload方式的处理示意图
  • bulkload的好处
  1. 导入过程不占用Region资源
  2. 能快速导入海量的数据
  3. 节省内存

二、HBase BulkLoad批量写入数据实战

需求

通过bulkload的方式,将我们放在HDFS上面的这个路径/hbase/input/user.txt的数据文件,转换成HFile格式,然后load到myuser2这张Hbase表里面去。

1. 开发生成HFile文件的代码

自定义map类

package xsluo.hbase;
 
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
 
//四个泛型中后两个,分别对应rowkey及put
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
 
        //封装输出的rowkey类型
        ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(split[0].getBytes());
        //构建Put对象
        Put put = new Put(split[0].getBytes());
        put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
        put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
 
        context.write(immutableBytesWritable, put);
    }
}

程序main

package xsluo.hbase;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class HBaseBulkLoad extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        //设置ZK集群
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
 
        int run = ToolRunner.run(configuration, new HBaseBulkLoad(), args);
        System.exit(run);
    }
 
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseBulkLoad.class);
 
        FileInputFormat.addInputPath(job, new Path("hdfs://node01:8020/hbase/input"));
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
 
        //获取数据库连接
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));
 
        //使MR可以向myuser2表中,增量增加数据
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
        //数据写回到HDFS,写成HFile -> 所以指定输出格式为HFileOutputFormat2
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.setOutputPath(job, new Path("hdfs://node01:8020/hbase/out_hfile"));
 
        //开始执行
        boolean b = job.waitForCompletion(true);
 
        return b?0:1;
    }
}
2. 打成jar包提交到集群中运行
hadoop jar hbase_demo-1.0-SNAPSHOT.jar xsluo.hbase.HBaseBulkLoad
3. 观察HDFS上输出的结果
4. 加载HFile文件到hbase表中

方式一:代码加载

package xsluo.hbase;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 
public class LoadData {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
        //获取数据库连接
        Connection connection =  ConnectionFactory.createConnection(configuration);
        //获取表的管理器对象
        Admin admin = connection.getAdmin();
        //获取table对象
        TableName tableName = TableName.valueOf("myuser2");
        Table table = connection.getTable(tableName);
        //构建LoadIncrementalHFiles加载HFile文件
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
        load.doBulkLoad(new Path("hdfs://node01:8020/hbase/out_hfile"), admin,table,connection.getRegionLocator(tableName));
    }
}

方式二:命令加载

先将hbase的jar包添加到hadoop的classpath路径下

export HBASE_HOME=/xsluo/install/hbase-1.2.0-cdh5.14.2/
export HADOOP_HOME=/xsluo/install/hadoop-2.6.0-cdh5.14.2/
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
  • 运行命令
yarn jar /xsluo/install/hbase-1.2.0-cdh5.14.2/lib/hbase-server-1.2.0-cdh5.14.2.jar   completebulkload /hbase/out_hfile myuser2

加载完后,原文件会被移动到hdfs中此表对应的region的列族的目录下

5. 总结

本文为了演示实战效果,将生成HFile文件和使用BulkLoad方式导入HFile到HBase集群的步骤进行了分解,实际情况中,可以将这两个步骤合并为一个,实现自动化生成与HFile自动导入。如果在执行的过程中出现RpcRetryingCaller的异常,可以到对应RegionServer节点查看日志信息,这里面记录了出现这种异常的详细原因。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-10-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、HBase BulkLoad介绍
    • 1. 前言
      • 2. 为什么要用bulkload方式导入?
        • 3. bulkload的实现原理
        • 二、HBase BulkLoad批量写入数据实战
          • 1. 开发生成HFile文件的代码
            • 2. 打成jar包提交到集群中运行
              • 3. 观察HDFS上输出的结果
                • 4. 加载HFile文件到hbase表中
                  • 5. 总结
                  相关产品与服务
                  对象存储
                  对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档