首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

使用Spark Streaming处理Kafka数据流

Kafka作为优秀的日志采集系统,可以作为Spark Streaming的高级数据源,本文主要介绍如何使用Spark Streaming实时处理Kafka传递过来的数据流。

1 系统软件

本文实验基于的各软件版本如下:

Java 1.8.0_191

Scala 2.11

hadoop-3.0.3

zookeeper-3.4.10

Spark 2.3.2

kafka_2.12-2.0.1

kafka-manager-1.3.3.17

2 具体步骤

2.1 启动Kafka集群

启动Kafka集群之前首先启动Zookeeper集群:

在安装有Zookeeper的机器上执行下述命令:

另外打开一个终端,输入以下命令启动Kafka集群:

测试Kafka集群是否可以正常使用

2.2 Kafka脚本测试数据的生成和消费

下面使用Kafka的producer脚本生成一些数据:

另外打开一个终端使用Kafka的consumer脚本消费上述producer脚本生成的数据:

需要注意的是,在旧版本的kafka-console-consumer.sh中,是通过--zookeeper来消费数据的,而新版本的kafka则删除了该方法,统一使用--bootstrap-server,后面跟的是broker-list参数。

2.3 编写相应程序测试Kafka的数据生产及消费

本实验基于Maven作为项目构建工具,选择的IDE为IntelliJ IDEA 2018.1 ,采用的编程语言为Scala。

创建Maven工程后,项目处右键Add Frameworks Support:

首先,我们来编写producer端的代码:

pom.xml

spark-streaming-kafka-0-8_2.11的版本号一定与Scala和Spark版本严格对应,否则会报错。

KafkaWordProducer.scala

上述程序的作用就是每秒钟产生messagesPerSec条消息,每条消息包含wordPerMessage个单词(这里用10以内的随机整数代替单词)。

数据产生端producer有了,下面我们编写消费端consumer的代码:

KafkaWordCount

消费者主要将生产者传递过来的消息执行WordCount操作:

LoggerPro的目的是设置日志的打印级别,从而让结果输出的更为清晰,避免被大量的打印信息淹没。

最终的项目结构如下图所示:

2.4 打包、提交集群运行

Maven打包

提交服务器

将项目target目录下生成的可执行Jar包上传到服务器指定目录,这里我上传到/usr/software/spark/mycode/streaming。

启动Kafka Manager

为了直观观察到数据流的流转,我们启动Kafka Manager:

运行

首先启动Producer端:

新打开一个终端,启动消费者:

可以看到,Spark Streaming在实时消费Kafka里传过来的数据。

同时,查看Kafka Manger也可以看到数据在实时得产生和消费。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181208G11I9B00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券