Spark2Streaming读Kerberos环境的Kafka并写数据到Hive

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

Fayson的github: https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的

在前面的文章Fayson介绍了一些关于Spark2Streaming的示例如《Spark2Streaming读Kerberos环境的Kafka并写数据到HBase》和《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu》,本篇文章Fayson主要介绍如何使用Spark2Streaming访问Kerberos环境的Kafka并将接收到的Kafka数据写入Hive.

示例架构图如下:

示例详细流程图如下:

  • 内容概述:

1.环境准备

2.Spark2Streaming示例开发

3.示例运行

4.总结

  • 测试环境:

1.CM5.14.3/CDH5.14.2

2.CDK2.2.0(Apache Kafka0.10.2)

3.SPARK2.2.0

4.操作系统版本为Redhat7.3

5.采用root用户进行操作

6.集群已启用Kerberos

2.环境准备

1.准备访问Kafka的Keytab文件,使用xst命令导出keytab文件

[root@cdh01 ~]# kadmin.local 
kadmin.local:  xst -norandkey -k fayson.keytab fayson@FAYSON.COM

(可左右滑动)

使用klist命令检查导出的keytab文件是否正确

[root@cdh01 ~]# klist -ek fayson.keytab

(可左右滑动)

2.导出一个hive/adin@FAYSON.COM账号的keytab文件,该keytab用户向Yarn提交Spark作业使用

[root@cdh01 ~]# kadmin.local 
kadmin.local:  xst -norandkey -k hive.keytab hive/admin@FAYSON.COM

(可左右滑动)

3.准备jaas.cof文件内容如下:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"
  principal="fayson@FAYSON.COM";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"
  principal="fayson@FAYSON.COM";
};

(可左右滑动)

将fayson.keytab和jaas.conf文件拷贝至集群的所有节点统一的/data/disk1/0286-kafka-shell/conf目录下。

4.准备向Kerberos环境发送数据的脚本,关于脚本这里就不在过多的介绍前面很多文章都有介绍,具体可以参考Fayson的GitHub:

https://github.com/fayson/cdhproject/tree/master/kafkademo/0286-kafka-shell

(可左右滑动)

根据需要将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:

{
   "occupation": "生产工作、运输工作和部分体力劳动者",
   "address": "台东东二路16号-8-8",
   "city": "长治",
   "marriage": "1",
   "sex": "1",
   "name": "仲淑兰",
   "mobile_phone_num": "13607268580",
   "bank_name": "广州银行31",
   "id": "510105197906185179",
   "child_num": "1",
   "fix_phone_num": "15004170180"
}

(可左右滑动)

5.登录CM进入SPARK2服务的配置项将spark_kafka_version的kafka版本修改为0.10

3.SparkStreaming示例开发

1.使用maven创建scala语言的spark2demo工程,pom.xml依赖如下

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0.cloudera2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0.cloudera2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0.cloudera2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0.cloudera2</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>

(可左右滑动)

2.在resources下创建0291.properties配置文件,内容如下:

kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092
kafka.topics=kafka_hive_topic

(可左右滑动)

3.创建Kafka2Spark2Hive.scala文件,内容如下:

package com.cloudera.streaming

import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.commons.lang.StringUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import scala.collection.JavaConverters._
import scala.util.parsing.json.JSON

/**
  * package: com.cloudera.streaming
  * describe: Kerberos环境中Spark2Streaming应用实时读取Kafka数据,解析后存入Hive
  * 使用spark2-submit的方式提交作业
  * spark2-submit --class com.cloudera.streaming.Kafka2Spark2Hive \
   --master yarn \
   --deploy-mode client \
   --executor-memory 2g \
   --executor-cores 2 \
   --driver-memory 2g \
   --num-executors 2 \
   --queue default  \
   --principal hive/admin@FAYSON.COM \
   --keytab /data/disk1/spark2streaming-kafka-hive/conf/hive.keytab \
   --files "/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf#jaas.conf" \
   --driver-java-options "-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf" \
   --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf" \
   spark2-demo-1.0-SNAPSHOT.jar
  * creat_user: Fayson 
  * email: htechinfo@163.com
  * creat_date: 2018/7/15
  * creat_time: 下午4:01
  * 公众号:Hadoop实操
  */
object Kafka2Spark2Hive {

  Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别

  var confPath: String = System.getProperty("user.dir") + File.separator + "conf/0291.properties"

  /**
    * 建表Schema定义
    */
  val userInfoSchema = StructType(
    //         col name   type     nullable?
    StructField("id", StringType , false) ::
      StructField("name" , StringType, true ) ::
      StructField("sex" , StringType, true ) ::
      StructField("city" , StringType, true ) ::
      StructField("occupation" , StringType, true ) ::
      StructField("tel" , StringType, true ) ::
      StructField("fixPhoneNum" , StringType, true ) ::
      StructField("bankName" , StringType, true ) ::
      StructField("address" , StringType, true ) ::
      StructField("marriage" , StringType, true ) ::
      StructField("childNum", StringType , true ) :: Nil
  )

  /**
    * 定义一个UserInfo对象
    */
  case class UserInfo (
                        id: String,
                        name: String,
                        sex: String,
                        city: String,
                        occupation: String,
                        tel: String,
                        fixPhoneNum: String,
                        bankName: String,
                        address: String,
                        marriage: String,
                        childNum: String
                      )

  def main(args: Array[String]): Unit = {
    //加载配置文件
    val properties = new Properties()
    val file = new File(confPath)
    if(!file.exists()) {
      System.out.println(Kafka2Spark2Hive.getClass.getClassLoader.getResource("0291.properties"))
      val in = Kafka2Spark2Hive.getClass.getClassLoader.getResourceAsStream("0291.properties")
      properties.load(in);
    } else {
      properties.load(new FileInputStream(confPath))
    }

    val brokers = properties.getProperty("kafka.brokers")
    val topics = properties.getProperty("kafka.topics")
    println("kafka.brokers:" + brokers)
    println("kafka.topics:" + topics)

    if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics)) {
      println("未配置Kafka信息...")
      System.exit(0)
    }
    val topicsSet = topics.split(",").toSet

    val spark = SparkSession.builder().appName("Kafka2Spark2Hive-kerberos").config(new SparkConf()).getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次
    val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers
      , "auto.offset.reset" -> "latest"
      , "security.protocol" -> "SASL_PLAINTEXT"
      , "sasl.kerberos.service.name" -> "kafka"
      , "key.deserializer" -> classOf[StringDeserializer]
      , "value.deserializer" -> classOf[StringDeserializer]
      , "group.id" -> "testgroup"
    )

    val dStream = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    //引入隐式
    import spark.implicits._

    dStream.foreachRDD(rdd => {
      //将rdd数据重新封装为Rdd[UserInfo]
      val newrdd = rdd.map(line => {
        val jsonObj =  JSON.parseFull(line.value())
        val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
        new UserInfo(
          map.get("id").get.asInstanceOf[String],
          map.get("name").get.asInstanceOf[String],
          map.get("sex").get.asInstanceOf[String],
          map.get("city").get.asInstanceOf[String],
          map.get("occupation").get.asInstanceOf[String],
          map.get("mobile_phone_num").get.asInstanceOf[String],
          map.get("fix_phone_num").get.asInstanceOf[String],
          map.get("bank_name").get.asInstanceOf[String],
          map.get("address").get.asInstanceOf[String],
          map.get("marriage").get.asInstanceOf[String],
          map.get("child_num").get.asInstanceOf[String]
        )
      })
      //将RDD转换为DataFrame
      val userinfoDF = spark.sqlContext.createDataFrame(newrdd)

      userinfoDF.write.mode(SaveMode.Append).saveAsTable("ods_user")
    })
    ssc.start()
    ssc.awaitTermination()
  }

}

(可左右滑动)

4.使用mvn命令编译工程,注意由于是scala工程编译时mvn命令要加scala:compile

mvn clean scala:compile package

(可左右滑动)

5.将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务

在conf目录下新增0291.properties配置文件,内容如下:

4.示例运行


1.使用spark2-submit命令向集群提交SparkStreaming作业

spark2-submit --class com.cloudera.streaming.Kafka2Spark2Hive \
 --master yarn \
 --deploy-mode client \
 --executor-memory 2g \
 --executor-cores 2 \
 --driver-memory 2g \
 --num-executors 2 \
 --queue default  \
 --principal hive/admin@FAYSON.COM \
 --keytab /data/disk1/spark2streaming-kafka-hive/conf/hive.keytab \
 --files "/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf#jaas.conf" \
 --driver-java-options "-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf" \
 --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hive/conf/jaas.conf" \
 spark2-demo-1.0-SNAPSHOT.jar

(可左右滑动)

通过CM查看作业是否提交成功

Spark2的UI界面

2.运行脚本向Kafka的Kafka_kudu_topic生产消息

3.登录Hue在Hive中执行Select查询user_info表中数据

5.总结

1.在前面的文章Fayson也有介绍Java访问Kerberos环境的Kafka,需要使用到jaas.conf文件,这里的jaas.conf文件Fayson通过spark2-submit的方式指定,注意我们的jaas.conf文件及keytab需要在集群的所有节点存在,因为Driver和Executor是随机在集群的节点上启动的。

2.同样在scala代码中访问Kafka是也一样需要添加Kerberos相关的配置security.protocol和sasl.kerberos.service.name参数。

3.Spark2默认的kafka版本为0.9需要通过CM将默认的Kafka版本修改为0.10

4.在文章中将接收到的Kafka数据转换成DataFrame,调用DataFrame的saveAsTable将数据保存到Hive的表中,如果Hive表不存在会默认的创建。

GitHub地址如下:

https://github.com/fayson/cdhproject/tree/master/spark2demo/spark2streaming-kafka-hive

https://github.com/fayson/cdhproject/blob/master/spark2demo/src/main/scala/com/cloudera/streaming/Kafka2Spark2Hive.scala

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2018-07-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大数据-Hadoop、Spark

Spark Streaming + Kafka整合

2755
来自专栏行者悟空

Spark RDD的Action

1556
来自专栏个人分享

Hive架构及Hive On Spark

(1)Table:每个表都对应在HDFS中的目录下,数据是经过序列化后存储在该目录中。同时Hive也支持表中的数据存储在其他类型的文件系统中,如NFS或本地文件...

4452
来自专栏码匠的流水账

聊聊jdbc的batch操作

statement的batch操作,可以批量进行insert或update操作,提升操作性能,特别是在大数据量的insert或update的时候。

1182
来自专栏祝威廉

Spark 2.0 Structured Streaming 分析

Spark 2.0 将流式计算也统一到DataFrame里去了,提出了Structured Streaming的概念,将数据源映射为一张无线长度的表,同时将流式...

1503
来自专栏岑玉海

Spark Streaming编程指南

Overview Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 它可以接受来自Kafka, Flume,...

7245
来自专栏Hadoop实操

Spark2Streaming读非Kerberos环境的Kafka并写数据到Kudu

在前面的文章Fayson介绍了在Kerberos环境下《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu》,本篇文章Fayso...

3911
来自专栏个人分享

Shuffle相关分析

 Shuffle描述是一个过程,表现出的是多对多的依赖关系。Shuffle是连接map阶段和Reduce阶段的纽带,每个Reduce Task都会从Map Ta...

1074
来自专栏美图数据技术团队

RDD原理与基本操作 | Spark,从入门到精通

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,...

2.6K2
来自专栏斑斓

大数据 | 理解Spark的核心RDD

与许多专有的大数据处理平台不同,Spark建立在统一抽象的RDD之上,使得它可以以基本一致的方式应对不同的大数据处理场景,包括MapReduce,Streami...

3989

扫码关注云+社区

领取腾讯云代金券