使用Spark通过BulkLoad快速导入数据到HBase

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

Fayson的github: https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


在项目中有需求需要将Hive表中的数据存储在HBase中。使用Spark访问Hive表,将读表数据导入到HBase中,写入HBase有两种方式:一种是通过HBase的API接口批量的将数据写入HBase,另一种是通过BulkLoad的方式生成HFile文件然后加载到HBase中,两种方式相比之下第二种效率会更高。本篇文章Fayson主要介绍如何使用Spark读取Hive表数据通过BulkLoad的方式快速的将数据导入到HBase。

  • 文章概述

1.环境准备

2.示例代码及运行

3.总结

  • 测试环境

1.CM5.14.3和CDH5.14.2

2.集群未启用Sentry和Kerberos

3.Spark1.6.0

2.环境准备


本篇文章主要使用HBase中hbase-spark包提供的HBaseContext来实现,需要准备hbase-spark的依赖包并部署到Spark集群。

1.将准备好的hbase-spark-1.2.0-cdh5.13.1.jar部署到集群所有节点的/opt/cloudera/parcels/CDH/lib/spark/lib目录下

[root@cdh01 ~]# ll /opt/cloudera/parcels/CDH/lib/spark/lib/

(可左右滑动)

2.登录CM配置在spark-env.sh中增加hbase-spark的依赖,增加如下配置内容

export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/parcels/CDH/lib/spark/lib/hbase-spark-1.2.0-cdh5.13.1.jar

(可左右滑动)

保存配置,并重新部署Spark客户端配置

3.Hive表示例数据查看

去除重复数据共600条数据

3.Spark示例代码


1.使用Maven创建Scala示例工程,Pom.xml文件内容如下:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-assembly_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.2.0-cdh5.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>

(可左右滑动)

2.在工程中创建Hive2HBase.scala文件,内容如下:

package com.cloudera.hbase

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

import scala.collection.mutable

/**
  * package: com.cloudera.hbase
  * describe: 使用BulkLoad的方式将Hive数据导入HBase
  * creat_user: Fayson 
  * email: htechinfo@163.com
  * creat_date: 2018/7/31
  * creat_time: 下午2:04
  * 公众号:Hadoop实操
  */
object Hive2HBase {

  def main(args: Array[String]) {

    //库名、表名、rowKey对应的字段名、批次时间、需要删除表的时间参数
    val rowKeyField = "id"
    val quorum = "cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com"
    val clientPort = "2181"
    val hBaseTempTable = "ods_user_hbase"

    val sparkConf = new SparkConf().setAppName("Hive2HBase")
    val sc = new SparkContext(sparkConf)

    val hiveContext = new HiveContext(sc)
    //从hive表读取数据
    val datahiveDF = hiveContext.sql(s"select * from ods_user")

    //表结构字段
    var fields = datahiveDF.columns

    //去掉rowKey字段
    fields = fields.dropWhile(_ == rowKeyField)

    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set("hbase.zookeeper.quorum", quorum)
    hBaseConf.set("hbase.zookeeper.property.clientPort", clientPort)

    //表不存在则建Hbase临时表
    creteHTable(hBaseTempTable, hBaseConf)

    val hbaseContext = new HBaseContext(sc, hBaseConf)

    //将DataFrame转换bulkload需要的RDD格式
    val rddnew = datahiveDF.rdd.map(row => {
      val rowKey = row.getAs[String](rowKeyField)

      fields.map(field => {
        val fieldValue = row.getAs[String](field)
        (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
      })
    }).flatMap(array => {
      (array)
    })

    //使用HBaseContext的bulkload生成HFile文件
    hbaseContext.bulkLoad[Put](rddnew.map(record => {
      val put = new Put(record._1)
      record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")

    val conn = ConnectionFactory.createConnection(hBaseConf)
    val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
    val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
    val realTable = conn.getTable(hbTableName)
    HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)

    // bulk load start
    val loader = new LoadIncrementalHFiles(hBaseConf)
    val admin = conn.getAdmin()
    loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)

    sc.stop()
  }

  /**
    * 创建HBase表
    * @param tableName 表名
    */
  def creteHTable(tableName: String, hBaseConf : Configuration) = {
    val connection = ConnectionFactory.createConnection(hBaseConf)
    val hBaseTableName = TableName.valueOf(tableName)
    val admin = connection.getAdmin
    if (!admin.tableExists(hBaseTableName)) {
      val tableDesc = new HTableDescriptor(hBaseTableName)
      tableDesc.addFamily(new HColumnDescriptor("info".getBytes))
      admin.createTable(tableDesc)
    }
    connection.close()
  }

  /**
    * Prepare the Put object for bulkload function.
    * @param put The put object.
    * @throws java.io.IOException
    * @throws java.lang.InterruptedException
    * @return Tuple of (KeyFamilyQualifier, bytes of cell value)*/
  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
    import scala.collection.JavaConversions._
    for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
      val family = cells.getKey
      for (value <- cells.getValue) {
        val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
        ret.+=((kfq, CellUtil.cloneValue(value)))
      }
    }
    ret.iterator
  }
}

(可左右滑动)

3.使用Maven命令将工程编译为jar包

4.示例运行


1.将编译好的spark-demo-1.0-SNAPSHOT.jar包上传至服务器,使用spark-submit提交

export HADOOP_USER_NAME=hbase
spark-submit --class com.cloudera.hbase.Hive2HBase \
--master yarn-client \
--driver-cores 1 \
--driver-memory 2g \
--executor-cores 1 \
--executor-memory 2g \
spark-demo-1.0-SNAPSHOT.jar

(可左右滑动)

通过Spark作业界面,查看作业运行情况

2.作业执行成功后,查看HBase表数据

查看表数据

5.总结


1.本篇文章是使用hbase-spark包中提供的bulkload方法生成HFile文件,然后将生成的文件导入到HBase表中。

2.使用bulkload的方式导入数据到HBase表时,在load HFile文件到表过程中会有短暂的时间导致该表停止服务(在load文件过程中需要先disable表,load完成后在enable表。

3.需要使用hbase用户提交Spark作业

GitHub地址:

https://github.com/fayson/cdhproject/blob/master/sparkdemo/src/main/scala/com/cloudera/hbase/Hive2HBase.scala

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2018-08-01

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏个人分享

SparkSQL项目中的应用

Spark是一个通用的大规模数据快速处理引擎。可以简单理解为Spark就是一个大数据分布式处理框架。基于内存计算的Spark的计算速度要比Hadoop的MapR...

1283
来自专栏积累沉淀

30分钟--Spark快速入门指南

Spark快速入门指南 – Spark安装与基础使用  2016-01-15 (updated: 2016-03-07) 6309 29 Apache Spa...

5279
来自专栏Jed的技术阶梯

zookeeper编程02-服务器上下线动态感知

NameNode判断DataNode是否下线的时间太长了,利用zookeeper实现服务器上下线动态感知

2012
来自专栏Hadoop实操

如何使用Spark Streaming读取HBase的数据并写入到HDFS

Spark Streaming是在2013年被添加到Apache Spark中的,作为核心Spark API的扩展它允许用户实时地处理来自于Kafka、Flum...

1.1K4
来自专栏Spark学习技巧

Spark调优系列之序列化方式调优

由于大多数的spark计算是基于内存的的天性,spark应用的瓶颈一般受制于集群的CPU,网络带宽,内存。大部分情况下,如果内存适合当前数据量的计算,那么瓶颈往...

3179
来自专栏Spark学习技巧

必会:关于SparkStreaming checkpoint那些事儿

spark Streaming的checkpoint是一个利器,帮助在driver端非代码逻辑错误导致的driver应用失败重启,比如网络,jvm等,当然也仅限...

1332
来自专栏知识分享

关于STM32空闲中断

有一次做一个东西,为了尽量不占用CPU的处理数据时间,所以就使用DMA接收串口的数据,但是呢问题来了.,,,,,怎么样才能确定接收到了一条完整的数据了,,我们都...

3488
来自专栏Hadoop实操

如何使用Oozie API接口向Kerberos环境的CDH集群提交Spark2作业

前面Fayson介绍了多种方式在CDH集群外的节点向集群提交Spark作业,文章中均采用Spark1来做为示例,本篇文章主要介绍如何是用Oozie API向Ke...

1.2K4
来自专栏美图数据技术团队

快速、安全、可靠!Yarn!| MTdata小讲堂

Yarn 的全称是 Yet Anther Resource Negotiator(另一种资源协商者)。它作为 Hadoop 的一个组件,官方对它的定义是一个工作...

1282
来自专栏华章科技

Spark知识体系完整解读

Spark简介 Spark是整个BDAS的核心组件,是一个大数据分布式编程框架,不仅实现了MapReduce的算子map 函数和reduce函数及计算模型,还...

1372

扫码关注云+社区

领取腾讯云代金券