Spark Streaming是一个可扩展,高吞吐,容错能力强的实时流式处理处理系统。一般的系统架构图是,数据从一个源点,经过Sparing Streaming处理,最后汇聚到一个系统。Spark Streaming的数据来源可以非常丰富,比如Kafka, Flume, Twitter, ZeroMQ, Kinesis 或者是任何的TCP sockets程序。对于数据的处理,Spark Streaming提供了非常丰富的高级api,例如map,redue,joini和窗口函数等等。数据处理完成后,可以存储到其他地方,比如文件系统,对象存储,数据库。典型的数据处理流程图:
RDD:Resilient Distributed Datasets,弹性分部署数据集,支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;动作(actions)在数据集上运行计算后,返回一个值给驱动程序。在这里简单理解为某个时间片的数据集合即可。
DStream:和RDD概念有点类似,是RDD的集合,代表着整个数据流。简单来说Spark Streaming中的数据量就是DStream,然后每个时间片的数据就是RDD。
Kafka与Spark Streaming整合,首先需要从Kafka读取数据过来,读取数据有两种方式
下面使用一个示例,展示如何整合Kafka和Spark Streaming,这个例子中,使用一个生产者不断往Kafka随机发送数字,然后通过Spark Streaming统计时间片段内数字之和。
1.往kafka随机发送数字代码:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "spark-producer-demo-client");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String,String> producer = new KafkaProducer<>(properties);
Random random = new Random();
while (true) {
int value = random.nextInt(10);
ProducerRecord<String, String> message =
new ProducerRecord<>(topic, value+"");
producer.send(message, (recordMetadata, e) -> {
if (recordMetadata != null) {
System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + ":" +
recordMetadata.offset());
}
});
TimeUnit.SECONDS.sleep(1);
2.提交到spark集群代码,用于统计2秒时间间隔的数字之和,这里我们使用的是Direct模式:
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]")
.setAppName("StreamingWithKafka")
val ssc = new StreamingContext(sparkConf, Seconds(2)) // 1
ssc.checkpoint(checkpointDir)
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> group,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false:java.lang.Boolean)
)
val stream = KafkaUtils.createDirectStream[String, String](
ssc, PreferConsistent, // 2
Subscribe[String, String](List(topic), kafkaParams))
val value = stream.map(record => {
val intVal = Integer.valueOf(record.value())
println(intVal)
intVal
}).reduce(_+_)
value.print()
ssc.start
ssc.awaitTermination
}
上面代码中1处的代码表示聚合处理的时间分片为2秒,计算两秒内的随机数之和。2处的代码用于指定spark执行器上面的kafka consumer分区分配策略,一共有三种类型,PreferConsistent是最常用的,表示订阅主题的分区均匀分配到执行器上面,然后还有PreferBrokers,这种机制是优先分配到和broker相同机器的执行器上,还有一种是PreferFixed,这种是手动配置,用的比较少。上面程序每次计算2秒时间间隔内的数字之和,输入会类似如下:
3
4
...
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。