前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkStreaming读Kafka数据写HBase

SparkStreaming读Kafka数据写HBase

作者头像
Fayson
发布2018-07-11 16:40:05
6.4K1
发布2018-07-11 16:40:05
举报
文章被收录于专栏:Hadoop实操

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github: https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


在前面的文章Fayson介绍过《如何使用Spark Streaming读取HBase的数据并写入到HDFS》,关于SparkStreaming的应用场景很多,本篇文章Fayson主要介绍使用Scala语言开发一个SparkStreaming应用读取Kafka数据并写入HBase。本文的数据流图如下:

  • 内容概述

1.环境准备

2.编写SparkSteaming代码读取Kafka数据并写入HBase

3.流程测试

4.总结

  • 测试环境

1.CM和CDH版本为5.12.1

2.采用root用户操作

  • 前置条件

1.集群已安装Kafka

2.环境准备


1.编写向Kafka生成数据的ReadUserInfoFIleToKafka.java代码,具体内容可以在Fayson的GitHub上查看

代码语言:javascript
复制
https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/nokerberos/ReadUserInfoFIleToKafka.java
https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell

(可左右滑动)

2.使用mvn命令将编写好的代码编译打包封装成脚本

代码语言:javascript
复制
mvn clean package

(可左右滑动)

使用mvn命令将工程依赖包导出到lib目录

代码语言:javascript
复制
mvn dependency:copy-dependencies -DoutputDirectory=/Users/fayson/Desktop/lib

(可左右滑动)

编写run.sh脚本

代码语言:javascript
复制
#!/bin/bash
#########################################
# 创建Topic
# kafka-topics --create --zookeeper cdh01.fayson.com:2181,cdh02.fayson.com:2181,cdh03.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_sparkstreaming_hbase_topic
#
########################################
JAVA_HOME=/usr/java/jdk1.8.0_131
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
    CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadUserInfoFIleToKafka $read_file

(可左右滑动)

准备测试数据ods_user_600.txt

封装好的脚本目录结构如下:

将打包好的jar包拷贝至lib目录下。

3.创建用于测试的Kafka Topic

代码语言:javascript
复制
kafka-topics --create --zookeeper cdh01.fayson.com:2181,cdh02.fayson.com:2181,cdh03.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_sparkstreaming_hbase_topic

(可左右滑动)

4.创建HBase表,用于测试

代码语言:javascript
复制
create 'user_info','info'

(可左右滑动)

5.通过CM配置SparkStreaming应用依赖包spark-streaming-kafka_2.10-1.6.0-cdh5.12.1.jar

将依赖包部署至CDH集群所有节点的/opt/cloudera/parcels/CDH/jars目录,然后通过CM配置Spark GateWay的spark-env.sh配置

代码语言:javascript
复制
export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/parcels/CDH/jars/spark-streaming-kafka_2.10-1.6.0-cdh5.12.1.jar

(可左右滑动)

保存并重新部署客户端配置。

3.编写SparkStreaming应用


1.使用Maven创建Scala工程,工程依赖pom文件

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-assembly_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>

(可左右滑动)

2.编写获取HBase连接的HBaseUtil工具类,内容如下:

代码语言:javascript
复制
package utils
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
/**
  * package: utils
  * describe: HBase工具类
  * creat_user: Fayson 
  * email: htechinfo@163.com
  * creat_date: 2018/5/28
  * creat_time: 上午10:51
  * 公众号:Hadoop实操
  */
object HBaseUtil extends Serializable {
  /**
    * @param zkList Zookeeper列表已逗号隔开
    * @param port ZK端口号
    * @return
    */
  def getHBaseConn(zkList: String, port: String): Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkList)
    conf.set("hbase.zookeeper.property.clientPort", port)
    val connection = ConnectionFactory.createConnection(conf)
    connection
  }
}

(可左右滑动)

3.编写Kafka2Spark2HBase.scala类,内容如下:

代码语言:javascript
复制
package com.cloudera.streaming
import java.io.{File, FileInputStream, InputStreamReader}
import java.util.Properties
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import utils.HBaseUtil
import scala.util.Try
import scala.util.parsing.json.JSON
/**
  * package: com.cloudera.streaming
  * describe: SparkStreaming 应用实时读取Kafka数据,解析后存入HBase
  * 使用spark-submit的方式提交作业
    spark-submit --class com.cloudera.streaming.Kafka2Spark2HBase \
    --master yarn-client --num-executors 1 --driver-memory 1g \
    --driver-cores 1 --executor-memory 1g --executor-cores 1 \
    spark-demo-1.0-SNAPSHOT.jar cdh04.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092 kafka_sparkstreaming_hbase_topic
  * creat_user: Fayson 
  * email: htechinfo@163.com
  * creat_date: 2018/5/28
  * creat_time: 上午10:09
  * 公众号:Hadoop实操
  */
object Kafka2Spark2HBase {
  var confPath: String = System.getProperty("user.dir") + File.separator + "conf/0283.properties"
  def main(args: Array[String]): Unit = {
    //加载配置文件
    val properties = new Properties()
    val file = new File(confPath)
    if(!file.exists()) {
      System.out.println(Kafka2Spark2HBase.getClass.getClassLoader.getResource("0283.properties"))
      val in = Kafka2Spark2HBase.getClass.getClassLoader.getResourceAsStream("0283.properties")
      properties.load(in);
    } else {
      properties.load(new FileInputStream(confPath))
    }
    val brokers = properties.getProperty("kafka.brokers")
    val topicsSet = properties.getProperty("kafka.topics").split(",").toSet
    val zkHost = properties.getProperty("zookeeper.list")
    val zkport = properties.getProperty("zookeeper.port")
    val sparkConf = new SparkConf().setAppName("Kafka2Spark2HBase")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5)) //设置Spark时间窗口,每5s处理一次
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    dStream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val connection = HBaseUtil.getHBaseConn(zkHost, zkport) // 获取Hbase连接
        partitionRecords.foreach(line => {
          //将Kafka的每一条消息解析为JSON格式数据
          println(line._2)
          val jsonObj =  JSON.parseFull(line._2)
          val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
          val rowkey = map.get("id").get.asInstanceOf[String]
          val name = map.get("name").get.asInstanceOf[String]
          val sex = map.get("sex").get.asInstanceOf[String]
          val city = map.get("city").get.asInstanceOf[String]
          val occupation = map.get("occupation").get.asInstanceOf[String]
          val mobile_phone_num = map.get("mobile_phone_num").get.asInstanceOf[String]
          val fix_phone_num = map.get("fix_phone_num").get.asInstanceOf[String]
          val bank_name = map.get("bank_name").get.asInstanceOf[String]
          val address = map.get("address").get.asInstanceOf[String]
          val marriage = map.get("marriage").get.asInstanceOf[String]
          val child_num = map.get("child_num").get.asInstanceOf[String]
          val tableName = TableName.valueOf("user_info")
          val table = connection.getTable(tableName)
          val put = new Put(Bytes.toBytes(rowkey))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes(sex))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("occupation"), Bytes.toBytes(occupation))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mobile_phone_num"), Bytes.toBytes(mobile_phone_num))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("fix_phone_num"), Bytes.toBytes(fix_phone_num))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("bank_name"), Bytes.toBytes(bank_name))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes(address))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("marriage"), Bytes.toBytes(marriage))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("child_num"), Bytes.toBytes(child_num))
          Try(table.put(put)).getOrElse(table.close())//将数据写入HBase,若出错关闭table
          table.close()//分区数据写入HBase后关闭连接
        })
    connection.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

(可左右滑动)

4.使用mvn命令将编写好的SparkStreaming代码打包,注意由于工程中有scala代码在编译是命令中需要加scala:compile

代码语言:javascript
复制
mvn clean scala:compile package

(可左右滑动)

4.流程测试


1.将编译好的SparkStreaming应用Jar包上传至有Spark Gateway节点的服务器上

conf/0283.properties内容如下:

2.使用spark-submit命令提交SparkStreaming作业

代码语言:javascript
复制
spark-submit --class com.cloudera.streaming.Kafka2Spark2HBase \
--master yarn-client --num-executors 2 --driver-memory 1g \
--driver-cores 1 --executor-memory 1g --executor-cores 1 \
spark-demo-1.0-SNAPSHOT.jar

(可左右滑动)

通过CM查看SparkStreaming作业是否正常运行

Yarn的8088界面查看

3.查看HBase中user_info表数据

4.运行脚本向Kafka生产数据

代码语言:javascript
复制
[root@cdh01 0283-kafka-shell]# cd /root/0283-kafka-shell
[root@cdh01 0283-kafka-shell]# sh run.sh ods_user_600.txt

(可左右滑动)

5.通过Hue查看HBase的user_info表数据

Kafka的数据已成功的录入到HBase的user_info表中

HBase 命令行查看数据

5.总结


1.由于Spark中默认没有Spark-Streaming-Kafka的依赖包,需要将相应的依赖包添加到/opt/cloudera/parcels/CDH/jars目录下,然后在spark-env.sh中配置相应的依赖包路径,否则会报类找不到的异常。

2.在获取HBase的Connection后,完成数据入库后记得close掉,否则在应用运行一段时间后就无法获取的Zookeeper的连接,导致数据无法入库。

GitHub地址:

https://github.com/fayson/cdhproject/blob/master/sparkdemo/src/main/scala/com/cloudera/streaming/Kafka2Spark2HBase.scala

https://github.com/fayson/cdhproject/blob/master/sparkdemo/src/main/scala/com/cloudera/utils/HBaseUtil.scala

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-05-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档