SparkStreaming&Kafka——Direct方式

Github地址

https://github.com/holdbelief/spark/tree/master/SparkStreaming/SparkStreamingExamples/SparkStreaming_Kafka/Direct

整体架构

并行度问题:

1、linesDStram里面封装到的是RDD,RDD里面有partition与这个topic的parititon数是一致的。

2、从kafka中读来的数据封装一个DStram里面,可以对这个DStream重分区 reaprtitions(numpartition)

代码解析

import java.util.ArrayList;

import java.util.Arrays;

import java.util.HashMap;

import java.util.HashSet;

import java.util.Iterator;

import java.util.List;

import java.util.Map;

import kafka.serializer.StringDecoder;

import scala.Tuple2;

/**

* 并行度问题:

* 1、linesDStram里面封装到的是RDD, RDD里面有partition与这个topic的parititon数是一致的。

* 2、从kafka中读来的数据封装一个DStram里面,可以对这个DStream重分区 reaprtitions(numpartition)

* @author faith

*/

public class SparkStreamingOnKafkaDirected {

public static void main(String[] args) {

SparkConf conf = new SparkConf()

/*

* Receiver模式下,需要使用一(数量可以设置,例如N个)个线程执行ReceiverTask,所以

* setMaster("local[2]")至少要是2个(N+1个)线程。

* 而Direct模式下,由于SparkStreaming直接从Kafka中取数据,没有ReceiverTask,所以最少可以设置1个线程

* 当在Eclipse中运行的时候,去掉setMaster("local[1]")的注释,在yarn或者Standalone中运行的时候,注释掉

*/

// .setMaster("local[1]")

.setAppName("SparkStreamingOnKafkaDirected");

JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

/*

* 第一个泛型是Kafka消息偏移量的数据类型

* 第二个泛型是Kafka消息体的数据类型

*/

Map kafkaParameters = new HashMap();

/*

*/

HashSet topics = new HashSet();

topics.add("Topic1");

JavaPairInputDStream lines = KafkaUtils.createDirectStream(jsc,

String.class, // Kafka消息偏移量的数据类型

String.class, // Kafka消息体的数据类型

StringDecoder.class, // Kafka消息偏移量的反序列化处理类

StringDecoder.class, // Kafka消息的反序列化处理类

kafkaParameters,

topics);

JavaPairDStream lines_repartition = lines.repartition(10);

JavaDStream words = lines_repartition.flatMap(new FlatMapFunction, String>() {

private static final long serialVersionUID = 1L;

@Override

public Iterable call(Tuple2 tuple) throws Exception {

return Arrays.asList(tuple._2.split(" "));

}

});

JavaPairDStream pairs = words.mapToPair(new PairFunction() {

@Override

public Tuple2 call(String word) throws Exception {

return new Tuple2(word, 1);

}

});

JavaPairDStream wordsCount = pairs.reduceByKey(new Function2() {

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1 + v2;

}

});

/*

* 将word_count的内容写入Mysql

*/

wordsCount.foreachRDD(new VoidFunction>() {

@Override

public void call(JavaPairRDD pairRdd) throws Exception {

/*

* foreahPartition与foreach的不同是:

* foreach:每循环一次是一个数据

* foreachPartition:每循环一次是一个Partition,处理一个Partition里面的所有数据,

* 注意VoidFunction的参数是Iterator,也就是每隔Partition里面数据的迭代

* foreachPartition常常用于向数据库例如数据,当使用foreach时候,每次循环是一个数据,那么每个数据

* 就要创建一个数据链连接,foreachPartition是每一个Partition创建一个数据库链接。

*/

pairRdd.foreachPartition(new VoidFunction>>() {

@Override

public void call(Iterator> vs) throws Exception {

JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();

List insertParams = new ArrayList();

while ( vs.hasNext() ) {

Tuple2 next = vs.next();

insertParams.add(new Object[] );

}

if ( !insertParams.isEmpty() ) {

jdbcWrapper.doBatch("INSERT INTO wordcount VALUES(?, ?)", insertParams);

}

}

});

}

});

jsc.start();

jsc.awaitTermination();

}

}

服务器规划

启动集群

启动ZK集群

分别在faith-openSUSE、faith-Kylin、faith-Mint上执行zkServer.sh start。

启动HDFS集群

在faith-Fedora节点上执行start-dfs.sh

启动Yarn集群

在faith-Kylin节点上执行start-yarn.sh

在faith-Mint节点上执行yarn-daemon.sh start resourcemanager

启动Kafka集群

在faith-openSUSE、faith-Kylin、faith-Mint节点上分别执行下面命令,启动Kafka集群:

运行测试

创建名字为Topic1的Topic

在Console启动Kafka的Producer

启动Producer,并输入一些字符串

如果是在Eclipse中执行,可以直接在控制台看到结果

在Yarn中执行

Client方式提交

Cluster方式提交

数据库展示

填坑

由于本例中使用的Spark版本是1.6.2,与之对应的Scala版本是2.10,在Maven的POM文件中加入SparkStreaming依赖的时候,scala的版本要为2.10,不能是2.11或其他。例如:

org.apache.spark

spark-streaming-kafka_2.10

1.6.2

如果写成2.11,有可能造成一些问题,例如本例中,2.11的时候,当使用yarn-client或者yarn-Client模式下,无法获取BlockManager的问题,程序一直卡在这里,等待获取BlockManager,不继续执行。或者还有可能造成其他各种问题。

总之Spark的版本要和Scala的版本对应上。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180313G173RC00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券