基于Spark Streaming实时处理Kafka数据流

Kafka作为优秀的日志采集系统,可以作为Spark Streaming的高级数据源,本文主要介绍如何使用Spark Streaming实时处理Kafka传递过来的数据流。

系统软件

本文实验基于的各软件版本如下:

Java 1.8.0_191

Scala 2.11

hadoop-3.0.3

zookeeper-3.4.10

Spark 2.3.2

kafka_2.12-2.0.1

kafka-manager-1.3.3.17

Kafka集群测试

启动Kafka集群

启动Kafka集群之前,首先启动Zookeeper集群

在安装有Zookeeper的机器上执行下述命令:

另外打开一个终端,输入以下命令启动Kafka集群:

测试Kafka集群是否可以正常使用:

Kafka脚本测试数据的生成和消费

下面使用Kafka的producer脚本生成一些数据:

另外打开一个终端使用Kafka的consumer脚本消费上述producer脚本生成的数据:

需要注意的是,在旧版本的kafka-console-consumer.sh中,是通过--zookeeper来消费数据的,而新版本的kafka则删除了该方法,统一使用--bootstrap-server,后面跟的是broker-list参数。

基于Spark Streaming实时处理Kafka数据

本实验基于Maven作为项目构建工具,选择的IDE为IntelliJ IDEA 2018.1,采用的编程语言为Scala

创建Maven工程后,项目处右键Add Frameworks Support,添加Scala语言支持:

首先来编写producer端的代码:

编写代码之前,先构建pom.xml文件,以导入相应Jar依赖,这里选用Maven的maven-scala-plugin插件进行编译,maven-shade-plugin插件进行打包,具体的pom.xml文件如下:

spark-streaming-kafka-0-8_2.11的版本号一定与Scala和Spark版本严格对应,否则后面运行会因为版本API不对应而报错。

导入相应依赖后,开始编写producer端代码--KafkaWordProducer:

上述程序的作用就是每秒钟产生messagesPerSec条消息,每条消息包含wordPerMessage个单词(这里用10以内的随机整数代替单词)。

producer端有了,下面编写consumer端代码--KafkaWordCount:

consumer主要将producer传递过来的消息执行WordCount操作。

LoggerPro的目的是设置日志的打印级别,从而让结果输出的更为清晰,避免被大量的打印信息淹没。

最终的项目结构如下图所示:

打包、提交集群运行

Maven打包

提交服务器

将项目target目录下生成的可执行Jar包上传到服务器指定目录,这里我上传到/usr/software/spark/mycode/streaming

为了直观地观察到数据流的流转,启动Kafka Manager

运行

首先启动producer端

新打开一个终端,启动consumer端

可以看到,Spark Streaming在实时消费Kafka传过来的数据。

同时,查看Kafka Manger也可以看到数据在实时地产生和消费。

欢迎您扫一扫上面的二维码,关注我的微信公众号!

更多内容请访问http://ruanshubin.top.

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181209G12FJC00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券