前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >0540-5.15.0-Spark2使用HBase-Spark访问HBase

0540-5.15.0-Spark2使用HBase-Spark访问HBase

作者头像
Fayson
发布2019-03-07 10:57:57
3.2K1
发布2019-03-07 10:57:57
举报
文章被收录于专栏:Hadoop实操Hadoop实操

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

Fayson的github: https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1

文章编写目的

越来越多的用户使用Spark对接HBase,对接HBase的方式有多种,通过HBase-client API实现,也有直接Spark On HBase的方式实现,比较常见的有华为的Spark-SQL-on-HBase,Hortonworks的Apache HBase Connector和Cloudera提供的SparkOnHBase,目前Cloudera的SparkOnHBase已提交的HBase的主干版本。本篇文章Fayson主要在Spark2环境下使用Cloudera的SparkOnHBase访问HBase。

  • 内容概述

1.环境准备

2.SparkOnHBase示例代码

3.示例运行及验证

4.总结

  • 测试环境

1.CM和CDH版本为5.15.0

2.Spark2.2.0.cloudera2

2

环境准备

在CDH5.15.0环境下安装了Spark2后默认是没有与HBase集成的,所以这里我们需要配置Spark2与HBase集成,在Spark环境变量中增加HBase的配置信息。

1.登录CM进入Spark2的配置界面搜索“spark-env.sh”,增加如下配置:

代码语言:javascript
复制
#配置Spark2的Java环境,Spark2要求JDK8或以上版本
export JAVA_HOME=/usr/java/jdk1.8.0_131
#加载该依赖包的主要目的是Spark2的Logging为私有的,Fayson自己重写了Logging类
export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/external-jars/spark2-demo-1.0-SNAPSHOT.jar
#加载HBase的依赖包到Spark2环境变量中
for loop in `ls /opt/cloudera/parcels/CDH/jars/hbase-*.jar`;do
   export SPARK_DIST_CLASSPATH=${loop}:${SPARK_DIST_CLASSPATH}
done
#加载HBase的配置到Spark2的环境变量中
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}:/etc/hbase/conf/

2.完成上述配置后,部署Spark2客户端配置

完成部署

3.在HBase中创建一个用于测试的表user_info

代码语言:javascript
复制
create 'user_info','info'

3

SparkOnHBase示例代码

1.在Spark2工程中添加SparkOnHBase的Maven依赖

代码语言:javascript
复制
<!-- 添加Spark2访问Kudu的依赖包 -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.7.0-cdh5.15.0</version>
</dependency>

2.在工程中创建ClouderaSparkOnHBase.scala类,内容如下:

代码语言:javascript
复制
package com.cloudera.hbase

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * package: com.cloudera.hbase
  * describe: 使用Cloudera提供的Spark On HBase访问HBase
  * creat_user: Fayson
  * email: htechinfo@163.com
  * creat_date: 2019/1/24
  * creat_time: 上午10:59
  * 公众号:Hadoop实操
  */
object ClouderaSparkOnHBase {

  def main(args: Array[String]): Unit = {
    //Spark Conf配置信息
    val conf = new SparkConf()
      .setAppName("ClouderaSparkOnHBase")
      .set("spark.master", "yarn")
      .set("spark.submit.deployMode", "client")

    //初始化SparkSession对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    //初始化HBase Configuration
    val hbaseconf = HBaseConfiguration.create()
    //创建HBaseContext对象
    val hbaseContext = new HBaseContext(spark.sparkContext, hbaseconf)
    //准备一个RDD,后面用于向HBase表插入数据
    val rdd = spark.sparkContext.parallelize(Array(
      (Bytes.toBytes("1"), Array((Bytes.toBytes("info"), Bytes.toBytes("a"), Bytes.toBytes("1")))),
      (Bytes.toBytes("2"), Array((Bytes.toBytes("info"), Bytes.toBytes("b"), Bytes.toBytes("2")))),
      (Bytes.toBytes("3"), Array((Bytes.toBytes("info"), Bytes.toBytes("c"), Bytes.toBytes("3")))),
      (Bytes.toBytes("4"), Array((Bytes.toBytes("info"), Bytes.toBytes("d"), Bytes.toBytes("4")))),
      (Bytes.toBytes("5"), Array((Bytes.toBytes("info"), Bytes.toBytes("e"), Bytes.toBytes("5"))))
    ))

    val tableName = TableName.valueOf("user_info")
    //使用HBaseContext.bulkPut向指定的HBase表写数据
    hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
      tableName,
      (putRecord) => {
        val put = new Put(putRecord._1)
        putRecord._2.foreach((putValue) =>
          put.addColumn(putValue._1, putValue._2, putValue._3)
        )
        put
      });
  }

}

3.使用Maven命令编译工程

代码语言:javascript
复制
mvn clean scala:compile package

4.将编译好的spark2-demo-1.0-SNAPSHOT.jar上传到集群有Spark2 Gateway的节点上,使用Spark2-submit命令提交

代码语言:javascript
复制
kinit fayson
spark2-submit --class com.cloudera.hbase.ClouderaSparkOnHBase \

    --master yarn --num-executors 4 --driver-memory 1g \

    --driver-cores 1 --executor-memory 1g --executor-cores 1 \

    /data/disk1/hbase-spark-demo/spark2-demo-1.0-SNAPSHOT.jar

作业执行成功

5.登录HBase查看user_info表数据

4

总结

1.Spark2使用SparkOnHBase开发访问HBase时,代码编译时会报“Could not access type Logging in package org.apache.spark”具体可以参考Fayson前面的文章《HBase-Spark无法在Spark2编译通过问题解决》

2.在进行Spark2与HBase环境集成时,将spark2-demo-1.0-SNAPSHOT.jar包加载至环境变量(确保集群所有节点/opt/cloudera/external目录下均有这个Jar包),是为了HBaseContext能够正常加载org.apche.spark.Logging类,当然可以将该类打包到一个独立的包中,Fayson这里偷懒直接使用示例工程的jar包。

3.使用SparkOnHBase可以方便的访问HBase,在非Kerberos和Kerberos环境下不需要考虑认证问题(Fayson在前面Spark2Streaming系列时使用的hbase-client API访问HBase,Kerberos环境下还需要考Driver和Executor的jaas.conf配置)

4.在代码中创建HBaseConfiguration.create()对象后设置ZK地址在每个Executor上无法正常获取ZK连接,默认加载的还是localhost配置(因为未在Spark2环境变量中指定HBase配置文件地址导致),因此使用SparkOnHBase必须完成Spark2与HBase的集成。

GitHub地址:

https://github.com/fayson/cdhproject/blob/master/spark2demo/src/main/scala/com/cloudera/hbase/ClouderaSparkOnHBase.scala

https://github.com/fayson/cdhproject/blob/master/spark2demo/pom.xml

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-02-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
TDSQL MySQL 版
TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档