前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过Spark生成HFile,并以BulkLoad方式将数据导入到HBase

通过Spark生成HFile,并以BulkLoad方式将数据导入到HBase

作者头像
大数据学习与分享
发布2020-08-10 14:45:43
2.3K0
发布2020-08-10 14:45:43
举报

在实际生产环境中,将计算和存储进行分离,是我们提高集群吞吐量、确保集群规模水平可扩展的主要方法之一,并且通过集群的扩容、性能的优化,确保在数据大幅增长时,存储不能称为系统的瓶颈。

具体到我们实际的项目需求中,有一个典型的场景,通常会将Hive中的部分数据,比如热数据,存入到HBase中,进行冷热分离处理。

我们采用Spark读取Hive表数据存入HBase中,这里主要有两种方式:

  1. 通过HBase的put API进行数据的批量写入
  2. 通过生成HFile文件,然后通过BulkLoad方式将数据存入HBase

HBase的原生put方式,通过HBase集群的region server向HBase插入数据,但是当数据量非常大时,region会进行split、compact等处理,并且这些处理非常占用计算资源和IO开销,影响性能和集群的稳定性。

HBase的数据最终是以HFile的形式存储到HDFS上的,如果我们能直接将数据生成为HFile文件,然后将HFile文件保存到HBase对应的表中,可以避免上述的很多问题,效率会相对更高。

本篇文章主要介绍如何使用Spark生成HFile文件,然后通过BulkLoad方式将数据导入到HBase中,并附批量put数据到HBase以及直接存入数据到HBase中的实际应用示例。

1. 生成HFile,BulkLoad导入

1.1 数据样例

代码语言:javascript
复制
{"id":"1","name":"jack","age":"18"}
{"id":"2","name":"mike","age":"19"}
{"id":"3","name":"kilos","age":"20"}
{"id":"4","name":"tom","age":"21"}
...

1.2 示例代码

代码语言:javascript
复制
/**
  * @Author bigdatalearnshare
  */
object App {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")

    val sparkSession = SparkSession
      .builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[*]")
      .getOrCreate()
    
    val rowKeyField = "id"
    
    val df = sparkSession.read.format("json").load("/people.json")

    val fields = df.columns.filterNot(_ == "id").sorted

    val data = df.rdd.map { row =>
      val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)

      val kvs = fields.map { field =>
        new KeyValue(rowKey, Bytes.toBytes("hfile-fy"), Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
      }

      (new ImmutableBytesWritable(rowKey), kvs)
    }.flatMapValues(x => x).sortByKey()
    
    val hbaseConf = HBaseConfiguration.create(sparkSession.sessionState.newHadoopConf())
    hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "hfile")
    val connection = ConnectionFactory.createConnection(hbaseConf)

    val tableName = TableName.valueOf("hfile")

    //没有HBase表则创建
    creteHTable(tableName, connection)

    val table = connection.getTable(tableName)

    try {
      val regionLocator = connection.getRegionLocator(tableName)

      val job = Job.getInstance(hbaseConf)

      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setMapOutputValueClass(classOf[KeyValue])

      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

      val savePath = "hdfs://linux-1:9000/hfile_save"
      delHdfsPath(savePath, sparkSession)

      job.getConfiguration.set("mapred.output.dir", savePath)

      data.saveAsNewAPIHadoopDataset(job.getConfiguration)

      val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
      bulkLoader.doBulkLoad(new Path(savePath), connection.getAdmin, table, regionLocator)

    } finally {
      //WARN LoadIncrementalHFiles: Skipping non-directory hdfs://linux-1:9000/hfile_save/_SUCCESS 不影响,直接把文件移到HBASE对应HDFS地址了
      table.close()
      connection.close()
    }

    sparkSession.stop()
  }

  def creteHTable(tableName: TableName, connection: Connection): Unit = {
    val admin = connection.getAdmin

    if (!admin.tableExists(tableName)) {
      val tableDescriptor = new HTableDescriptor(tableName)
      tableDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("hfile-fy")))
      admin.createTable(tableDescriptor)
    }
  }

  def delHdfsPath(path: String, sparkSession: SparkSession) {
    val hdfs = FileSystem.get(sparkSession.sessionState.newHadoopConf())
    val hdfsPath = new Path(path)

    if (hdfs.exists(hdfsPath)) {
      //val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
      hdfs.delete(hdfsPath, true)
    }
  }
}

1.3 注意事项

上述示例代码可以根据实际业务需求作相应调整,但有一个问题需要特别注意:

通过Spark读取过来的数据生成HFile时,要确保HBase的主键、列族、列按照有序排列。否则,会抛出以下异常:

代码语言:javascript
复制
Caused by: java.io.IOException: Added a key not lexically larger than previous. Current cell = 1/hfile-fy:age/1588230543677/Put/vlen=2/seqid=0, lastCell = 1/hfile-fy:name/1588230543677/Put/vlen=4/seqid=0

2. 批量put

2.1数据样例

代码语言:javascript
复制
val rowKeyField = "id"
val df = sparkSession.read.format("json").load("/stats.json")
val fields = df.columns.filterNot(_ == "id")

df.rdd.foreachPartition { partition =>
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
      hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "batch_put")

      val conn = ConnectionFactory.createConnection(hbaseConf)
      val table = conn.getTable(TableName.valueOf("batch_put"))

      val res = partition.map { row =>
        val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)
        val put = new Put(rowKey)
        val family = Bytes.toBytes("hfile-fy")

        fields.foreach { field =>
          put.addColumn(family, Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
        }

        put
      }.toList

      Try(table.put(res)).getOrElse(table.close())

      table.close()
      conn.close()
}

在实际应用中,我们也可以将经常一起查询的数据拼接在一起存入一个列中,比如将上述的pv和uv拼接在一起使用,可以降低KeyValue带来的结构化开销。

3.saveAsNewAPIHadoopDataset

代码语言:javascript
复制
val hbaseConf = sparkSession.sessionState.newHadoopConf()
hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")

hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "direct")
val job = Job.getInstance(hbaseConf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

val rowKeyField = "id"

val df = sparkSession.read.format("json").load("/stats.json")

val fields = df.columns.filterNot(_ == "id")

df.rdd.map { row =>
    val put = new Put(Bytes.toBytes(row.getAs(rowKeyField).toString))

    val family = Bytes.toBytes("hfile-fy")

    fields.foreach { field =>
      put.addColumn(family, Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
    }

    (new ImmutableBytesWritable(), put)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)


以上主要介绍了3种利用Spark将数据导入HBase的方式。其中,通过生成HFile文件,然后以BulkLoad导入的方式更适合于大数据量的操作。

此外,如果我们在使用Spark(或者其他计算引擎)读取HBase表数据时,如果效率相对低,比如:Spark读取HBase时会根据region的数量生成对应数量的task,导致相同数据量下,会比直接读取Hive数据慢,也可以通过直接读取HFile的方式来处理。当然,实际应用还要结合具体的场景,涉及的技术等。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据学习与分享 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
TDSQL MySQL 版
TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档