Spark Streaming连接Kafka入门教程

转载请务必注明原创地址为:https://dongkelun.com/2018/05/17/sparkKafka/

前言

首先要安装好kafka,这里不做kafka安装的介绍,本文是Spark Streaming入门教程,只是简单的介绍如何利用spark 连接kafka,并消费数据,由于博主也是才学,所以其中代码以实现为主,可能并不是最好的实现方式。

1、对应依赖

根据kafka版本选择对应的依赖,我的kafka版本为0.10.1,spark版本2.2.1,然后在maven仓库找到对应的依赖。

(Kafka项目在版本0.8和0.10之间引入了新的消费者API,因此有两个独立的相应Spark Streaming软件包可用)

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
</dependency>

我用的是sbt,对应的依赖:

"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.1"

2、下载依赖

在命令行执行

sbt eclipse

(我用的是eclipse sbt,具体可看我的其他博客,具体命令根据自己的实际情况)

3、创建topic

创建测试用topic top1

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic top1

4、启动程序

下好依赖之后,根据官方文档提供的示例进行代码测试

下面的代码示例,主要实现spark 连接kafka,并将接收的数据打印出来,没有实现复杂的功能。

package com.dkl.leanring.spark.kafka

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafaDemo")
    //刷新时间设置为1秒
    val ssc = new StreamingContext(conf, Seconds(1))
    //消费者配置
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.180.29.180:6667", //kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group", //消费者组名
      "auto.offset.reset" -> "latest", //latest自动重置偏移量为最新的偏移量
      "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Array("top1") //消费主题,可以同时消费多个
    //创建DStream,返回接收到的输入数据
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
    //打印获取到的数据,因为1秒刷新一次,所以数据长度大于0时才打印
    stream.foreachRDD(f => {
      if (f.count > 0)
        f.foreach(f => println(f.value()))
    })
    ssc.start();
    ssc.awaitTermination();
  }
}

启动上面的程序(本地eclipse启动即可)

需要记住的要点

当在本地运行一个 Spark Streaming 程序的时候,不要使用 "local" 或者 "local1" 作为 master 的 URL 。这两种方法中的任何一个都意味着只有一个线程将用于运行本地任务。如果你正在使用一个基于接收器(receiver)的输入离散流(input DStream)(例如, sockets ,Kafka ,Flume 等),则该单独的线程将用于运行接收器(receiver),而没有留下任何的线程用于处理接收到的数据。因此,在本地运行时,总是用 "localn" 作为 master URL ,其中的 n > 运行接收器的数量。

将逻辑扩展到集群上去运行,分配给 Spark Streaming 应用程序的内核(core)的内核数必须大于接收器(receiver)的数量。否则系统将接收数据,但是无法处理它。

我一开始没有看到官网提醒的这一点,将示例中的local2改为local,现在已经在代码里改回local2了,但是下面的截图没有替换,注意下。

5、发送消息

运行producer

bin/kafka-console-producer.sh --broker-list localhost:6667 --topic top1

然后依次发送下面几个消息

hadoop
spark
kafka
中文测试

6、结果

然后在eclipse console就可以看到对应的数据了。

hadoop
spark
kafka
中文测试

为了直观的展示和理解,附上截图:

发送消息

结果

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏祝威廉

使用Spark SQL构建批处理程序

在批处理模式下,所有的数据源和输出都各自有一个固定的模块(使用了Spark的Datasource API),然后对模块做配置即可,无需使用不同的模块。

1153
来自专栏CSDN技术头条

Hadoop大数据平台运维工程师须掌握的基本命令集分享

本博文收集和整理了在日常维护hadoop集群时运维工程师需要掌握的最基本的hadoop管理与维护的相关命令,在此分享出来供大家参考学习~博主也是刚刚接触hado...

2749
来自专栏Spark学习技巧

Spark调优系列之硬件要求

估计所有的spark开发者都很关心spark的硬件要求。恰当的硬件配置需要具体情况具体分析,浪尖在这里给出以下建议。 一,存储系统 因为因为大多数Spark工作...

2438
来自专栏伦少的博客

spark-submit提交Spark Streamming+Kafka程序

3295
来自专栏Spark学习技巧

Spark的调度系统

一,简介 Spark调度机制可以理解为两个层面的调度。首先,是Spark Application调度。也就是Spark应用程序在集群运行的调度,应用程序包括Dr...

2608
来自专栏维C果糖

详述 IntelliJ IDEA 中自动生成 serialVersionUID 的方法

当我们用 IntelliJ IDEA 编写类并实现 Serializable(序列化)接口的时候,可能会遇到这样一个问题,那就是: 无法自动生成serialVe...

31010
来自专栏Hadoop实操

如何使用Intellij搭建Spark开发环境

在开始Spark学习之前,首先需要搭建Spark的开发环境,可以基于Eclipse或者Intellij等IDE,本文档主要讲述如何使用Intellij搭建Spa...

3724
来自专栏Hadoop实操

使用Hive SQL插入动态分区的Parquet表OOM异常分析

当运行“INSERT ... SELECT”语句向Parquet或者ORC格式的表中插入数据时,如果启用了动态分区,你可能会碰到以下错误,而导致作业无法正常执行...

1.9K8
来自专栏静默虚空的博客

Yarn 入门

[package]会被加入到package.json文件中的依赖列表,同时yarn.lock也会被更新。

1193
来自专栏大数据智能实战

关于Spark运行流式计算程序中跑一段时间出现GC overhead limit exceeded

最近在升级一个框架的时候,发现某个流式计算程序每隔一定的时间就会出现GC overhead limit exceeded的错误问题。 这个问题肯定是内存不够,但...

2038

扫码关注云+社区

领取腾讯云代金券