Spark2Streaming读Kafka并写入到HBase

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

Fayson的github: https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的

在前面的文章Fayson介绍了一些关于Spark2Streaming的示例如《Spark2Streaming读Kerberos环境的Kafka并写数据到HBase》、《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu》及《Spark2Streaming读Kerberos环境的Kafka并写数据到Hive》。本篇文章Fayson主要介绍如何使用Spark2Streaming访问非Kerberos环境的Kafka并将接收到的数据写入HBase。

  • 内容概述:

1.环境准备

2.Spark2Streaming示例开发

3.示例运行

4.总结

  • 测试环境:

1.CM5.14.3/CDH5.14.2

2.CDK2.2.0(Apache Kafka0.10.2)

3.SPARK2.2.0

4.操作系统版本为Redhat7.3

5.采用root用户进行操作

2.环境准备

1.准备向Kakfa发送数据的脚本,关于脚本这里就不在过多的介绍前面很多文章都有介绍,具体可以参考Fayson的GitHub:

https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell

根据需要将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:

{
   "occupation": "生产工作、运输工作和部分体力劳动者",
   "address": "台东东二路16号-8-8",
   "city": "长治",
   "marriage": "1",
   "sex": "1",
   "name": "仲淑兰",
   "mobile_phone_num": "13607268580",
   "bank_name": "广州银行31",
   "id": "510105197906185179",
   "child_num": "1",
   "fix_phone_num": "15004170180"
}

(可左右滑动)

2.登录CM进入SPARK2服务的配置项将spark_kafka_version的kafka版本修改为0.10

3.通过CM下载HBase客户端配置文件

4.在/opt/cloudera/parcels/SPARK2/lib/spark2/jars目录下,添加Spark2访问HBase的依赖包,依赖的jar包如下:

hbase-client-1.2.0-cdh5.14.2.jar
hbase-common-1.2.0-cdh5.14.2.jar
hbase-protocol-1.2.0-cdh5.14.2.jar
htrace-core-3.2.0-incubating.jar

(可左右滑动)

注意:需要将依赖包拷贝至集群所有节点。

5.创建HBase测试表user_info

hbase(main):002:0> create 'user_info','info'
0 row(s) in 1.4680 seconds

=> Hbase::Table - user_info
hbase(main):003:0> 

(可左右滑动)

3.SparkStreaming示例开发

1.使用maven创建scala语言的spark2demo工程,pom.xml依赖如下

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>

(可左右滑动)

具体需要的依赖包,可以参考Fayson前面的文章。

2.添加访问HBase的集群配置信息hdfs-site.xml/core-stie.xml/hbase-site.xml文件

3.在resources下创建0293.properties配置文件,内容如下:

kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092
kafka.topics=kafka_hbase_topic
group.id=testgroup

(可左右滑动)

4.创建HBaseUtils.scala类,主要用于创建HBase的Connection

package com.cloudera.utils

import java.io.File
import java.security.PrivilegedAction
import org.apache.hadoop.hbase.{HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.security.UserGroupInformation

/**
  * package: com.cloudera.utils
  * describe: 获取HBase Conncetion工具类
  * creat_user: Fayson 
  * email: htechinfo@163.com
  * creat_date: 2018/6/25
  * creat_time: 下午10:46
  * 公众号:Hadoop实操
  */
object HBaseUtil {

  /**
    * 获取Kerberos环境下的HBase连接
    * @param confPath
    * @param principal
    * @param keytabPath
    * @return
    */
  def getHBaseConn(confPath: String, principal: String, keytabPath: String): Connection = {
    val configuration = HBaseConfiguration.create
    val coreFile = new File(confPath + File.separator + "core-site.xml")
    if(!coreFile.exists()) {
      val in = HBaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/core-site.xml")
      configuration.addResource(in)
    }
    val hdfsFile = new File(confPath + File.separator + "hdfs-site.xml")
    if(!hdfsFile.exists()) {
      val in = HBaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/hdfs-site.xml")
      configuration.addResource(in)
    }
    val hbaseFile = new File(confPath + File.separator + "hbase-site.xml")
    if(!hbaseFile.exists()) {
      val in = HBaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/hbase-site.xml")
      configuration.addResource(in)
    }

    UserGroupInformation.setConfiguration(configuration)
    UserGroupInformation.loginUserFromKeytab(principal, keytabPath)
    val loginUser = UserGroupInformation.getLoginUser
    loginUser.doAs(new PrivilegedAction[Connection] {
      override def run(): Connection = ConnectionFactory.createConnection(configuration)
    })
  }


  /**
    * 获取非Kerberos环境的Connection
    * @param confPath
    * @return
    */
  def getNoKBHBaseCon(confPath: String): Connection = {

    val configuration = HBaseConfiguration.create
    val coreFile = new File(confPath + File.separator + "core-site.xml")
    if(!coreFile.exists()) {
      val in = HBaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/core-site.xml")
      configuration.addResource(in)
    }
    val hdfsFile = new File(confPath + File.separator + "hdfs-site.xml")
    if(!hdfsFile.exists()) {
      val in = HBaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/hdfs-site.xml")
      configuration.addResource(in)
    }
    val hbaseFile = new File(confPath + File.separator + "hbase-site.xml")
    if(!hbaseFile.exists()) {
      val in = HBaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/hbase-site.xml")
      configuration.addResource(in)
    }

    val connection = ConnectionFactory.createConnection(configuration)
    println("_---------------------" + connection.getAdmin.listTableNames().size)
    connection
  }

}

(可左右滑动)

5.创建Kafka2Spark2HBase.scala文件,内容如下:

package com.cloudera.streaming.nokerberos

import java.io.{File, FileInputStream}
import java.util.Properties

import com.cloudera.utils.HBaseUtil
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Try
import scala.util.parsing.json.JSON

/**
  * package: com.cloudera.streaming
  * describe: 非Kerberos环境中Spark2Streaming应用实时读取Kafka数据,解析后存入HBase
  * 使用spark2-submit的方式提交作业
  * spark2-submit --class com.cloudera.streaming.nokerberos.Kafka2Spark2Hbase \
    --master yarn \
    --deploy-mode client \
    --executor-memory 2g \
    --executor-cores 2 \
    --driver-memory 2g \
    --num-executors 2 \
    spark2-demo-1.0-SNAPSHOT.jar
  * creat_user: Fayson 
  * email: htechinfo@163.com
  * creat_date: 2018/07/23
  * creat_time: 下午10:40
  * 公众号:Hadoop实操
  */
object Kafka2Spark2Hbase {

  Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别

  var confPath: String = System.getProperty("user.dir") + File.separator + "conf"

  def main(args: Array[String]): Unit = {
    //加载配置文件
    val properties = new Properties()
    val file = new File(confPath + File.separator + "0293.properties")
    if(!file.exists()) {
      val in = Kafka2Spark2Hbase.getClass.getClassLoader.getResourceAsStream("0293.properties")
      properties.load(in);
    } else {
      properties.load(new FileInputStream(file))
    }

    val brokers = properties.getProperty("kafka.brokers")
    val topics = properties.getProperty("kafka.topics")
    val testgroup = properties.getProperty("group.id")
    println("kafka.brokers:" + brokers)
    println("kafka.topics:" + topics)

    if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) || StringUtils.isEmpty(testgroup)) {
      println("未配置Kafka信息")
      System.exit(0)
    }
    val topicsSet = topics.split(",").toSet

    val spark = SparkSession.builder()
      .appName("Kafka2Spark2HBase-nokerberos")
      .config(new SparkConf())
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次
    val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers
      , "auto.offset.reset" -> "latest"
      , "key.deserializer" -> classOf[StringDeserializer]
      , "value.deserializer" -> classOf[StringDeserializer]
      , "group.id" -> testgroup
    )

    val dStream = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    dStream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val connection = HBaseUtil.getNoKBHBaseCon(confPath) // 获取Hbase连接
        partitionRecords.foreach(line => {
          //将Kafka的每一条消息解析为JSON格式数据
          val jsonObj =  JSON.parseFull(line.value())
          println(line.value())
          val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
          val rowkey = map.get("id").get.asInstanceOf[String]
          val name = map.get("name").get.asInstanceOf[String]
          val sex = map.get("sex").get.asInstanceOf[String]
          val city = map.get("city").get.asInstanceOf[String]
          val occupation = map.get("occupation").get.asInstanceOf[String]
          val mobile_phone_num = map.get("mobile_phone_num").get.asInstanceOf[String]
          val fix_phone_num = map.get("fix_phone_num").get.asInstanceOf[String]
          val bank_name = map.get("bank_name").get.asInstanceOf[String]
          val address = map.get("address").get.asInstanceOf[String]
          val marriage = map.get("marriage").get.asInstanceOf[String]
          val child_num = map.get("child_num").get.asInstanceOf[String]

          val tableName = TableName.valueOf("user_info")
          val table = connection.getTable(tableName)
          val put = new Put(Bytes.toBytes(rowkey))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes(sex))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("occupation"), Bytes.toBytes(occupation))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mobile_phone_num"), Bytes.toBytes(mobile_phone_num))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("fix_phone_num"), Bytes.toBytes(fix_phone_num))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("bank_name"), Bytes.toBytes(bank_name))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes(address))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("marriage"), Bytes.toBytes(marriage))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("child_num"), Bytes.toBytes(child_num))

          Try(table.put(put)).getOrElse(table.close())//将数据写入HBase,若出错关闭table
          table.close()//分区数据写入HBase后关闭连接
        })
        connection.close()

      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

}

(可左右滑动)

6.使用mvn命令编译工程,注意由于是scala工程编译时mvn命令要加scala:compile

mvn clean scala:compile package

7.将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务器

将Spark2应用的配置文件放在conf目录下,内容如下:

0293.properties配置文件内容如下:

4.示例运行

1.使用spark2-submit命令向集群提交SparkStreaming作业

spark2-submit --class com.cloudera.streaming.nokerberos.Kafka2Spark2Hbase \
    --master yarn \
    --deploy-mode client \
    --executor-memory 2g \
    --executor-cores 2 \
    --driver-memory 2g \
    --num-executors 2 \
    spark2-demo-1.0-SNAPSHOT.jar

(可左右滑动)

通过CM查看作业是否提交成功

Spark2的UI界面

2.运行脚本向Kafka的kafka_hbase_topic生产消息

3.使用hbase shell命令查看数据是否入库成功

5.总结

1.本示例中Spark2Streaming读取非Kerberos环境的Kafka集群,使用的是spark-streaming-kafka0.10.0版本的依赖包,在Spark中提供两个的另外一个版本的为0.8.0版本,在选择依赖包时需要注意与Spark版本的兼容性问题,具体可以参考官网地址:

http://spark.apache.org/docs/2.2.0/streaming-kafka-integration.html

2.在/opt/cloudera/parcels/SPARK2/lib/spark2/jars目录下需要检查下是否有其它版本的spark-streaming-kafka的依赖包,如果存在需要删除,否则会出现版本冲突问题。

3.Spark2默认的kafka版本为0.9需要通过CM将默认的Kafka版本修改为0.10

GitHub地址如下:

https://github.com/fayson/cdhproject/blob/master/spark2demo/src/main/scala/com/cloudera/streaming/nokerberos/Kafka2Spark2Hbase.scala

https://github.com/fayson/cdhproject/blob/master/spark2demo/src/main/scala/com/cloudera/utils/HBaseUtil.scala

https://github.com/fayson/cdhproject/blob/master/spark2demo/src/main/resources/0293.properties

相关阅读:

《Spark2Streaming读Kerberos环境的Kafka并写数据到HBase》

《Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS》

《Spark2Streaming读Kerberos环境的Kafka并写数据到Hive》

《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu》

《SparkStreaming读Kafka数据写HBase》

《SparkStreaming读Kafka数据写Kudu》

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2018-07-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hadoop实操

SparkStreaming读Kafka数据写Kudu

1.9K2
来自专栏Jed的技术阶梯

Kafka 0.8.2.2 Producer报错:java.net.ConnectException: Connection timed out: no further information

8172
来自专栏Albert陈凯

3.4 RDD的计算

3.4 RDD的计算 3.4.1 Ta s k简介 原始的RDD经过一系列转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为...

36810
来自专栏技术专栏

MapReduce学习笔记

wordcount: 统计文件中每个单词出现的次数需求:1) 文件内容小:shell2)文件内容很大:TB GB ??? 如何解决大数据量的统计分析==> ur...

1282
来自专栏码匠的流水账

聊聊jdbc的batch操作

statement的batch操作,可以批量进行insert或update操作,提升操作性能,特别是在大数据量的insert或update的时候。

1072
来自专栏大数据-Hadoop、Spark

Spark Streaming + Kafka整合

2615
来自专栏Hadoop实操

Spark2Streaming读Kerberos环境的Kafka并写数据到Hive

在前面的文章Fayson介绍了一些关于Spark2Streaming的示例如《Spark2Streaming读Kerberos环境的Kafka并写数据到HBas...

1.1K3
来自专栏清墨_iOS分享

iOS开发按时间排序文件

默认的,我们在开发中,写入沙盒里的文件按文件名排序,如0,1,2,3,4,5,a,b,c,.....z等,但有时候咱们需要将里面的文件按创建时间来排序。这时候...

49510
来自专栏Hadoop实操

SparkStreaming读Kafka数据写HBase

1.6K3
来自专栏Hadoop实操

Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu

1K3

扫码关注云+社区

领取腾讯云代金券