首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

SparkStreaming&Kafka——Receiver方式

Github地址

https://github.com/holdbelief/spark/tree/master/SparkStreaming/SparkStreamingExamples/SparkStreaming_Kafka/Receiver

整体架构

执行流程

Kafka生产者向Kafka生产消息

Producer向不同的Broker中生产消息的负载均衡策略有两种:1、轮询的负载均衡策略;2、基于hash的负载均衡策略

SparkStreaming作为Kafka的消费者消费Kafka中的消息

Receiver模式会启动一个ReceiverTask线程,每隔BatchInterval时间间隔从Kafka中订阅的Topic中拉取消息。ReceiverTask也会占用一个线程,这就是为什么设置线程数不能设置为1个线程的原因(如果只有一个线程,那么这个线程会被用来执行ReceiverTask,就没有多余的线程执行应用成了)。如下代码所示:

SparkConf.setMaster("local[1]");

或者

SparkConf.setMaster("local");

都不可以。

ReceiverTask将消息进行持久化

持久化级别是MEMORY_AND_DISK_SER_2,如果有有3个Executor,那么并不是将消息复制为3个数据副本,而是根据持久化级别复制成2个数据副本

ReceiverTask向Driver的ReceiverTracker线程汇报接收到的消息都存放在哪些Executor节点中

ReceiverTask向ZK汇报消费消息的消费偏移量

Driver分发Task到Executor线程池中执行

Driver知道消息都在哪些Executor节点中之后(第3步),分发task到Executor线程池中执行,从而实现了数据本地化(移动计算而不移动数据)

Receiver方式可能造成的数据丢失:

当SparkStreaming的Driver进程挂掉之后,与Driver同属于一个Application的Executor进程也会挂掉,那么如果在第4步之后Driver进程挂掉,Executor也挂掉,那么会造成Executor中的数据丢失,而在第4步中又已经向ZK汇报完了消息的数据偏移量,那么这部分数据无法重新从Kafka中获取。

解决方案就是在上面第2步之后,增加一步,SparkStreaming将从Kafka中接受来的消息再在HDFS中存储一份。(在Executor中根据持久化级别MEMORY_DISK_SER_E存储过2分消息,但是Executor可能会因为Driver的挂掉而挂掉,导致持久化的消息丢失,所以在HDFS中再存储一份消息),这个机制称作WAL机制(WriteAheadLog-预写日志机制)。

WAL描述

预写日志机制WAL(Write Ahead Log)

SparkStreaming作为Kafka的消费者消费Kafka中的消息

ReceiverTask将消息进行持久化

WAL机制

ReceiverTask向Driver的ReceiverTracker线程汇报接收到的消息都存放在哪些Executor节点中

ReceiverTask向ZK汇报消费消息的消费偏移量

Driver分发Task到Executor

预写日志机制的存在的问题是,需要将数据写入HDFS从而在性能上带来的损失。WAL完成之前,无法继续执行下一步ReceiverTask向Driver汇报消息位置,进而拖慢整个系统运行速度,如果WAL时间超过了BatchInterval时间,那么系统会出问题。

packagecom.bjsxt.java.spark.streaming;

importjava.util.Arrays;

importjava.util.HashMap;

importjava.util.Map;

importorg.apache.spark.SparkConf;

importorg.apache.spark.api.java.function.FlatMapFunction;

importorg.apache.spark.api.java.function.Function2;

importorg.apache.spark.api.java.function.PairFunction;

importorg.apache.spark.streaming.Durations;

importorg.apache.spark.streaming.api.java.JavaDStream;

importorg.apache.spark.streaming.api.java.JavaPairDStream;

importorg.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;

importorg.apache.spark.streaming.api.java.JavaStreamingContext;

importorg.apache.spark.streaming.kafka.KafkaUtils;

importscala.Tuple2;

publicclassSparkStreamingOnKafkaReceiver {

publicstaticvoidmain(String[]args) {

/*

* 当在Eclipse中运行的时候,去掉setMaster("local[1]")的注释,在yarn或者Standalone中运行的时候,注释掉setMaster("local[1]")

*/

SparkConfconf =newSparkConf().setAppName("SparkStreamingOnKafkaReceiver").setMaster("local[2]")

.set("spark.streaming.receiver.writeAheadLog.enable","true");//开启WAL路径设置

JavaStreamingContextjsc =null;

try{

jsc =newJavaStreamingContext(conf, Durations.seconds(5));

/*

* WAL路径设置

*/

jsc.checkpoint("hdfs://mycluster/kafka_spark/receiverdata");

MaptopicConsumerConcurrency =newHashMap();

/*

* Map的Key:消费的Topic的名字,本例中Topic1,代表消费Topic1这个Topic Map的Value:启动几个线程去执行Receiver

* Task,本例中1,代表使用1个线程执行Receiver Task

* 如果Value设置为1,那么setMaster("local[2]"),就至少要设置为2个线程,一个线程用于执行Receiver

* Task,另一个线程用于执行业务

* 如果Value设置为2,那么setMaster("local[3]"),就至少要设置为3个线程,两个线程用于执行Receiver

* Task,剩下一个线程用于执行业务

*/

topicConsumerConcurrency.put("Topic1", 1);

JavaPairReceiverInputDStreamlines = KafkaUtils.createStream(jsc,

"faith-openSUSE:2181,faith-Kylin:2181,faith-Mint:2181","MyFirstConsumerGroup",

topicConsumerConcurrency);

/*

* lines是一个kv格式的 k:offset v:values

*/

JavaDStreamwords =lines.flatMap(newFlatMapFunction, String>() {

privatestaticfinallongserialVersionUID= -1955739045858623388L;

@Override

publicIterable call(Tuple2tuple)throwsException {

returnArrays.asList(tuple._2.split("\t"));

}

});

JavaPairDStreampairs =words.mapToPair(newPairFunction() {

privatestaticfinallongserialVersionUID= -4180968474440524871L;

@Override

publicTuple2 call(Stringword)throwsException {

returnnewTuple2(word, 1);

}

});

JavaPairDStreamwordsCount =pairs.reduceByKey(newFunction2() {

privatestaticfinallongserialVersionUID= 5167887209365658964L;

@Override

publicInteger call(Integerv1, Integerv2)throwsException {

returnv1 +v2;

}

});

wordsCount.print();

jsc.start();

}finally{

if(jsc !=null) {

jsc.awaitTermination();

}

}

}

}

启动ZK集群

服务器规划

启动集群

代码详解

代码解析

WAL问题

WAL代码实现

分别在faith-openSUSE、faith-Kylin、faith-Mint上执行zkServer.sh start。

启动HDFS集群

在faith-Fedora节点上执行start-dfs.sh

启动Yarn集群

在faith-Kylin节点上执行start-yarn.sh

在faith-Mint节点上执行yarn-daemon.sh start resourcemanager

启动Kafka集群

在faith-openSUSE、faith-Kylin、faith-Mint节点上分别执行下面命令,启动Kafka集群:

运行测试

创建名字为Topic1的Topic

在Console启动Kafka的Producer

启动Producer,并输入一些字符串

如果是在Eclipse中执行,可以直接在控制台看到结果

在Yarn中执行

Client方式提交

Cluster方式提交

数据库展示

填坑

由于本例中使用的Spark版本是1.6.2,与之对应的Scala版本是2.10,在Maven的POM文件中加入SparkStreaming依赖的时候,scala的版本要为2.10,不能是2.11或其他。例如:

org.apache.spark

spark-streaming-kafka_2.10

1.6.2

如果写成2.11,有可能造成一些问题,例如本例中,2.11的时候,当使用yarn-client或者yarn-Client模式下,无法获取BlockManager的问题,程序一直卡在这里,等待获取BlockManager,不继续执行。或者还有可能造成其他各种问题。

总之Spark的版本要和Scala的版本对应上。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180313G1749E00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券