专栏首页Hadoop实操0538-5.15.0-Spark2 KuduContext访问Kudu

0538-5.15.0-Spark2 KuduContext访问Kudu

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

Fayson的github: https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1

文档编写目的

在集群中访问Kudu的方式有多种,可以通过Impala使用JDBC的方式,也可以通过Kudu提供的Client API方式,参考Fayson前面的文章《如何使用Java API访问CDH的<em>Kudu</em>》和《如何使用Java代码访问Kerberos环境下的Kudu》。在做Spark开发时也有访问Kudu的需求,Kudu API访问是一种方式,这里Fayson使用KuduContext实现对Kudu的读写操作。

  • 内容概述

1.环境准备

2.Spark Kudu示例代码

3.示例运行及验证

4.总结

  • 测试环境

1.CM和CDH版本为5.15.0

2.Spark2.2.0.cloudera2

2

环境准备

在CDH5.15.0环境下安装了Spark2后默认是添加kudu-spark2的依赖包,我们可以在Kudu的安装目录下找到相应版本的kudu-spark2_2.11-{cdh.version}.jar。这里在Spark2的环境变量中将kudu-spark2的依赖包,确保Spark2作业能够正常的调用kudu-spark2提供的API。

1.在集群的任意节点执行如下命令找到Kudu-spark2对应版本的依赖包

[root@cdh4 ~]# find / -name kudu-spark2*.jar

2.登录CM进入Spark2的配置界面搜索“spark-env.sh”,增加如下配置:

#配置Spark2的Java环境,Spark2要求JDK8或以上版本
export JAVA_HOME=/usr/java/jdk1.8.0_131
#加载该依赖包的主要目的是Spark2的Logging为私有的,Fayson自己重写了Logging类
export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/external-jars/spark2-demo-1.0-SNAPSHOT.jar
#加载kudu-spark2_2.11.jar到Spark的环境变量中
export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/parcels/CDH/lib/kudu/kudu-spark2_2.11.jar

3.完成上述配置后,部署Spark2客户端配置

完成部署即可在Spark2 Gateway节点上提交Spark2访问Kudu的应用。

3

Spark Kudu示例代码

1.在Spark2工程中添加SparkOnHBase的Maven依赖

<!-- 添加Spark2访问Kudu的依赖包 -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.7.0-cdh5.15.0</version>
</dependency>

2.在工程中创建KuduSample.scala类,内容如下:

package com.cloudera.kudu

import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.JavaConverters._

/**
  * package: com.cloudera.kudu
  * describe: Spark2 使用KuduContext访问Kudu
  * 该示例业务逻辑,Spark读取Hive的ods_user表前10条数据,写入Kudu表(通过ods_user表的Schema创建kudu表)
  * 读取kudu_user_info表数据,将返回的rdd转换为DataFrame写入到Hive的kudu2hive表中
  * creat_user: Fayson 
  * email: htechinfo@163.com
  * creat_date: 2019/1/25
  * creat_time: 上午10:58
  * 公众号:Hadoop实操
  */
object KuduSample {

  /**
    * 定义一个UserInfo对象
    */
  case class UserInfo (
    id: String,
    name: String,
    sex: String,
    city: String,
    occupation: String,
    tel: String,
    fixPhoneNum: String,
    bankName: String,
    address: String
  )

  def main(args: Array[String]): Unit = {

    val kuduMaster = "cdh1.fayson.com,cdh2.fayson.com,cdh3.fayson.com"
    val kuduTableName = "kudu_user_info"
    val hiveTableName = "kudu2hive"

    //Spark Conf配置信息
    val conf = new SparkConf()
      .setAppName("Spark2OnKuduSample")
      .set("spark.master", "yarn")
      .set("spark.submit.deployMode", "client")

    //初始化SparkSession对象
    val spark = SparkSession.builder().config(conf).getOrCreate()
    //引入隐式
    import spark.implicits._
    val kuduContext = new KuduContext(kuduMaster, spark.sparkContext)

    //查询出Hive表数据
    val odsuserdf = spark.sql("select * from ods_user limit 10")

    //判断表是否存在
    if(!kuduContext.tableExists(kuduTableName)) {
      val createTableOptions = new CreateTableOptions()
      createTableOptions.addHashPartitions(List("id").asJava, 8).setNumReplicas(3)
      kuduContext.createTable(kuduTableName, odsuserdf.schema.add("id", StringType, false), Seq("id"), createTableOptions)
    }
    //将Hive中ods_user表的前10条数据写入到kudutableName表中
    kuduContext.upsertRows(odsuserdf, kuduTableName)

    //读取出kuduTableName表的数据
    val kudurdd = kuduContext.kuduRDD(spark.sparkContext, kuduTableName, Seq("id","name","sex","city","occupation","tel","fixPhoneNum","bankName","address"))

    //将kudurdd转换转换为DataFrame对象,写到hive的表中
    spark.sqlContext.createDataFrame(kudurdd.mapPartitions(partition => {
      partition.map(row =>{new UserInfo(
        row.getAs[String](0),
        row.getAs[String](1),
        row.getAs[String](2),
        row.getAs[String](3),
        row.getAs[String](4),
        row.getAs[String](5),
        row.getAs[String](6),
        row.getAs[String](7),
        row.getAs[String](8)
      )})
    })).write.saveAsTable(hiveTableName)

    spark.close()
  }

}

3.使用Maven命令编译工程

mvn clean scala:compile package

4.将编译好的spark2-demo-1.0-SNAPSHOT.jar上传到集群有Spark2 Gateway的节点上,使用Spark2-submit命令提交

kinit hiveadmin
spark2-submit --class com.cloudera.kudu.KuduSample \
    --master yarn --num-executors 4 --driver-memory 1g\
    --driver-cores 1 --executor-memory 1g --executor-cores 1\
    /data/disk1/hbase-spark-demo/spark2-demo-1.0-SNAPSHOT.jar

作业执行成功

5.访问Kudu Master的UI界面“Tables”可以看到通过Spark2作业创建的kudu_user_info表

进入kudu_user_info表找到在Impala上创建Kudu外部表的建表语句

CREATE EXTERNAL TABLE `kudu_user_info` STORED AS KUDU
TBLPROPERTIES(
    'kudu.table_name' = 'kudu_user_info',
    'kudu.master_addresses' = 'cdh1.fayson.com:7051,cdh2.fayson.com:7051,cdh3.fayson.com:7051')

6.登录Hue使用Impala执行引擎创建Kudu的外部表,

查看数据

7.在代码的业务中,Fayson又将数据Kudu表的数据写会到Hive的kudu2hive表中

4

总结

1.访问Kudu可以通过Kudu API接口实现参考Fayson文章开头部分提到的Java示例文章,但在使用Spark访问Kudu时建议使用kudu-spark,使用该方式访问对于安全集群访问不需要考虑Driver和每个Executor访问Kudu的Kerberos配置。

2.在Fayson的示例代码中,是通过查询hive表的Schema生成Kudu的Schema,由于Kudu的主键不能为空,所以在代码中增加了如下代码更新id主键不为空。

odsuserdf.schema.add("id", StringType, false)

3.在使用kudu-spark2的依赖包时,可以在当前集群安装的Kudu目录下找到该依赖包。

4.kuduContext在获取kudu表时必须指定列名,否则获取到的是一个空的ROW。

5.kuduContext.kuduRDD返回的RDD[Row]对象,该对象中Row中没有每个列的属性,所以在封装UserInfo对象时是通过index来获取每个列的值。

GitHub地址:

https://github.com/fayson/cdhproject/blob/master/spark2demo/src/main/scala/com/cloudera/kudu/KuduSample.scala

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

本文分享自微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-02-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Apache Kudu 1.8.0 发布

    10月26日,Kudu在其社区官宣了Kudu 1.8.0的正式发布。以下是主要的一些更新内容:

    Fayson
  • 0595-CDH6.2的新功能

    前置文章参考《0585-Cloudera Enterprise 6.2.0发布》和《0589-Cloudera Manager6.2的新功能》

    Fayson
  • 如何使用Flume采集Kafka数据写入Kudu

    Fayson
  • redis 学习笔记(2)-client端示例代码

    redis提供了几乎所有主流语言的client,java中主要使用二种:Jedis与Redisson

    菩提树下的杨过
  • 设计模式之状态模式(一)

    我原本以为在对乡村的一切事物都很容易,但是每次我一回头就有更多变更的请求纷至沓来。我快崩溃了!----当小编读到这段话的时候,甚是感慨,设计模式的世界真的不是那...

    程序员小跃
  • 浅谈面试中的OO设计问题

    OO设计问题是电面或者onsite中常考的问题,尤其对以Java为主要开发语言的公司(比如Amazon),OO设计问题可以算必考。OO设计题目的特点是题目非常...

    包子面试培训
  • 科研动态 | 强冰雹和短时强降水天气雷达特征及临近预警

    湖北省人工影响天气中心李德俊高级工程师近年来专注研究云降水物理、人工影响天气等关键技术问题。针对山区冰雹、短时强降水等强对流天气频发,为了有效防灾减灾,采用多种...

    气象学家
  • 科研动态 | 强冰雹和短时强降水天气雷达特征及临近预警

    湖北省人工影响天气中心李德俊高级工程师近年来专注研究云降水物理、人工影响天气等关键技术问题。针对山区冰雹、短时强降水等强对流天气频发,为了有效防灾减灾,采用多种...

    zhangqibot
  • Apache Commons 工具类介绍及简单使用

    提供了一些公共的编解码实现,比如Base64, Hex, MD5,Phonetic and URLs等等。

    黄泽杰
  • String底层实现——动态字符串SDS

    上篇我们已经了解了Redis是什么,在Linux上如何安装,常见的数据类型和API使用,如果有不明白的,可以移步到主页。

    陈琛

扫码关注云+社区

领取腾讯云代金券