Spark Streaming + Kafka整合

参考官网 http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html

  • 之前先确保以下操作: 1、先启动ZK:./zkServer.sh start 2、启动Kafka:./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 3、创建topic: ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic ./kafka-topics.sh --list --zookeeper hadoop:2181 4、通过控制台测试是否能正常生产与消费 ./kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka_streaming_topic ./kafka-console-consumer.sh --zookeeper hadoop:2181 --topic kafka_streaming_topic

Approach 1: Receiver-based Approach

  • Receiver方式的本地环境联调 1、KafkaUtils.createStream Create an input stream that pulls messages from Kafka Brokers.
import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

2、引入数组,含四个数->val Array(zkQuorum,group,topics,numThreads) = args

3、判断是否传入四个参数->构建topicMap: val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

4、topicMap带入KafkaUtils参数 5、业务代码: messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

6、到IDEA的edit configuration编辑以下内容: hadoop:2181 test kafka_streaming_topic 1

注意:

test:group名
1:线程数
setMaster("local[2]")   一定要大于2

7、run下代码,在kafka 生产者窗口手动输入几个单词,在kafka consumer窗口即时看到单词的产生,在本地代码的console窗口看到单词计数

  • Receiver方式的生产环境联调 1、在项目根目录下执行编译 mvn clean package -DskipTests 2、上传到服务器hadoop的lib目录下,执行:
spark-submit \
--class com.feiyue.bigdata.sparkstreaming.KafkaReceiverWordCount \
--master local[2] \
--name KafkaReceiverWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/spark-1.0-SNAPSHOT.jar hadoop:2181 test kafka_streaming_topic 1

3、运行后看4040端口Spark Streaming的UI界面

可以知道UI页面中,
Receiver是一直都在运作的,
而Direct方式没有此Jobs

Approach 2: Direct Approach (No Receivers)

Note that this feature was introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.

特点: 1、简化了并行度,不需要多个Input Stream,只需要一个DStream 2、加强了性能,真正做到了0数据丢失,而Receiver方式需要写到WAL才可以(即副本存储),Direct方式没有Receiver 3、只执行一次

缺点: 1、基于ZooKeeper的Kafka监控工具,无法展示出来,所以需要周期性地访问offset才能更新到ZooKeeper

  • 怎么做

基于Receiver方式的代码,将createStream改为createDirectStream,其余业务代码都不用改动。

    //kafkaParams: Map[String, String],
    //topics: Set[String]
    val Array(brokers, topics) = args


    //val sparkConf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")
    val sparkConf = new SparkConf()

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  • Direct生产环境联调 基于Receiver方式,参数只需要传brokers与topics,注意查看源码与泛型看返回类型并构造出来
spark-submit \
--class com.feiyue.bigdata.sparkstreaming.KafkaDirectWordCount \
--master local[2] \
--name KafkaDirectWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/spark-1.0-SNAPSHOT.jar hadoop:9092  kafka_streaming_topic

3、运行后看4040端口Spark Streaming的UI界面

可以知道UI页面中,Direct方式没有此Jobs

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏开发技术

从源码来理解slf4j的绑定,以及logback对配置文件的加载

  项目中的日志系统使用的是slf4j + logback。slf4j作为一个简单日志门面,为各种loging APIs(像java.util.logging,...

1444
来自专栏技术专栏

Springcloud-hystrix断路器实现springcloud(慕课网廖师兄SpringCloud微服务实战)

当服务可用,会走默认的逻辑访问cityClient 当服务不可用,会调用defaultCities方法

1652
来自专栏好好学java的技术栈

SpringMVC+RestFul详细示例实战教程一(实现跨域访问+postman测试)

注意:由于文章篇幅太长,超出了字数,这是文章的第一部分,明天分享文章的第二部分,请见谅!

2202
来自专栏肖力涛的专栏

Spark踩坑记:Spark Streaming+kafka应用及调优

本文首先对spark streaming嵌入kafka的方式进行归纳总结,之后简单阐述Spark streaming+kafka 在舆情项目中的应用,最后将自己...

4K2
来自专栏一个会写诗的程序员的博客

《Spring Boot极简教程》第7章 Spring Boot集成模板引擎

其实,没有任何一个模板引擎(jsp,velocity,thymeleaf,freemarker,etc)可以完全实现MVC绝对的分层,只有“自由度”上的界定罢了...

693
来自专栏张善友的专栏

ServiceStack.Redis 使用教程

环境准备 Redis (使用Windows版本做测试,运营环境建议使用Linux版本) ServiceStack.Redis-v3.00 在Windows上运行...

2625
来自专栏Albert陈凯

2018-11-17 面试必问问题TransactionalJava事务之一——Java事务的基本问题

关于加@Transactional注解的方法之间调用,事务是否生效的问题 https://blog.csdn.net/blacktal/article/det...

182
来自专栏Java成神之路

分布式_事务_02_2PC框架raincat源码解析

上一节已经将raincat demo工程运行起来了,这一节来分析下raincat的源码

1061
来自专栏跟着阿笨一起玩NET

ASP.NET MVC中使用FluentValidation验证实体

本文转载:http://www.cnblogs.com/libingql/p/3801704.html

381
来自专栏chenssy

【追光者系列】HikariCP源码分析之故障检测那些思考 fail fast & allowPoolSuspension

由于时间原因,本文主要内容参考了 https://segmentfault.com/a/1190000013136251,并结合一些思考做了增注。

1432

扫码关注云+社区