Kafka作为优秀的日志采集系统,可以作为Spark Streaming的高级数据源,本文主要介绍如何使用Spark Streaming实时处理Kafka传递过来的数据流。
1 系统软件
本文实验基于的各软件版本如下:
Java 1.8.0_191
Scala 2.11
hadoop-3.0.3
zookeeper-3.4.10
Spark 2.3.2
kafka_2.12-2.0.1
kafka-manager-1.3.3.17
2 具体步骤
2.1 启动Kafka集群
启动Kafka集群之前首先启动Zookeeper集群:
在安装有Zookeeper的机器上执行下述命令:
另外打开一个终端,输入以下命令启动Kafka集群:
测试Kafka集群是否可以正常使用
2.2 Kafka脚本测试数据的生成和消费
下面使用Kafka的producer脚本生成一些数据:
另外打开一个终端使用Kafka的consumer脚本消费上述producer脚本生成的数据:
需要注意的是,在旧版本的kafka-console-consumer.sh中,是通过--zookeeper来消费数据的,而新版本的kafka则删除了该方法,统一使用--bootstrap-server,后面跟的是broker-list参数。
2.3 编写相应程序测试Kafka的数据生产及消费
本实验基于Maven作为项目构建工具,选择的IDE为IntelliJ IDEA 2018.1 ,采用的编程语言为Scala。
创建Maven工程后,项目处右键Add Frameworks Support:
首先,我们来编写producer端的代码:
pom.xml
spark-streaming-kafka-0-8_2.11的版本号一定与Scala和Spark版本严格对应,否则会报错。
KafkaWordProducer.scala
上述程序的作用就是每秒钟产生messagesPerSec条消息,每条消息包含wordPerMessage个单词(这里用10以内的随机整数代替单词)。
数据产生端producer有了,下面我们编写消费端consumer的代码:
KafkaWordCount
消费者主要将生产者传递过来的消息执行WordCount操作:
LoggerPro的目的是设置日志的打印级别,从而让结果输出的更为清晰,避免被大量的打印信息淹没。
最终的项目结构如下图所示:
2.4 打包、提交集群运行
Maven打包
提交服务器
将项目target目录下生成的可执行Jar包上传到服务器指定目录,这里我上传到/usr/software/spark/mycode/streaming。
启动Kafka Manager
为了直观观察到数据流的流转,我们启动Kafka Manager:
运行
首先启动Producer端:
新打开一个终端,启动消费者:
可以看到,Spark Streaming在实时消费Kafka里传过来的数据。
同时,查看Kafka Manger也可以看到数据在实时得产生和消费。
领取专属 10元无门槛券
私享最新 技术干货