前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Scala 操作 HBase2.0 数据库

Scala 操作 HBase2.0 数据库

原创
作者头像
ZHANGHAO
修改2019-03-06 10:46:45
3K1
修改2019-03-06 10:46:45
举报

环境配置

Maven添加hbase-client的依赖

  <!--HBase Client-->
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.2</version>
        </dependency>
    </dependencies>

Scala操作HBase

创建HBase的配置、Connection、Admin

  /*
  *创建一个HBase的配置,创建的时候会去加载classpath下的hbase-default.xml和hbase-site.xml两个配置文件
  */
  private val conf = HBaseConfiguration.create()
  //设置Zookeeper的地址和端口来访问HBase,先从配置中读取,如配置中不存在,设置地址为localhost,端口为默认端口2181
  conf.set(HConstants.ZOOKEEPER_QUORUM, conf.get(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST))
  conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, conf.get(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT.toString))

  //创建操作HBase的入口connection
  private val conn: Connection = ConnectionFactory.createConnection(conf)
  //创建操作HBase表的入口Admin
  private val admin: Admin = conn.getAdmin

获取表

 /**
    * 获取表
    *
    * @param tableName 表名
    * @return HBase表
    */

  def getTable(tableName: String): Table = {
    val table = Try(conn.getTable(TableName.valueOf(tableName)))
    table.get.close()
    table match {
      case Success(v) => v;
      case Failure(e) => e.printStackTrace()
        null
    }
  }

创建表

/**
    * 创建表
    *
    * @param tableName 表名
    * @param cf        列族
    */
  def createTable(tableName: String, cf: String): Unit = {
    //创建表
    val tableDesc = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
    tableDesc.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("basic".getBytes).build())
    println(s"Creating table `$tableName`. ")
    Try {
      if (admin.tableExists(TableName.valueOf(tableName))) {
        admin.disableTable(TableName.valueOf(tableName))
        admin.deleteTable(TableName.valueOf(tableName))
      }
      admin.createTable(tableDesc.build())
      admin.close()
      println("Done!")
    } match {
      case Success(_) =>
      case Failure(e) => e.printStackTrace()
    }

  }

删除表

/**
    * 删除表
    *
    * @param tableName 表名
    * @param rowKey    行键
    */
  def delete(tableName: String, rowKey: String): Unit = {
    val table = conn.getTable(TableName.valueOf(tableName))
    Try {
      val d = new Delete(rowKey.getBytes)
      table.delete(d)
      table.close()
    } match {
      case Success(_) =>
      case Failure(e) => e.printStackTrace()
    }

  }

往表中存放数据

/**
    *
    * 往表中存放数据
    *
    * @param tableName 表名
    * @param rowKey    行键
    * @param cf        列族
    * @param qualifier 列限定符
    * @param value     具体的值
    */
  def put(tableName: String, rowKey: String, cf: String, qualifier: String, value: String): Unit = {
    println(s"Put row key $rowKey into $tableName. ")
    val table = conn.getTable(TableName.valueOf(tableName))
    Try {
      //准备一个row key
      val p = new Put(rowKey.getBytes)
      //为put操作指定 column qualifier 和 value
      p.addColumn(cf.getBytes, qualifier.getBytes, value.getBytes)
      //放数据到表中
      table.put(p)
      table.close()
    } match {
      case Success(_) => println("Done!")
      case Failure(e) => e.printStackTrace()
    }
  }

获得表中的数据

 /**
    * 获得表里面的数据
    *
    * @param tableName 表名
    * @param rowKey    行键
    * @param cf        列族
    * @param qualifier 列限定符
    * @return 获得的数据
    */
  def get(tableName: String, rowKey: String, cf: String, qualifier: String): String = {
    val table = conn.getTable(TableName.valueOf(tableName))
    Try {
      val g = new Get(rowKey.getBytes)
      val result = table.get(g)
      table.close()
      Bytes.toString(result.getValue(cf.getBytes(), qualifier.getBytes()))
    } match {
      case Success(v) => v
      case Failure(e) => e.printStackTrace()
        null
    }

  }

扫描表中的数据

 /**
    * 扫描数据
    *
    * @param tableName 表名
    * @param cf        列族
    * @param qualifier 列限定符
    */
  def scan(tableName: String, cf: String, qualifier: String): Unit = {
    val table = conn.getTable(TableName.valueOf(tableName))
    val s = new Scan()
    s.addColumn(cf.getBytes, qualifier.getBytes)
    val scanner = table.getScanner(s)
    Try {
      val iterator = scanner.iterator()
      while (iterator.hasNext) {
        val next = iterator.next()
        println("Found row: " + next)
        println("Found value: " + Bytes.toString(
          next.getValue(cf.getBytes, qualifier.getBytes)))
      }
      scanner.close()
      table.close()
    } match {
      case Success(_) =>
      case Failure(e) => e.printStackTrace()
    }

  }

附录

完整的代码已经上传到gist。

file-hbaseutils-scala

参考文献:

Spark 下操作 HBase(1.0.0 新 API)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 环境配置
  • Scala操作HBase
    • 创建HBase的配置、Connection、Admin
      • 获取表
        • 创建表
          • 删除表
            • 往表中存放数据
              • 获得表中的数据
                • 扫描表中的数据
                • 附录
                相关产品与服务
                TDSQL MySQL 版
                TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档