HBase整合MapReduce之建立HBase索引

HBase索引主要用于提高Hbase中表数据的访问速度,有效的避免了全表扫描,HBase中的表根据行健被分成了多个Regions,通常一个region的一行都会包含较多的数据,如果以列值作为查询条件,就只能从第一行数据开始往下找,直到找到相关数据为止,这很低效。相反,如果将经常被查询的列作为行健、行健作为列重新构造一张表,即可实现根据列值快速定位相关数据所在的行,这就是索引。显然索引表仅需要包含一个列,所以索引表的大小和原表比起来要小得多,如图4-14给出了索引表与原表之间的关系。从图可以看出,由于索引表的单条记录所占的空间比原表要小,所以索引表的一个Region与原表相比,能包含更多条记录

假设HBase中存在一张表heroes,里面的内容如表所示,则根据列info:name构建的索引表如图4-15所示。Hbase会自动将生成的索引表加入如图4-3所示的结构中,从而提高搜索的效率

下面看代码实现

首先创建heroes表

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateTable {

    public static void main(String[] args) {
        // 2.获得会话
        Admin admin = null;
        Connection con = null;

        try {
            // 操作hbase数据库
            // 1.建立连接
            Configuration conf = HBaseConfiguration.create(); // 获得配制文件对象
            conf.set("hbase.zookeeper.quorum", "192.168.52.140");
            con = ConnectionFactory.createConnection(conf); // 获得连接对象
            TableName tn = TableName.valueOf("heroes");

            admin = con.getAdmin();
            HTableDescriptor htd = new HTableDescriptor(tn);
            HColumnDescriptor hcd = new HColumnDescriptor("info");
            htd.addFamily(hcd);
            admin.createTable(htd);
            Table t = con.getTable(tn);
            String[] heronames = new String[] { "peter", "hiro", "sylar", "claire", "noah" };
            for (int i = 0; i < 5; i++) {
                Put put = new Put((i + "").getBytes());
                put.addColumn("info".getBytes(), "name".getBytes(), heronames[i].getBytes());
                put.addColumn("info".getBytes(), "email".getBytes(), (i + "@qq.com").getBytes());
                put.addColumn("info".getBytes(), "power".getBytes(), "Idotknow".getBytes());
                t.put(put);
            }
            admin.close();
            con.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

然后根据heroes表建立索引表

package com.tg.index;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;



public class CreateHbaseIndex {
    //map阶段,根据hbase中的数据取出行健和姓名
    public static class HbaseIndexMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable>{

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, ImmutableBytesWritable>.Context context)
                        throws IOException, InterruptedException {

            List<Cell> cs = value.listCells();

            for (Cell cell : cs) {
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                System.out.println("qualifier="+qualifier);
                if(qualifier.equals("name")){
                    //把名字做键  行健做值输出
                    context.write(new ImmutableBytesWritable(CellUtil.cloneValue(cell)), new ImmutableBytesWritable(CellUtil.cloneRow(cell)));
                }
            }

        }

    }
    //reduce阶段,将姓名作为键,行健作为值存入hbase
    public static class HbaseIndexReduce extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable>{

        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> value,
                Reducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, Mutation>.Context context)
                        throws IOException, InterruptedException {
            //把名字做行健
            Put put=new Put(key.get());
            //把行健做值
            for (ImmutableBytesWritable v : value) {
                put.addColumn("rowkey".getBytes(),"index".getBytes(),v.get() );
            }
            context.write(key, put);
        }

    }
    private static void checkTable(Configuration conf) throws Exception {
        Connection con = ConnectionFactory.createConnection(conf);
        Admin admin = con.getAdmin();
        TableName tn = TableName.valueOf("heroesIndex");
        if (!admin.tableExists(tn)){
            HTableDescriptor htd = new HTableDescriptor(tn);
            HColumnDescriptor hcd = new HColumnDescriptor("rowkey".getBytes());
            htd.addFamily(hcd);
            admin.createTable(htd);
            System.out.println("表不存在,新创建表成功....");
        }
    }

    public static void main(String[] args) {
        try {

            Configuration conf = new Configuration();
            conf = HBaseConfiguration.create(conf);
            conf.set("hbase.zookeeper.quorum", "192.168.52.140");

            Job job = Job.getInstance(conf, "heroes");
            job.setJarByClass(CreateHbaseIndex.class);
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));

            TableMapReduceUtil.initTableMapperJob("heroes", scan, HbaseIndexMapper.class, 
                    ImmutableBytesWritable.class, ImmutableBytesWritable.class,job);
            TableMapReduceUtil.initTableReducerJob("heroesIndex", HbaseIndexReduce.class, job);

            checkTable(conf);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

heroes原表的记录如下图:

创建的索引表记录如下图:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏流媒体人生

ATL源码学习4---TearOff接口支持

    http://download.csdn.net/source/1687116

892
来自专栏码匠的流水账

docker运行storm及wordcount实例

本文简单介绍下怎么使用docker运行storm以及在springboot中使用storm。

2562
来自专栏爱撒谎的男孩

地址管理之省市区三级联动菜单

9323
来自专栏Bug生活2048

.net core下对于附件上传下载的实现

.net core通过IFormFile接收文件对象,再通过流的方式保存至指定的地方。

2003
来自专栏潇涧技术专栏

Lint Tool Analysis (2)

本系列的几篇源码分析文档意义不大,如果你正好也在研究lint源码,或者你想知道前面自定义lint规则中提出的那几个问题,抑或你只是想大致了解下lint的源码都有...

1641
来自专栏个人分享

使用SparkSQL实现多线程分页查询并写入文件

一、由于具有多张宽表且字段较多,每个宽表数据大概为4000万条,根据业务逻辑拼接别名,并每张宽表的固定字段进行left join 拼接SQL。这样就能根据每个宽...

2244
来自专栏码农分享

4.1、苏宁百万级商品爬取 代码讲解 索引建立

Lucene是一款高性能的、可扩展的信息检索(IR)工具库。信息检索是指文档搜索、文档内信息搜索或者文档相关的元数据搜索等操作。

1483
来自专栏小灰灰

Batik渲染png图片异常的bug修复

Batik渲染png图片异常的bug修复 batik是apache的一个开源项目,可以实现svg的渲染,后端借助它可以比较简单的实现图片渲染,当然和java一贯...

1989
来自专栏个人分享

初版storm项目全流程自动化测试代码实现

  由于项目需要,写了版针对业务的自动化测试代码,主要应用场景在于由于业务日趋复杂,一些公共代码的改动,担心会影响已有业务。还没进行重写,但知识点还是不少的与大...

891
来自专栏算法修养

Lucene.net(4.8.0) 学习问题记录二: 分词器Analyzer中的TokenStream和AttributeSource

前言:目前自己在做使用Lucene.net和PanGu分词实现全文检索的工作,不过自己是把别人做好的项目进行迁移。因为项目整体要迁移到ASP.NET Core ...

3527

扫码关注云+社区

领取腾讯云代金券