专栏首页about云spark streaming知识总结[优化]

spark streaming知识总结[优化]

问题导读 1.DStreams的含义是什么? 2.DStreams提供哪两种类型的操作? 3.Transformations操作分为哪两种类型? 4.本文说了哪些输入源? 5.什么是batch? 本篇做了一些细节优化,防止初学者在看到的时候,造成误解.如有问题,欢迎交流 RDD与job之间的关系 Spark Streaming是构建在Spark上的实时流计算框架,扩展了Spark流式大数据处理能 力。Spark Streaming将数据流以时间片为单位分割形成RDD,使用RDD操作处理每一块数 据,每块数据(也就是RDD)都会生成一个Spark Job进行处理,最终以批处理的方式处理 每个时间片的数据

说明:Spark中的Job和MR中Job不一样不一样。MR中Job主要是Map或者Reduce Job。而Spark的Job其实很好区别,RDD一个action算子就算一个Job.

什么是batch Spark Streaming生成新的batch并对它进行一些处理,每个batch中的数据都代表一个RDD 理解batch 间隔时间开始会创建,间隔时间内会积累 设置时间间隔的理解 我们知道spark streaming有个时间间隔。假如间隔为1秒,它是停下1秒,然后在接受1秒的数据,也就是说是间隔1秒,然后在接受1秒数据,还是说接受1秒的数据。这里表面上没有太大的区别,其实在于理解的到不到位。说白了batch封装的是1秒的数据。

batch创建 batch在时间间隔开始被创建,在间隔时间内任何到达的数据都被添加到批数据中,间隔时间结束,batch创建结束。 什么是batch间隔参数 间隔时间大小的参数被称之为batch间隔参数 batch间隔范围一般为 500 毫秒到几分钟,由开发者定义。 spark streaming应用 spark streaming应用程序可以实时跟踪页面统计,训练机器学习模型或则自动检测异常,更多推荐参考 让你真正明白spark streaming http://www.aboutyun.com/forum.php?mod=viewthread&tid=21141 DStreams详解

DStreams是discretized streams的缩写,是离散流的意思。

DStreams是随着时间【推移】到达的一系列数据

每个dstream被表示为一个序列的RDDS(因此名称“离散”)。

DStreams可以不同的数据源创建,比如flume,kafka,或则hdfs.一旦构建,

DStreams提供两种类型的操作:

transformations,产生一个新的DStream

output operations,写数据到外部系统。

DStreams提供许多与RDD相同的操作,外加一些关于时间的操作比如slidingwindows【滑动窗口】。

DStreams来源

1.外部数据源

2.通过transformations转换而来

Transformations操作

分为有状态和无状态 Stateful transformations需要checkpointing,在StreamingContext中启用容错。 设置checkpointing ssc.checkpoint("hdfs://...") Windowed transformations window操作需要两个参数,窗口持续时间和滑动持续时间。这两个必须是多个StreamingContext的batch时间区间。DStream数据源时间间隔是10秒。想创建滑动窗口上一个30秒(或则上3batches)),我们应该设置windowDuration30秒。sliding时间间隔,默认是batch时间间隔,控制DStream刷新计算结果。如果我们的DStream batch时间区间为10秒,我们想计算我们的window,只能在每个第二batch。我们设置我们的sliding间隔为20秒。 输出操作 保存DStream 为文本文件【Scala】

[Scala]

ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")

saveAsHadoopFiles()是hadoop输出格式,例如Spark Streaming没有SaveAsSequenceFile()函数,我们可以保存为SequenceFiles Scala

val writableIpAddressRequestCount = ipAddressRequestCount.map {
(ip, count) => (new Text(ip), new LongWritable(count)) }
writableIpAddressRequestCount.saveAsHadoopFiles[
SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt")

Java

JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair(
new PairFunction<Tuple2<String, Long>, Text, LongWritable>() {
public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) {
return new Tuple2(new Text(e._1()), new LongWritable(e._2()));
}});
class OutFormat extends SequenceFileOutputFormat<Text, LongWritable> {};
writableDStream.saveAsHadoopFiles(
"outputDir", "txt", Text.class, LongWritable.class, OutFormat.class);

foreachRDD()

ipAddressRequestCount.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
// Open connection to storage system (e.g. a database connection)
partition.foreach { item =>
// Use connection to push item to system
}
// Close connection
}
}

checkpointing机制 spark streaming主要机制checkpointing,它将数据存储在一个可靠的文件系统,比如hdfs. checkpoint的作用,用于恢复数据。它会定期保存状态到可靠的文件系统比如hdfs,s3 比如你每5-10批数据设置checkpointing。当发生丢失数据的时候,Spark Streaming讲恢复最近的checkpoint.随着 streaming application 的持续运行,checkpoint 数据占用的存储空间会不断变大。因此,需要小心设置checkpoint 的时间间隔。设置得越小,checkpoint 次数会越多,占用空间会越大;如果设置越大,会导致恢复时丢失的数据和进度越多。一般推荐设置为 batch duration 的5~10倍。 输入源 spark streaming支持多个数据源,一些核心的数据源,已被构建到Streaming Maven artifact,其它可以通过额外的artifact,比如spark-streaming-kafka. 核心数据源比如sockets,还有文件 和 Akka actors. 其它数据源 使用kafka必须引入artifact:spark-streaming-kafka_2.10到项目中。它提供KafkaUtils对象,通过StreamingContext 和 JavaStreamingContext创建kafka消息的DStream. 因为它订阅多个topic. DStream创建由topic 和 message组成的对。我们可以调用createStream()方法来创建Stream。字符串分割开ZooKeeper hosts, consumer group的名称(唯一的名字),receiver 线程用于topic. Apache Kafka 订阅Panda的topic【Scala】

import org.apache.spark.streaming.kafka._
...
// Create a map of topics to number of receiver threads to use
val topics = List(("pandas", 1), ("logs", 1)).toMap
val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics)
StreamingLogInput.processLines(topicLines.map(_._2))


Apache Kafka 订阅 to Panda’s topic【Java】 
import org.apache.spark.streaming.kafka.*;
...
// Create a map of topics to number of receiver threads to use
Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("pandas", 1);
topics.put("logs", 1);
JavaPairDStream<String, String> input =
KafkaUtils.createStream(jssc, zkQuorum, group, topics);
input.print();




本文分享自微信公众号 - about云(wwwaboutyuncom),作者:pig2

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-03-22

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Apache Spark 2.3 加入支持Native Kubernetes及新特性文档下载

    问题导读 1.什么是Kubernetes? 2.在Kubernetes集群尝试新功能,该如何实现? 3.观看群集上创建的Spark资源,该如何操作? 在开始...

    用户1410343
  • 适合小白入门Spark的全面教程

    1.实时分析 在我们开始之前,让我们来看看美国社交媒体比较有名的企业每分钟产生的数据量。

    用户1410343
  • 王联辉:Spark在腾讯应用及对企业spark使用指导

    问题导读 1.腾讯如何使用Spark 技术的?带来了哪些好处? 2.Spark 技术最适用于哪些应用场景? 3.企业在应用Spark 技术时,需要做哪些改变吗...

    用户1410343
  • 0827-7.1.4-如何在CDP中使用Spark SQL CLI

    而我们在产品开发过程中,可能需要用到spark-sql来进行数据加工,本文就采用脚本的方式,调用spark-shell来进行数据的处理,执行需要的sql语句。

    Fayson
  • 什么是sparklyr

    我们(RStudio Team)今天很高兴的宣布一个新的项目sparklyr(https://spark.rstudio.com),它是一个包,用来实现通过R连...

    Fayson
  • PySpark 中的机器学习库

    传统的机器学习算法,由于技术和单机存储的限制,比如使用scikit-learn,只能在少量数据上使用。即以前的统计/机器学习依赖于数据抽样。但实际过程中样本往往...

    zhangqibot
  • ckeditor 从入门到放弃

    通过 CKEDITOR.plugins.add方法添加插件,第一个参数为插件名,后面为参数列表。

    libo1106
  • 腾讯临境 | 十一黄金周,逛逛腾讯VR云展厅 24小时不打烊

    引言:2020腾讯全球数字生态大会云上展已经圆满召开。“截至9月18日16点,官网大会页面浏览量超380万UV,线上展区浏览超17万人次” ,腾讯的云展厅究竟...

    腾讯音视频实验室
  • linux引导系统的方法分析

    password –md5 md5后的密码字符串(可以通过grub-md5-crypt计算)

    砸漏
  • 快来看看我的 Idea总结

    这些是我平时记在有道云笔记的,现在截图过来了(直接ctrl+f搜索标题吧): 1.idea-解决端口冲突 2.导入导出ide设置settings.jar ...

    MonroeCode

扫码关注云+社区

领取腾讯云代金券