前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >简简单单用一下 Hbase

简简单单用一下 Hbase

作者头像
WindWant
发布2022-05-10 09:26:40
5340
发布2022-05-10 09:26:40
举报
文章被收录于专栏:后端码事后端码事

一、Hbase 介绍

https://hbase.apache.org/book.html#_preface

https://blogs.apache.org/hbase/

https://research.google.com/archive/bigtable.html

什么是Hbase?

hadoop 数据库:分布式、可伸缩、大数据存储。

二、Hbase client

最开始引入 hbase-client,服务有使用【google/protobuf/wrappers.proto】,有很多包冲突,所以直接使用了 habase-shade-client: 

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-shaded-client</artifactId>
  <version>${hbase.shade.client.version}</version>
</dependency>

三、Hbase 配置

  • hbase.zookeeper.quorum zookeeper server 地址,逗号分割。本地模式和伪集群模式下,默认为 127.0.0.1
  • hbase.zookeeper.property.clientPort zookeeper server 端口,默认 2181
  • hbase.client.retries.number hbase client 所有操作的重试上限,默认 15。client 首先等待 hbase.client.pause 执行第一次重试,之后每隔 10s 再次执行。
  • hbase.rpc.timeout hbase client 一次 rpc 操作的超时时间(超时基于ping检查),默认60000ms,触发则抛出 TimeoutException 异常。
  • hbase.client.operation.timeout hbase client 一次操作的总的时间限制, 默认 1200000ms,触发则直接抛出 SocketTimeoutException 异常。
  • 示例: @Configuration public class HBaseConfig { @Value("${hbase.zookeeper.quorum}") private String hbaseZkQuorum; @Value("${hbase.zookeeper.property.clientPort:2181}") private String hbaseZkPort; @Value("${hbase.client.retries.number:2}") private String hbaseClientRetry; @Value("${hbase.rpc.timeout:2000}") private String hbaseRpcTimeout; @Value("${hbase.client.operation.timeout:3000}") private String hbaseClientOperationTimeout; @Bean public Connection hbaseConnection() throws IOException { org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create(); hbaseConfig.set("hbase.zookeeper.property.clientPort", hbaseZkPort); hbaseConfig.set("hbase.zookeeper.quorum", hbaseZkQuorum); hbaseConfig.set("hbase.client.retries.number", hbaseClientRetry); hbaseConfig.set("hbase.client.operation.timeout", hbaseClientOperationTimeout); hbaseConfig.set("hbase.rpc.timeout", hbaseRpcTimeout); return ConnectionFactory.createConnection(hbaseConfig); } @Bean public HbaseSimpleTemplate hbaseSimpleTemplate(@Qualifier("hbaseConnection") Connection hbaseConnection) { return new HbaseSimpleTemplate(hbaseConnection); } }

四、关于 Connection

1、Connection 是什么?

集群 connection 封装了底层和实际 hbase server 及 zookeeper 的连接。由 ConnectionFactory 创建并由发起端维护其整个生命周期。

承载了服务发现(hbase master 及 region server)及本地缓存维护(存储及更新)逻辑。所以基于此链接实例化而来的 Table 和 Admin 共享此信息。

2、Connection 怎么使用?

Connection 创建是一个很重的操作。

Connection 实现是 thread-safe 的。

所以通常的操作时,一次创建,到处使用。

这里我们通过 @Bean 注解,将 connection 实例交由 spring 管理,维护其从创建,使用到销毁的整个生命周期。

三、HbaseSimpleTemplate

Hbase Connection 数据操作封装:

row->column->all cells

row->column->cells

rows->column->cells

代码语言:javascript
复制
public class HbaseSimpleTemplate {
    private Connection hbaseConnection;
    public HbaseSimpleTemplate(Connection hbaseConnection) {
        this.hbaseConnection = hbaseConnection;
    }
    /**
     * 结果映射map
     *
     * @param result
     * @return
     */
    private Map<String, String> resultToMap(Result result) {
        if (result == null || result.isEmpty()) {
            return new HashMap<>();
        }
        return result.listCells().stream().collect(
                Collectors.toMap(cell -> Bytes.toString(CellUtil.cloneQualifier(cell)), cell -> Bytes.toString(CellUtil.cloneValue(cell))));
    }
    /**
     * 查询
     * @param tableName
     * @param rowName
     * @param familyName
     * @return
     * @throws IOException
     */
    public Map<String, String> get(String tableName, String rowName, String familyName) throws IOException {
        Map<String, Map<String, String>> resultMap = get(tableName, Collections.singletonList(rowName), familyName, null);
        return resultMap.values().stream().findFirst().orElse(new HashMap<>());
    }
    /**
     *
     * @param tableName
     * @param rowName
     * @param familyName
     * @param qualifiers
     * @return
     * @throws IOException
     */
    public Map<String, String> get(String tableName, String rowName, String familyName, List<String> qualifiers) throws IOException {
        Map<String, Map<String, String>> resultMap = get(tableName, Collections.singletonList(rowName), familyName, qualifiers);
        return resultMap.values().stream().findFirst().orElse(new HashMap<>());
    }
    /**
     * 批量查询
     *
     * @param tableName
     * @param rowNames
     * @param familyName
     * @return
     * @throws IOException
     */
    public Map<String, Map<String, String>> get(String tableName, List<String> rowNames, String familyName, List<String> qualifiers) throws IOException {
        Map<String, Map<String, String>> resultMap = new HashMap<>();
        List<Get> gets = new ArrayList<>();
        rowNames.forEach(rowName -> {
            Get get = new Get(rowName.getBytes());
            if (CollectionUtils.isNotEmpty(qualifiers)) {
                qualifiers.forEach(qualifier -> get.addColumn(familyName.getBytes(), qualifier.getBytes()));
            } else {
                get.addFamily(familyName.getBytes());
            }
            gets.add(get);
        });
        Arrays.stream(hbaseConnection.getTable(TableName.valueOf(tableName)).get(gets))
                .forEach(result -> {
                    Map<String, String> kvMap = resultToMap(result);
                    String id = MapUtils.getString(kvMap, "id");
                    if (StringUtils.isNotBlank(id)) {
                        resultMap.put(id, kvMap);
                    }
                });
        return resultMap;
    }
    /**
     * 写入 qualifier
     *
     * @param tableName
     * @param rowName
     * @param familyName
     * @param qualifier
     * @param value
     * @return
     * @throws IOException
     */
    public boolean put(String tableName, String rowName, String familyName, String qualifier, String value) throws IOException {
        Map<String, String> qv = new HashMap<>();
        qv.put(qualifier, value);
        put(tableName, rowName, familyName, qv);
        return true;
    }
    /**
     * 写入 qualifiers
     *
     * @param tableName
     * @param rowName
     * @param familyName
     * @param qualifierValues
     * @return
     * @throws IOException
     */
    public boolean put(String tableName, String rowName, String familyName, Map<String, String> qualifierValues) throws IOException {
        if (MapUtils.isEmpty(qualifierValues)) {
            return false;
        }
        List<Put> puts = new ArrayList<>();
        qualifierValues.forEach((qualifier, value) -> puts.add(new Put(rowName.getBytes()).addColumn(familyName.getBytes(), qualifier.getBytes(), value.getBytes())));
        hbaseConnection.getTable(TableName.valueOf(tableName)).put(puts);
        return true;
    }
    /**
     * 删除 
     *
     * @param tableName
     * @param rowName
     * @param familyName
     * @return
     * @throws IOException
     */
    public boolean del(String tableName, String rowName, String familyName) throws IOException {
        Delete delete = new Delete(rowName.getBytes());
        delete.addFamily(familyName.getBytes());
        hbaseConnection.getTable(TableName.valueOf(tableName)).delete(delete);
        return true;
    }
    /**
     * 删除 qualifier
     *
     * @param tableName
     * @param rowName
     * @param familyName
     * @param qualifiers
     * @return
     * @throws IOException
     */
    public boolean delQualifiers(String tableName, String rowName, String familyName, List<String> qualifiers) throws IOException {
        Delete delete = new Delete(rowName.getBytes());
        qualifiers.forEach(qualifier -> delete.addColumn(familyName.getBytes(), qualifier.getBytes()));
        hbaseConnection.getTable(TableName.valueOf(tableName)).delete(delete);
        return true;
    }
}

getTable:

获取 Table 实现用以访问表数据。

Table 非 thread-safe 的并且其创建很轻量,所以线程内使用需要单独创建(不需要且不应该缓存和池化)。 

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-04-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Hbase 介绍
  • 二、Hbase client
  • 三、Hbase 配置
  • 四、关于 Connection
    • 1、Connection 是什么?
      • 2、Connection 怎么使用?
      • 三、HbaseSimpleTemplate
      相关产品与服务
      TDSQL MySQL 版
      TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档