Github地址
https://github.com/holdbelief/spark/tree/master/SparkStreaming/SparkStreamingExamples/SparkStreaming_Kafka/Receiver
整体架构
执行流程
Kafka生产者向Kafka生产消息
Producer向不同的Broker中生产消息的负载均衡策略有两种:1、轮询的负载均衡策略;2、基于hash的负载均衡策略
SparkStreaming作为Kafka的消费者消费Kafka中的消息
Receiver模式会启动一个ReceiverTask线程,每隔BatchInterval时间间隔从Kafka中订阅的Topic中拉取消息。ReceiverTask也会占用一个线程,这就是为什么设置线程数不能设置为1个线程的原因(如果只有一个线程,那么这个线程会被用来执行ReceiverTask,就没有多余的线程执行应用成了)。如下代码所示:
SparkConf.setMaster("local[1]");
或者
SparkConf.setMaster("local");
都不可以。
ReceiverTask将消息进行持久化
持久化级别是MEMORY_AND_DISK_SER_2,如果有有3个Executor,那么并不是将消息复制为3个数据副本,而是根据持久化级别复制成2个数据副本
ReceiverTask向Driver的ReceiverTracker线程汇报接收到的消息都存放在哪些Executor节点中
ReceiverTask向ZK汇报消费消息的消费偏移量
Driver分发Task到Executor线程池中执行
Driver知道消息都在哪些Executor节点中之后(第3步),分发task到Executor线程池中执行,从而实现了数据本地化(移动计算而不移动数据)
Receiver方式可能造成的数据丢失:
当SparkStreaming的Driver进程挂掉之后,与Driver同属于一个Application的Executor进程也会挂掉,那么如果在第4步之后Driver进程挂掉,Executor也挂掉,那么会造成Executor中的数据丢失,而在第4步中又已经向ZK汇报完了消息的数据偏移量,那么这部分数据无法重新从Kafka中获取。
解决方案就是在上面第2步之后,增加一步,SparkStreaming将从Kafka中接受来的消息再在HDFS中存储一份。(在Executor中根据持久化级别MEMORY_DISK_SER_E存储过2分消息,但是Executor可能会因为Driver的挂掉而挂掉,导致持久化的消息丢失,所以在HDFS中再存储一份消息),这个机制称作WAL机制(WriteAheadLog-预写日志机制)。
WAL描述
预写日志机制WAL(Write Ahead Log)
SparkStreaming作为Kafka的消费者消费Kafka中的消息
ReceiverTask将消息进行持久化
WAL机制
ReceiverTask向Driver的ReceiverTracker线程汇报接收到的消息都存放在哪些Executor节点中
ReceiverTask向ZK汇报消费消息的消费偏移量
Driver分发Task到Executor
预写日志机制的存在的问题是,需要将数据写入HDFS从而在性能上带来的损失。WAL完成之前,无法继续执行下一步ReceiverTask向Driver汇报消息位置,进而拖慢整个系统运行速度,如果WAL时间超过了BatchInterval时间,那么系统会出问题。
packagecom.bjsxt.java.spark.streaming;
importjava.util.Arrays;
importjava.util.HashMap;
importjava.util.Map;
importorg.apache.spark.SparkConf;
importorg.apache.spark.api.java.function.FlatMapFunction;
importorg.apache.spark.api.java.function.Function2;
importorg.apache.spark.api.java.function.PairFunction;
importorg.apache.spark.streaming.Durations;
importorg.apache.spark.streaming.api.java.JavaDStream;
importorg.apache.spark.streaming.api.java.JavaPairDStream;
importorg.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
importorg.apache.spark.streaming.api.java.JavaStreamingContext;
importorg.apache.spark.streaming.kafka.KafkaUtils;
importscala.Tuple2;
publicclassSparkStreamingOnKafkaReceiver {
publicstaticvoidmain(String[]args) {
/*
* 当在Eclipse中运行的时候,去掉setMaster("local[1]")的注释,在yarn或者Standalone中运行的时候,注释掉setMaster("local[1]")
*/
SparkConfconf =newSparkConf().setAppName("SparkStreamingOnKafkaReceiver").setMaster("local[2]")
.set("spark.streaming.receiver.writeAheadLog.enable","true");//开启WAL路径设置
JavaStreamingContextjsc =null;
try{
jsc =newJavaStreamingContext(conf, Durations.seconds(5));
/*
* WAL路径设置
*/
jsc.checkpoint("hdfs://mycluster/kafka_spark/receiverdata");
MaptopicConsumerConcurrency =newHashMap();
/*
* Map的Key:消费的Topic的名字,本例中Topic1,代表消费Topic1这个Topic Map的Value:启动几个线程去执行Receiver
* Task,本例中1,代表使用1个线程执行Receiver Task
* 如果Value设置为1,那么setMaster("local[2]"),就至少要设置为2个线程,一个线程用于执行Receiver
* Task,另一个线程用于执行业务
* 如果Value设置为2,那么setMaster("local[3]"),就至少要设置为3个线程,两个线程用于执行Receiver
* Task,剩下一个线程用于执行业务
*/
topicConsumerConcurrency.put("Topic1", 1);
JavaPairReceiverInputDStreamlines = KafkaUtils.createStream(jsc,
"faith-openSUSE:2181,faith-Kylin:2181,faith-Mint:2181","MyFirstConsumerGroup",
topicConsumerConcurrency);
/*
* lines是一个kv格式的 k:offset v:values
*/
JavaDStreamwords =lines.flatMap(newFlatMapFunction, String>() {
privatestaticfinallongserialVersionUID= -1955739045858623388L;
@Override
publicIterable call(Tuple2tuple)throwsException {
returnArrays.asList(tuple._2.split("\t"));
}
});
JavaPairDStreampairs =words.mapToPair(newPairFunction() {
privatestaticfinallongserialVersionUID= -4180968474440524871L;
@Override
publicTuple2 call(Stringword)throwsException {
returnnewTuple2(word, 1);
}
});
JavaPairDStreamwordsCount =pairs.reduceByKey(newFunction2() {
privatestaticfinallongserialVersionUID= 5167887209365658964L;
@Override
publicInteger call(Integerv1, Integerv2)throwsException {
returnv1 +v2;
}
});
wordsCount.print();
jsc.start();
}finally{
if(jsc !=null) {
jsc.awaitTermination();
}
}
}
}
启动ZK集群
服务器规划
启动集群
代码详解
代码解析
WAL问题
WAL代码实现
分别在faith-openSUSE、faith-Kylin、faith-Mint上执行zkServer.sh start。
启动HDFS集群
在faith-Fedora节点上执行start-dfs.sh
启动Yarn集群
在faith-Kylin节点上执行start-yarn.sh
在faith-Mint节点上执行yarn-daemon.sh start resourcemanager
启动Kafka集群
在faith-openSUSE、faith-Kylin、faith-Mint节点上分别执行下面命令,启动Kafka集群:
运行测试
创建名字为Topic1的Topic
在Console启动Kafka的Producer
启动Producer,并输入一些字符串
如果是在Eclipse中执行,可以直接在控制台看到结果
在Yarn中执行
Client方式提交
Cluster方式提交
数据库展示
填坑
由于本例中使用的Spark版本是1.6.2,与之对应的Scala版本是2.10,在Maven的POM文件中加入SparkStreaming依赖的时候,scala的版本要为2.10,不能是2.11或其他。例如:
org.apache.spark
spark-streaming-kafka_2.10
1.6.2
如果写成2.11,有可能造成一些问题,例如本例中,2.11的时候,当使用yarn-client或者yarn-Client模式下,无法获取BlockManager的问题,程序一直卡在这里,等待获取BlockManager,不继续执行。或者还有可能造成其他各种问题。
总之Spark的版本要和Scala的版本对应上。
领取专属 10元无门槛券
私享最新 技术干货