Kafka作为优秀的日志采集系统,可以作为Spark Streaming的高级数据源,本文主要介绍如何使用Spark Streaming实时处理Kafka传递过来的数据流。
系统软件
本文实验基于的各软件版本如下:
Java 1.8.0_191
Scala 2.11
hadoop-3.0.3
zookeeper-3.4.10
Spark 2.3.2
kafka_2.12-2.0.1
kafka-manager-1.3.3.17
Kafka集群测试
启动Kafka集群
启动Kafka集群之前,首先启动Zookeeper集群:
在安装有Zookeeper的机器上执行下述命令:
另外打开一个终端,输入以下命令启动Kafka集群:
测试Kafka集群是否可以正常使用:
Kafka脚本测试数据的生成和消费
下面使用Kafka的producer脚本生成一些数据:
另外打开一个终端使用Kafka的consumer脚本消费上述producer脚本生成的数据:
需要注意的是,在旧版本的kafka-console-consumer.sh中,是通过--zookeeper来消费数据的,而新版本的kafka则删除了该方法,统一使用--bootstrap-server,后面跟的是broker-list参数。
基于Spark Streaming实时处理Kafka数据
本实验基于Maven作为项目构建工具,选择的IDE为IntelliJ IDEA 2018.1,采用的编程语言为Scala。
创建Maven工程后,项目处右键Add Frameworks Support,添加Scala语言支持:
首先来编写producer端的代码:
编写代码之前,先构建pom.xml文件,以导入相应Jar依赖,这里选用Maven的maven-scala-plugin插件进行编译,maven-shade-plugin插件进行打包,具体的pom.xml文件如下:
spark-streaming-kafka-0-8_2.11的版本号一定与Scala和Spark版本严格对应,否则后面运行会因为版本API不对应而报错。
导入相应依赖后,开始编写producer端代码--KafkaWordProducer:
上述程序的作用就是每秒钟产生messagesPerSec条消息,每条消息包含wordPerMessage个单词(这里用10以内的随机整数代替单词)。
producer端有了,下面编写consumer端代码--KafkaWordCount:
consumer主要将producer传递过来的消息执行WordCount操作。
LoggerPro的目的是设置日志的打印级别,从而让结果输出的更为清晰,避免被大量的打印信息淹没。
最终的项目结构如下图所示:
打包、提交集群运行
Maven打包
提交服务器
将项目target目录下生成的可执行Jar包上传到服务器指定目录,这里我上传到/usr/software/spark/mycode/streaming。
为了直观地观察到数据流的流转,启动Kafka Manager:
运行
首先启动producer端:
新打开一个终端,启动consumer端:
可以看到,Spark Streaming在实时消费Kafka传过来的数据。
同时,查看Kafka Manger也可以看到数据在实时地产生和消费。
欢迎您扫一扫上面的二维码,关注我的微信公众号!
更多内容请访问http://ruanshubin.top.
领取专属 10元无门槛券
私享最新 技术干货