前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据技术之_11_HBase学习_02_HBase API 操作 + HBase 与 Hive 集成 + HBase 优化

大数据技术之_11_HBase学习_02_HBase API 操作 + HBase 与 Hive 集成 + HBase 优化

作者头像
黑泽君
发布2019-03-15 11:37:19
1.8K0
发布2019-03-15 11:37:19
举报
文章被收录于专栏:黑泽君的专栏黑泽君的专栏

第6章 HBase API 操作

6.1 环境准备

新建项目后在pom.xml中添加依赖:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.1</version>
</dependency>

<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>1.8</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>

6.2 HBase API

6.2.1 判断表是否存在

旧API

代码语言:javascript
复制
    // 判断表是否存在
    public static boolean isTableExistOldAPI(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {

        // HBase配置文件
        @SuppressWarnings("deprecation")
        HBaseConfiguration conf = new HBaseConfiguration();

        // 设置zookeeper地址
        conf.set("hbase.zookeeper.quorum", "192.168.25.102");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // 获取HBase管理员对象
        @SuppressWarnings("deprecation")
        HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
        boolean tableExists = hBaseAdmin.tableExists(tableName);

        // 关闭
        hBaseAdmin.close();

        return tableExists;
    }

新API

代码语言:javascript
复制
    // 判断表是否存在-新API
    public static boolean isTableExistNewAPI(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {

        // HBase配置文件
        Configuration conf = HBaseConfiguration.create();

        // 设置zookeeper地址
        conf.set("hbase.zookeeper.quorum", "192.168.25.102");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // 获取连接对象,执行
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        boolean tableExists = admin.tableExists(TableName.valueOf(tableName));

        // 关闭资源
        admin.close();

        return tableExists;
    }
6.2.2 抽取获取 Configuration、Connection、Admin 对象的方法以及关闭资源的方法
代码语言:javascript
复制
    static Admin admin = null;

    static Connection conn = null;

    static Configuration conf = null;

    static {
        // HBase配置文件
        conf = HBaseConfiguration.create();

        // 设置zookeeper地址
        conf.set("hbase.zookeeper.quorum", "192.168.25.102");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try {
            // 获取连接对象,执行
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void close(Connection conn, Admin admin) {
        if (conn != null) {
            try {
                conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

注意:以下演示均使用新API!

6.2.3 创建表(admin)
代码语言:javascript
复制
    // 创建表
    public static void createTable(String tableName, String... columnFamily) throws IOException {

        if (isTableExistNewAPI(tableName)) {
            System.out.println("表" + tableName + "已存在!");
            return;
        }

        // 创建表描述器
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

        // 添加列族
        for (String cf : columnFamily) {
            // 创建列描述器
            HColumnDescriptor HColumnDescriptor = new HColumnDescriptor(cf);
            // 指定列族的版本个数,默认个数是一个
            // HColumnDescriptor.setMaxVersions(5);
            hTableDescriptor.addFamily(HColumnDescriptor);
        }

        // 创建表操作
        admin.createTable(hTableDescriptor);
        System.out.println("表" + tableName + "创建成功!");
    }
6.2.4 删除表(admin)
代码语言:javascript
复制
    // 删除表
    public static void deleteTable(String tableName) throws IOException {

        if (isTableExistNewAPI(tableName)) {
            // 删除表之前先使表不可用(下线)
            admin.disableTable(TableName.valueOf(tableName));
            // 执行删除操作
            admin.deleteTable(TableName.valueOf(tableName));
            System.out.println("表" + tableName + "删除成功!");
        } else {
            System.out.println("表" + tableName + "不存在!");
        }
    }

注意:truncate,清空表数据,实际底层操作是先使表不可用(下线),然后删除表,最后根据表信息重新创建一张新表。

6.2.5 向表中插入数据(put)
代码语言:javascript
复制
    // 向表中插入数据(或修改)
    public static void putRowData(String tableName, String rowKey, String columnFamily, String column, String value) throws IOException {

        // 创建HTable对象
        // 旧API
        // HTable hTable = new HTable(conf, TableName.valueOf(tableName));
        // 获取Table对象
        // 新API
        Table table = conn.getTable(TableName.valueOf(tableName));

        Put put = new Put(Bytes.toBytes(rowKey));
        // 向Put对象中组装数据
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));

        // 执行向表中插入数据的操作
        table.put(put);

        System.out.println("插入数据成功");

        table.close();

        // 批量插入数据提示:1、同一个RowKey下添加不同的列;2、不同的RowKey,可以将RowKey(Put)放到List集合。
    }
6.2.6 删除多行数据(delete)
代码语言:javascript
复制
    // 删除多行数据
    public static void deleteData(String tableName, String rowKey, String columnFamily, String column) throws IOException {

        // 获取Table对象
        // 新API
        Table table = conn.getTable(TableName.valueOf(tableName));

        // 创建Delete对象
        Delete delete = new Delete(Bytes.toBytes(rowKey));

        // 向Delete对象中组装数据,如果不组装,则删除的是行键的数据(多行数据)
        // delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column)); // 慎用这个方法,删除某个版本(默认最新版本),保留旧的版本
        // delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(column)); // 公司推荐使用这个方法,删除所有版本

        // 执行删除操作
        table.delete(delete);

        System.out.println("删除多行数据成功");

        table.close();
    }
6.2.7 获取所有数据(scan)
代码语言:javascript
复制
    // 获取所有数据(全表扫描)
    public static void scanTable(String tableName) throws IOException {

        // 获取Table对象
        // 新API
        Table table = conn.getTable(TableName.valueOf(tableName));

        // 构建扫描器,指定扫描的起始行和结束行,不指定的话,表示扫描全表,还可以指定其他限定
        Scan scan = new Scan();
        // scan.setStartRow(startRow);
        // scan.setStopRow(stopRow);

        // 执行扫描全表操作
        ResultScanner resultScanner = table.getScanner(scan);

        for (Result result : resultScanner) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.println("行键:" + Bytes.toString(result.getRow())
                + " 列族:" + Bytes.toString(CellUtil.cloneFamily(cell))
                + " 列:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                + " 值:" + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }

        table.close();
    }

表结构图解:

6.2.8 获取某一行数据(get)
代码语言:javascript
复制
    // 获取某一行数据
    public static void getRowData(String tableName, String rowKey) throws IOException {

        // 获取Table对象
        // 新API
        Table table = conn.getTable(TableName.valueOf(tableName));

        // 新建一个Get对象
        Get get = new Get(Bytes.toBytes(rowKey));
        // 显示所有版本
        // get.setMaxVersions();
        // 显示指定版本
        // get.setMaxVersions(maxVersions);
        // 显示指定时间戳的版本
        // get.setTimeStamp();

        // 执行获取某一行数据的操作
        Result result = table.get(get);
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("行键:" + Bytes.toString(result.getRow())
            + " 列族:" + Bytes.toString(CellUtil.cloneFamily(cell))
            + " 列:" + Bytes.toString(CellUtil.cloneQualifier(cell))
            + " 值:" + Bytes.toString(CellUtil.cloneValue(cell))
            + " 时间戳:" + cell.getTimestamp());
        }

        table.close();
    }
6.2.9 获取某一行指定“列族:列”的数据(get)
代码语言:javascript
复制
    // 获取某一行指定“列族:列”的数据
    public static void getRowQualifierData(String tableName, String rowKey, String columnFamily, String column) throws IOException {

        // 获取Table对象
        // 新API
        Table table = conn.getTable(TableName.valueOf(tableName));

        // 新建一个Get对象
        Get get = new Get(Bytes.toBytes(rowKey));
        // 指定要获取某一行的“列族:列”
        get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));

        // 执行获取某一行指定“列族:列”数据的操作
        Result result = table.get(get);
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("行键:" + Bytes.toString(result.getRow())
            + " 列族:" + Bytes.toString(CellUtil.cloneFamily(cell))
            + " 列:" + Bytes.toString(CellUtil.cloneQualifier(cell))
            + " 值:" + Bytes.toString(CellUtil.cloneValue(cell))
            + " 时间戳:" + cell.getTimestamp());
        }

        table.close();
    }

主函数代码:

代码语言:javascript
复制
    public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {

        // 判断表是否存在-旧API
        // System.out.println(isTableExistOldAPI("student"));
        // System.out.println(isTableExistOldAPI("staff"));

        // 判断表是否存在-新API
        // System.out.println(isTableExistNewAPI("student"));
        // System.out.println(isTableExistNewAPI("staff"));

        // 创建表
        // createTable("student", "info");
        // createTable("staff", "info");
        // createTable("haha", "info","info2","info3");

        // 删除表
        // deleteTable("haha");

        // 向表中插入数据(或修改)
        // putRowData("student", "1003", "info", "name", "hahahaha");

        // 删除多行数据
        // deleteData("student", "1001", "info", "name");

        // 获取所有数据(全表扫描)
        // scanTable("student");

        // 获取某一行数据
        // getRowData("student", "1002");

        // 获取某一行指定“列族:列”的数据
        getRowQualifierData("student", "1002", "info", "age");

        // 关闭资源
        close(conn, admin);
    }

6.3 MapReduce

  通过 HBase 的相关 JavaAPI,我们可以实现伴随 HBase 操作的 MapReduce 过程,比如使用 MapReduce 将数据从本地文件系统导入到 HBase 的表中,比如我们从 HBase 中读取一些原始数据后使用 MapReduce 做数据分析。

6.3.1 官方 HBase-MapReduce

0.使用 MapReduce 作为 HBase 的分析框架,首先我们需要 Hadoop 持有 HBase 的jar包。 1.查看 HBase 的 MapReduce 任务的执行

代码语言:javascript
复制
$ bin/hbase mapredcp

2.环境变量的导入 (1)执行环境变量的导入(临时生效,在命令行执行下述操作)

代码语言:javascript
复制
$ export HBASE_HOME=/opt/module/hbase
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

(2)永久生效:在/etc/profile配置

代码语言:javascript
复制
#HBASE_HOME
export HBASE_HOME=/opt/module/hbase

使/etc/profile文件生效

代码语言:javascript
复制
$ source /etc/profile

并在 hadoop-env.sh 中配置:(注意:在for循环之后配,即使用追加的方式)

代码语言:javascript
复制
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

配置分发

代码语言:javascript
复制
[atguigu@hadoop102 hadoop-2.7.2]$ xsync etc/hadoop/hadoop-env.sh 

3.运行官方的 MapReduce 任务 案例一:统计 Student 表中有多少行数据

代码语言:javascript
复制
[atguigu@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar ./lib/hbase-server-1.3.1.jar rowcounter student

案例二:使用 MapReduce 将本地数据导入到 HBase 1)在本地(HDFS)创建一个 tsv 格式的文件:fruit.tsv

代码语言:javascript
复制
[atguigu@hadoop102 datas]$ pwd
/opt/module/datas
[atguigu@hadoop102 datas]$ vim fruit.tsv

1001    Apple   Red
1002    Pear    Yellow
1003    Pineapple   Yellow

2)在 HBase 上创建 fruit 表

代码语言:javascript
复制
hbase(main):001:0> create 'fruit','info'

3)在 HDFS 中创建 input_fruit 文件夹并上传 fruit.tsv 文件

代码语言:javascript
复制
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put /opt/module/datas/fruit.tsv /input_fruit/

4)执行 MapReduce 到 HBase 的 fruit 表中

代码语言:javascript
复制
[atguigu@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar ./lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop102:9000/input_fruit

5)使用 scan 命令查看 HBase 导入后的结果

代码语言:javascript
复制
hbase(main):001:0> scan 'fruit'
6.3.2 自定义 HBase-MapReduce1

目标:将 fruit 表中的一部分数据,通过 MR 迁入到 fruit_mr 表中。 即:从 HBase 读数据,通过 MR,最终写入 HBase。 分步实现: 1.构建 ReadFruitFromHBaseMapper 类,用于读取 fruit 表中的数据

代码语言:javascript
复制
package com.atguigu.mr1;

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadFruitFromHBaseMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        // 将fruit的name和color提取出来,相当于将每一行数据读取出来放入到Put对象中。
        Cell[] cells = value.rawCells();

        // 创建Put对象
        Put put = new Put(key.get());

        for (Cell cell : cells) {
            // 添加/克隆列族:info
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                // 添加/克隆列:name
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // 将该列cell加入到Put对象中
                    put.add(cell);
                    // 添加/克隆列:color
                } else if ("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // 向该列cell加入到Put对象中
                    put.add(cell);
                }
            }
        }

        // 将从fruit读取到的每行数据写入到context中作为map的输出
        context.write(key, put);
    }
}

2.构建 WriteFruitMRReducer 类,用于将读取到的 fruit 表中的数据写入到 fruit_mr 表中

代码语言:javascript
复制
package com.atguigu.mr1;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // 读出来的每一行数据写入到fruit_mr表中
        for (Put value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}

3.构建 Fruit2FruitMRRunner extends Configuration implements Tool 用于组装运行 Job 任务

代码语言:javascript
复制
package com.atguigu.mr1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Fruit2FruitMRRunner extends Configuration implements Tool {

    private Configuration conf = null;

    public Configuration getConf() {
        return this.conf;
    }

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public int run(String[] args) throws Exception {

        // 创建Job任务
        Job job = Job.getInstance(this.getConf());

        // 指定Runner类
        job.setJarByClass(Fruit2FruitMRRunner.class);

        // 配置Job
        Scan scan = new Scan();
        // scan.setCacheBlocks(false);
        // scan.setCaching(500);

        // 指定Mapper,注意导入的是mapreduce包下的,不是mapred包下的,后者是老版本
        TableMapReduceUtil.initTableMapperJob(
                "fruit",                        // 数据源的表名
                scan ,                          // scan扫描控制器
                ReadFruitMapper.class,          // 设置Mapper类
                ImmutableBytesWritable.class,   // 设置Mapper输出key类型
                Put.class,                      // 设置Mapper输出value值类型
                job                             // 设置给哪个JOB
        );

        // 指定Reducer
        TableMapReduceUtil.initTableReducerJob(
                "fruit_mr", 
                WriteFruitMRReducer.class, 
                job);

        // 设置Reduce数量,最少1个
        job.setNumReduceTasks(1);

        // 提交
        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
        System.exit(status);
    }

}

注意:这是针对 HBase 官方建议的组装 Job 任务的方式,为什么官方建议这样做呢?   答:是因为我们在运行 MapReduce 任务的时候要添加一些额外的参数,而参数要能被解析,格式就要这么去写。 小结:以后的 Driver 建议这么去写!!!

4.主函数中调用运行该 Job 任务

代码语言:javascript
复制
    public static void main(String[] args) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
        System.exit(status);
    }

5.打包运行任务

代码语言:javascript
复制
$ /opt/module/hadoop-2.7.2/bin/yarn jar /opt/module/datas/HBase.jar com.atguigu.mr1.Fruit2FruitMRRunner

提示:运行任务前,如果待导入的数据的表不存在,则需要提前创建。所以我们先在 HBase 上创建 fruit_mr 表。 提示:maven 打包命令:-P local clean package 或 -P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)

6.3.3 自定义 HBase-MapReduce2

目标:实现将 HDFS 中的数据写入到 HBase 表中。 分步实现: 1.构建 ReadFruitFromHDFSMapper 于读取 HDFS 中的文件数据

代码语言:javascript
复制
package com.atguigu.mr2;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, NullWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 获取一行数据(从HDFS中读取的数据)
        String line = value.toString();

        // 切割(读取出来的每行数据使用\t进行分割)
        String[] fields = line.split("\t");

        //根据数据中值的含义取值
        String rowKey = fields[0];
        String name = fields[1];
        String color = fields[2];

        // 封装Put对象
        Put put = new Put(Bytes.toBytes(rowKey));

        // 参数分别:列族、列、值  
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),  Bytes.toBytes(name)); 
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"),  Bytes.toBytes(color)); 

        // 写出
        context.write(NullWritable.get(), put);
    }

}

2.构建 WriteFruitMRFromTxtReducer 类

代码语言:javascript
复制
package com.atguigu.mr2;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // 读出来的每一行数据写入到fruit_hdfs表中
        for (Put value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}

3.创建 Txt2FruitRunner 组装 Job

代码语言:javascript
复制
package com.atguigu.mr2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Txt2FruitRunner extends Configuration implements Tool {

    private Configuration conf = null;

    public Configuration getConf() {
        return this.conf;
    }

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public int run(String[] args) throws Exception {

        // 创建Job任务
        Job job = Job.getInstance(this.getConf());

        // 指定Runner类
        job.setJarByClass(Txt2FruitRunner.class);

        // 设置Mapper
        job.setMapperClass(ReadFruitFromHDFSMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);

        // 设置Reducer
        TableMapReduceUtil.initTableReducerJob(
                "fruit_hdfs", 
                WriteFruitMRFromTxtReducer.class, 
                job);

        // 设置Reduce数量,最少1个
        job.setNumReduceTasks(1);

        // 设置输入文件路径
        Path inPath = new Path("hdfs://hadoop102:9000/input_fruit/fruit.tsv");
        FileInputFormat.addInputPath(job, inPath);

        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);
        System.exit(status);
    }
}

4.调用执行 Job

代码语言:javascript
复制
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);
        System.exit(status);
    }

5.打包运行

代码语言:javascript
复制
$ /opt/module/hadoop-2.7.2/bin/yarn jar HBase2.jar com.atguigu.mr2.Txt2FruitRunner

提示:运行任务前,如果待数据导入的表不存在,则需要提前创建之。所以我们先在 HBase 上创建 fruit_hdfs 表。 提示:maven打包命令:-P local clean package 或 -P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)

6.4 与 Hive 的集成

6.4.1 HBase 与 Hive 的对比

1.Hive (1) 数据仓库   Hive 的本质其实就相当于将 HDFS 中已经存储的文件在 Mysql 中做了一个双射关系,以方便使用 HQL 去管理查询。 (2) 用于数据分析、清洗(ETL工程师:把业务数据准备好)   Hive 适用于离线的数据分析和清洗,延迟较高。 (3) 基于 HDFS、MapReduce   Hive 存储的数据依旧在 DataNode 上,编写的 HQL 语句终将是转换为 MapReduce 代码执行。

2.HBase (1) 数据库   是一种面向列存储的非关系型数据库。 (2) 用于存储结构化和非结构化的数据   适用于单表非关系型数据的存储,不适合做关联查询,类似 JOIN 等操作。 (3) 基于 HDFS   数据持久化存储的体现形式是 Hfile,存放于 DataNode 中,被 ResionServer 以 Region 的形式进行管理。 (4) 延迟较低,接入在线业务使用   面对大量的企业数据,HBase 可以实现单表大量数据的存储,同时提供了高效的数据访问速度。

6.4.2 HBase 与 Hive 集成使用

尖叫提示:HBase 与 Hive 的集成在最新的两个原生版本中无法兼容。错误截图如下:

我们只能对 hive-hbase-handler-1.2.1.jar 做适配。 所以,我们只能含着泪勇敢的重新编译:hive-hbase-handler-1.2.1.jar 好气啊!!! 编译步骤如下: (1) 准备好 hive 的源码 (2) 将 /opt/module/hive/lib目录下 和 /opt/module/hbase/lib目录下 的jar包合并在一起,删除掉里面的pom文件和文件夹(ruby、PHP、python) (3) 建议使用 eclipse 编译jar包 (4) 新建一个普通的 java 工程 (5) 导入 hive 的源码

选择【File System】

选择要编译的jar包,没有必要选择要编译的全部jar包

手动导入依赖,先新建一个lib文件夹

复制粘贴准备好的jar包,放在lib文件夹下,然后添加至构建路径,之后删除掉低相同的版本的依赖 然后进行导出操作

选择 JAR file

去掉勾选lib,选择文件路径,点击完成即可

将新编译的 hive-hbase-handler-1.2.1.jar 拷贝至 /opt/module/hive/lib 目录下,覆盖掉以前的jar。

环境准备 因为我们后续可能会在操作 Hive 的同时对 HBase 也会产生影响,所以 Hive 需要持有操作 HBase 的 Jar,那么接下来拷贝 Hive 所依赖的 Jar包(或者使用软连接的形式)。 即:Hive 如何能获取到 HBase 中的数据。 修改/etc/profile文件,增加以下内容:

代码语言:javascript
复制
#HBASE_HOME
export HBASE_HOME=/opt/module/hbase

#HIVE_HOME
export HIVE_HOME=/opt/module/hive

使/etc/profile文件生效

代码语言:javascript
复制
$ source /etc/profile

使用软连接

代码语言:javascript
复制
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar

同时在 hive-site.xml 中修改 zookeeper 的属性,如下:

代码语言:javascript
复制
[atguigu@hadoop102 conf]$ pwd
/opt/module/hive/conf
[atguigu@hadoop102 conf]$ vim hive-site.xml 

增加内容如下:

代码语言:javascript
复制
<property>
    <name>hive.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
    <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
    <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>

1.案例一 目标:建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase 表。 分步实现: (1) 在 Hive 中创建表同时关联 HBase

代码语言:javascript
复制
CREATE TABLE hive_hbase_emp_table(
  empno int,
  ename string,
  job string,
  mgr int,
  hiredate string,
  sal double,
  comm double,
  deptno int
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping"=":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name"="hbase_emp_table");

提示:完成之后,可以分别进入 Hive 和 HBase 查看,都生成了对应的表。

(2) 在 Hive 中创建临时中间表,用于 load 文件中的数据 提示:不能将数据直接 load 进 Hive 所关联 HBase 的那张表中。

代码语言:javascript
复制
create table emp(
  empno int,
  ename string,
  job string,
  mgr int,
  hiredate string,
  sal double,
  comm double,
  deptno int
)
row format delimited fields terminated by '\t';

(3) 向 Hive 中间表中 load 数据

代码语言:javascript
复制
hive> load data local inpath '/opt/module/datas/emp.txt' into table emp;

(4) 通过 insert 命令将中间表中的数据导入到 Hive 关联 HBase 的那张表中

代码语言:javascript
复制
hive> insert into table hive_hbase_emp_table select * from emp;

(5) 查看 Hive 以及关联的 HBase 表中是否已经成功的同步插入了数据 Hive:

代码语言:javascript
复制
hive> select * from hive_hbase_emp_table;

HBase:

代码语言:javascript
复制
hbase> scan ‘hbase_emp_table’

2.案例二 目标:在 HBase 中已经存储了某一张表 hbase_emp_table,然后在 Hive 中创建一个外部表来关联 HBase 中的 hbase_emp_table 这张表,使之可以借助 Hive 来分析 HBase 这张表中的数据。 :该案例2紧跟案例1的脚步,所以完成此案例前,请先完成案例1。 分步实现: (1) 在Hive中创建外部表

代码语言:javascript
复制
CREATE EXTERNAL TABLE relevance_hbase_emp(
  empno int,
  ename string,
  job string,
  mgr int,
  hiredate string,
  sal double,
  comm double,
  deptno int
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping"=":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") 
TBLPROPERTIES ("hbase.table.name"="hbase_emp_table");

(2) 关联后就可以使用 Hive 函数进行一些分析操作了

代码语言:javascript
复制
hive (default)> select * from relevance_hbase_emp;

第7章 HBase 优化

7.1 高可用

  在 HBase 中 Hmaster 负责监控 RegionServer 的生命周期,均衡 RegionServer 的负载,如果 Hmaster 挂掉了,那么整个 HBase 集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以 HBase 支持对 Hmaster 的高可用配置。 1.关闭 HBase 集群(如果没有开启则跳过此步)

代码语言:javascript
复制
[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh\

2.在 conf 目录下创建 backup-masters 文件

代码语言:javascript
复制
[atguigu@hadoop102 hbase]$ touch conf/backup-masters

3.在 backup-masters 文件中配置高可用 HMaster 节点

代码语言:javascript
复制
[atguigu@hadoop102 hbase]$ echo hadoop103 > conf/backup-masters

4.将整个 conf 目录 scp 到其他节点 或者 配置分发

代码语言:javascript
复制
[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop103:/opt/module/hbase/
[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop104:/opt/module/hbase/

或者

[atguigu@hadoop102 hbase]$ xsync conf/backup-masters

5.启动 Hbase

代码语言:javascript
复制
[atguigu@hadoop102 hbase]$ bin/start-hbase.sh\

6.打开页面测试查看   http://hadooo102:16010

  http://hadoop103:16010/master-status

7.我们杀死主节点 hadoop102

代码语言:javascript
复制
[atguigu@hadoop102 hbase]$ kill -9 7886

8.再次打开页面测试查看   http://hadoop103:16010/master-status hadoop103 节点变为主节点了

9.我们再单次启动 hadoop102 作为主节点

代码语言:javascript
复制
[atguigu@hadoop102 hbase]$ bin/hbase-daemon.sh start master

打开页面测试查看   http://hadoop103:16010/master-status

7.2 预分区

自动分区 自动分区会导致数据倾斜。

  每一个 region 维护着 startRow 与 endRowKey,如果加入的数据符合某个 region 维护的 rowKey 范围,则该数据交给这个 region 维护。那么依照这个原则(即预估数据量),我们可以将数据所要投放的分区提前大致的规划好,以提高 HBase 性能。 1.手动设定预分区

代码语言:javascript
复制
hbase> create 'staff1','info','partition1',SPLITS => ['1000','2000','3000','4000']

2.生成16进制序列预分区

代码语言:javascript
复制
hbase> create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

3.按照文件中设置的规则预分区(HBase底层会自动排序) 创建splits.txt文件内容如下:

代码语言:javascript
复制
aaaa
bbbb
cccc
dddd

然后执行:

代码语言:javascript
复制
hbase> create 'staff3','partition3',SPLITS_FILE => '/opt/module/datas/splits.txt'

4.使用 JavaAPI 创建预分区(旧API)

代码语言:javascript
复制
    // 自定义算法,产生一系列Hash散列值存储在二维数组中
    byte[][] splitKeys = 某个散列值函数;
    // 创建HBaseAdmin实例
    HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create());
    // 创建HTableDescriptor实例
    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
    // 通过HTableDescriptor实例和散列值二维数组创建带有预分区的HBase表
    hAdmin.createTable(tableDesc, splitKeys);

工作经验小结:针对于一张表,一台服务器有2到3个Region。

7.3 RowKey 设计

  一条数据的唯一标识就是 rowkey,那么这条数据存储于哪个分区,取决于 rowkey 处于哪个一个预分区的区间内,设计 rowkey 的主要目的 ,就是让数据均匀的分布于所有的 region 中,在一定程度上防止数据倾斜。接下来我们就谈一谈 rowkey 常用的设计方案。 原则:唯一性、长度、散列。 心法:数据散列不能太散列,数据集中不能太集中。结合业务(站在什么角度),进行取舍。 常用预分区键(拦截数据):00_|xxxx_xxxx_xxxx、01_|xxxx_xxxx_xxxx、02_|xxxx_xxxx_xxxx   计算 RowKey 是跟 RowKey 对应的数据相关的,对应的数据中我们抽取时间戳、最常用的列、最特殊的列等等拿出来进行hash或者MD5,然后与预分区数进行取模,即散列。

1.生成随机数、hash、散列值

代码语言:javascript
复制
比如:
    原本rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7
    原本rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd
    原本rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913

在做此操作之前,一般我们会选择从数据集中抽取样本,来决定什么样的 rowKey 来 Hash 后作为每个分区的临界值。

2.字符串反转

代码语言:javascript
复制
20170524000001转成10000042507102
20170524000002转成20000042507102

这样也可以在一定程度上散列逐步put进来的数据。

3.字符串拼接

代码语言:javascript
复制
20170524000001_a12e
20170524000001_93i7

7.4 内存优化

  HBase 操作过程中需要大量的内存开销,毕竟 Table 是可以缓存在内存中的,一般会分配整个可用内存的 70% 给 HBase 的 Java 堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致 RegionServer 处于长期不可用状态,一般16~48G内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。

7.5 基础优化

1.允许在 HDFS 的文件中追加内容 hdfs-site.xml、hbase-site.xml

代码语言:javascript
复制
属性:dfs.support.append
解释:开启 HDFS 追加同步,可以优秀的配合 HBase 的数据同步和持久化。默认值为true。

2.优化 DataNode 允许的最大文件打开数 hdfs-site.xml

代码语言:javascript
复制
属性:dfs.datanode.max.transfer.threads
解释:HBase 一般都会同一时间操作大量的文件(刷写、合并),根据集群的数量和规模以及数据动作,设置为4096或者更高。默认值:4096。

3.优化延迟高的数据操作的等待时间 hdfs-site.xml

代码语言:javascript
复制
属性:dfs.image.transfer.timeout
解释:如果对于某一次数据操作来讲,延迟非常高,socket 需要等待更长的时间,建议把该值设置为更大的值(默认60000毫秒),以确保 socket 不会被 timeout 掉。

4.优化数据的写入效率 mapred-site.xml

代码语言:javascript
复制
属性:
    mapreduce.map.output.compress
    mapreduce.map.output.compress.codec
解释:开启这两个数据可以大大提高文件的写入效率,减少写入时间。第一个属性值修改为true,第二个属性值修改为:org.apache.hadoop.io.compress.GzipCodec或者其他压缩方式。

5.设置 RPC 监听数量 hbase-site.xml

代码语言:javascript
复制
属性:hbase.regionserver.handler.count
解释:默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。

6.优化 HStore 文件大小 hbase-site.xml

代码语言:javascript
复制
属性:hbase.hregion.max.filesize
解释:默认值10737418240(10GB),如果需要运行 HBase 的 MR 任务,可以减小此值,因为一个 region 对应一个 map 任务,如果单个 region 过大,会导致 map 任务执行时间过长。
该值的意思就是,如果 HFile 的大小达到这个数值,则这个 region 会被切分为两个 Hfile。

7.优化 hbase 客户端缓存 hbase-site.xml

代码语言:javascript
复制
属性:hbase.client.write.buffer
解释:用于指定 HBase 客户端缓存(即 BlockCache 大小),增大该值可以减少 RPC 调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少 RPC 次数的目的。

8.指定 scan.next 扫描 HBase 所获取的行数 hbase-site.xml

代码语言:javascript
复制
属性:hbase.client.scanner.caching
解释:用于指定 scan.next 方法获取的默认行数,值越大,消耗内存越大。

9.flush、compact、split机制

  当 MemStore 达到阈值,将 Memstore 中的数据 Flush 进 Storefile;   compact 机制则是把 flush 出来的小文件合并成大的 Storefile 文件;   split 则是当 Region 达到阈值,会把过大的 Region 一分为二。

涉及属性:

代码语言:javascript
复制
hbase.hregion.memstore.flush.size:134217728

即:128M就是 Memstore 的默认阈值,这个参数的作用是当单个 HRegion 内所有的 Memstore 大小总和超过指定值时,flush 该 HRegion 的所有 memstore。RegionServer 的 flush 是通过将请求添加一个队列,模拟生产消费模型来异步处理的。那这里就有一个问题,当队列来不及消费,产生大量积压请求时,可能会导致内存陡增,最坏的情况是触发OOM。

代码语言:javascript
复制
hbase.regionserver.global.memstore.upperLimit:0.4
hbase.regionserver.global.memstore.lowerLimit:0.38

即:当 MemStore 使用内存总量达到 hbase.regionserver.global.memstore.upperLimit指定值时,将会有多个 MemStores flush 到文件中,MemStore flush 顺序是按照大小降序执行的,直到刷新到 MemStore 使用内存略小于 lowerLimit。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-03-08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 6.1 环境准备
  • 6.2 HBase API
    • 6.2.1 判断表是否存在
      • 6.2.2 抽取获取 Configuration、Connection、Admin 对象的方法以及关闭资源的方法
        • 6.2.3 创建表(admin)
          • 6.2.4 删除表(admin)
            • 6.2.5 向表中插入数据(put)
              • 6.2.6 删除多行数据(delete)
                • 6.2.7 获取所有数据(scan)
                  • 6.2.8 获取某一行数据(get)
                    • 6.2.9 获取某一行指定“列族:列”的数据(get)
                    • 6.3 MapReduce
                      • 6.3.1 官方 HBase-MapReduce
                        • 6.3.2 自定义 HBase-MapReduce1
                          • 6.3.3 自定义 HBase-MapReduce2
                          • 6.4 与 Hive 的集成
                            • 6.4.1 HBase 与 Hive 的对比
                              • 6.4.2 HBase 与 Hive 集成使用
                              • 第7章 HBase 优化
                                • 7.1 高可用
                                  • 7.2 预分区
                                    • 7.3 RowKey 设计
                                      • 7.4 内存优化
                                        • 7.5 基础优化
                                        相关产品与服务
                                        大数据
                                        全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                                        领券
                                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档