Github地址
https://github.com/holdbelief/spark/tree/master/SparkStreaming/SparkStreamingExamples/SparkStreaming_Kafka/Direct
整体架构
并行度问题:
1、linesDStram里面封装到的是RDD,RDD里面有partition与这个topic的parititon数是一致的。
2、从kafka中读来的数据封装一个DStram里面,可以对这个DStream重分区 reaprtitions(numpartition)
代码解析
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
/**
* 并行度问题:
* 1、linesDStram里面封装到的是RDD, RDD里面有partition与这个topic的parititon数是一致的。
* 2、从kafka中读来的数据封装一个DStram里面,可以对这个DStream重分区 reaprtitions(numpartition)
* @author faith
*/
public class SparkStreamingOnKafkaDirected {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
/*
* Receiver模式下,需要使用一(数量可以设置,例如N个)个线程执行ReceiverTask,所以
* setMaster("local[2]")至少要是2个(N+1个)线程。
* 而Direct模式下,由于SparkStreaming直接从Kafka中取数据,没有ReceiverTask,所以最少可以设置1个线程
* 当在Eclipse中运行的时候,去掉setMaster("local[1]")的注释,在yarn或者Standalone中运行的时候,注释掉
*/
// .setMaster("local[1]")
.setAppName("SparkStreamingOnKafkaDirected");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
/*
* 第一个泛型是Kafka消息偏移量的数据类型
* 第二个泛型是Kafka消息体的数据类型
*/
Map kafkaParameters = new HashMap();
/*
*/
HashSet topics = new HashSet();
topics.add("Topic1");
JavaPairInputDStream lines = KafkaUtils.createDirectStream(jsc,
String.class, // Kafka消息偏移量的数据类型
String.class, // Kafka消息体的数据类型
StringDecoder.class, // Kafka消息偏移量的反序列化处理类
StringDecoder.class, // Kafka消息的反序列化处理类
kafkaParameters,
topics);
JavaPairDStream lines_repartition = lines.repartition(10);
JavaDStream words = lines_repartition.flatMap(new FlatMapFunction, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable call(Tuple2 tuple) throws Exception {
return Arrays.asList(tuple._2.split(" "));
}
});
JavaPairDStream pairs = words.mapToPair(new PairFunction() {
@Override
public Tuple2 call(String word) throws Exception {
return new Tuple2(word, 1);
}
});
JavaPairDStream wordsCount = pairs.reduceByKey(new Function2() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
/*
* 将word_count的内容写入Mysql
*/
wordsCount.foreachRDD(new VoidFunction>() {
@Override
public void call(JavaPairRDD pairRdd) throws Exception {
/*
* foreahPartition与foreach的不同是:
* foreach:每循环一次是一个数据
* foreachPartition:每循环一次是一个Partition,处理一个Partition里面的所有数据,
* 注意VoidFunction的参数是Iterator,也就是每隔Partition里面数据的迭代
* foreachPartition常常用于向数据库例如数据,当使用foreach时候,每次循环是一个数据,那么每个数据
* 就要创建一个数据链连接,foreachPartition是每一个Partition创建一个数据库链接。
*/
pairRdd.foreachPartition(new VoidFunction>>() {
@Override
public void call(Iterator> vs) throws Exception {
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
List insertParams = new ArrayList();
while ( vs.hasNext() ) {
Tuple2 next = vs.next();
insertParams.add(new Object[] );
}
if ( !insertParams.isEmpty() ) {
jdbcWrapper.doBatch("INSERT INTO wordcount VALUES(?, ?)", insertParams);
}
}
});
}
});
jsc.start();
jsc.awaitTermination();
}
}
服务器规划
启动集群
启动ZK集群
分别在faith-openSUSE、faith-Kylin、faith-Mint上执行zkServer.sh start。
启动HDFS集群
在faith-Fedora节点上执行start-dfs.sh
启动Yarn集群
在faith-Kylin节点上执行start-yarn.sh
在faith-Mint节点上执行yarn-daemon.sh start resourcemanager
启动Kafka集群
在faith-openSUSE、faith-Kylin、faith-Mint节点上分别执行下面命令,启动Kafka集群:
运行测试
创建名字为Topic1的Topic
在Console启动Kafka的Producer
启动Producer,并输入一些字符串
如果是在Eclipse中执行,可以直接在控制台看到结果
在Yarn中执行
Client方式提交
Cluster方式提交
数据库展示
填坑
由于本例中使用的Spark版本是1.6.2,与之对应的Scala版本是2.10,在Maven的POM文件中加入SparkStreaming依赖的时候,scala的版本要为2.10,不能是2.11或其他。例如:
org.apache.spark
spark-streaming-kafka_2.10
1.6.2
如果写成2.11,有可能造成一些问题,例如本例中,2.11的时候,当使用yarn-client或者yarn-Client模式下,无法获取BlockManager的问题,程序一直卡在这里,等待获取BlockManager,不继续执行。或者还有可能造成其他各种问题。
总之Spark的版本要和Scala的版本对应上。
领取专属 10元无门槛券
私享最新 技术干货