大家好,又见面了,我是你们的朋友全栈君。
如同SparkContext一样,StreamingContext也是Spark Streaming应用程序通往Spark集群的通道,它的定义如下:
Java代码
通过类的文档注释,我们看到:
1. 提供了从各种输入数据源创建DStream的方法
2,参数中的batchDur_是Duration类型的对象,比如Second(10),这个参数的含义是the time interval at which streaming data will be divided into batches,也就是说,假如batchDur_为Second(10)表示Spark Streaming会把每10秒钟的数据作为一个Batch,而一个Batch就是一个RDD?是的,一个RDD的数据对应一个batchInterval累加读取到的数据
Java代码
从文档中,我们可以看到如下几点:
1. 对DStream实施map操作,会转换成另外一个DStream
2. DStream是一组连续的RDD序列,这些RDD中的元素的类型是一样的。DStream是一个时间上连续接收数据但是接受到的数据按照指定的时间(batchInterval)间隔切片,每个batchInterval都会构造一个RDD,因此,Spark Streaming实质上是根据batchInterval切分出来的RDD串,想象成糖葫芦,每个山楂就是一个batchInterval形成的RDD
3. 对DStream实施windows或者reduceByKeyAndWindow操作,也是转换成另外一个DStream(window操作是stateful DStream Transformation)
4. DStream同RDD一样,也定义了map,filter,window等操作,同时,对于元素类型为(K,V)的pair DStream,Spark Streaming提供了一个隐式转换的类,PairStreamFunctions
5. DStream内部有如下三个特性:
-DStream也有依赖关系,一个DStream可能依赖于其它的DStream(依赖关系的产生,同RDD是一样的)
-DStream创建RDD的时间间隔,这个时间间隔是不是就是构造StreamingContext传入的第三个参数?是的!
-在时间间隔到达后,DStream创建RDD的方法
在DStream内部,DStream表现为一系列的RDD的序列,针对DStream的操作(比如map,filter)会转换到它底层的RDD的操 作,由这个图中可以看出来,0-1这段时间的数据累积构成了RDD@time1,1-2这段时间的数据累积构成了RDD@time2,。。。也就是说,在 Spark Streaming中,DStream中的每个RDD的数据是一个时间窗口的累计。
下图展示了对DStream实施转换算子flatMap操作。需要指出的是,RDD的转换操作是由Spark Engine来实现的,原因是Spark Engine接受了原始的RDD以及作用于RDD上的算子,在计算结果时才真正的对RDD实施算子操作
按照下面这幅图所呈现出来的含义是,Spark Streaming用于将输入的数据进行分解成一个一个的RDD,每个RDD交由Spark Engine进行处理以得到最后的处理数据?是的!
上图中,Spark Streaming模块用于将接受到数据定时的切分成RDD(上图中定义为batch of input data),这些RDD交由Spark Engine进行计算。Spark Streaming模块负责数据接收并定时转换成一系列RDD,Spark Engine对Spark Streaming送过来的RDD进行计算
Java代码
DStream与window相关的两个参数是windowDuration和slideDuration,这两个参数究竟表示什么含义。通过window操作,DStream转换为了WindowedDStream
windowDuration表示的是对过去的一个windowDuration时间间隔的数据进行统计计算, windowDuration是intervalBatch的整数倍,也就是说,假如windowDuration=n*intervalBatch, 那么window操作就是对过去的n个RDD进行统计计算 如下内容来自于Spark Streaming的官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html
Spark Streaming也提供了窗口计算(window computations)的功能,允许我们每隔一段时间(sliding duration)对过去一个时间段内(window duration)的数据进行转换操作(tranformation).
slideDruation控制着窗口计算的频度,windowDuration控制着窗口计算的时间跨度。slideDruation和windowDuration都必须是batchInterval的整数倍。假想如下一种场景:
windowDuration=3*batchInterval,
slideDuration=10*batchInterval,
表示的含义是每个10个时间间隔对之前的3个RDD进行统计计算,也意味着有7个RDD没在window窗口的统计范围内。slideDuration的默认值是batchInterval
下图展示了滑动窗口的概念
如上图所示,一个滑动窗口时间段((sliding window length)内的所有RDD会进行合并以创建windowed DStream所对应的RDDD。每个窗口操作有两个参数:
These two parameters must be multiples of the batch interval of the source DStream (1 in the figure). 这表示,sliding window length的时间长度以及sliding interval都要是batch interval的整数倍。 batch interval是在构造StreamingContext时传入的(1 in the figure)
说明:
window length为什么是3?如椭圆形框,它是从第三秒开始算起(包括第三秒),第五秒结束,即包含3,4,5三个1秒,因此是3
sliding interval为什么是2?主要是看圆角矩形框的右边线,虚线的圆角矩形框的右边线在time 3结束, 实线的圆角矩形框的右边线在time 5结束,所以跨度是2。也就是看时间的最右侧即可,以右边线为基准,每个窗口操作(window length)占用了3个时间片。
// Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)) 表示每隔10秒钟对过去30秒钟产生的单词进行计数。这个方法有个不合理的地方,既然要求sliding window length和sliding interval都是batch interval的整数倍,那么此处为什么不用时间单位,而使用绝对的时间长度呢?
这是Spark Streaming的数据输入源,包括两类:基本数据源和高级数据源
以上数据源,StreamingContext的API直接提供,
监听HDFS文件系统的新文件的创建,读取其中内容。如果文件已存在而内容有变化,是不会被监听到的,因此只能将文件内容在某个位置写好后,然后移动到Spark Streaming监听的目录,如果文件在这个目录下内容发生变化,则Spark Streaming无法监听到
另外需要注意的是,Spark Streaming启动后,Spark Streaming通过文件的最后修改时间(modify time)来判断一个新加入到监听目录的文件是否有效。如果一个较长时间没有更新的文件move到监听目录,Spark Streaming也不会对它进行读取进而计算
Java代码
Java代码
问题: converter怎么使用?把InputStream转换为Iterator[T]集合
Source Artifact
Kafka | spark-streaming-kafka_2.10 |
---|---|
Flume | spark-streaming-flume_2.10 |
Kinesis | spark-streaming-kinesis-asl_2.10 [Amazon Software License] |
spark-streaming-twitter_2.10 | |
ZeroMQ | spark-streaming-zeromq_2.10 |
MQTT | spark-streaming-mqtt_2.10 |
1. When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL where n > number of receivers to run (see Spark Properties for information on how to set the master).Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process them.
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/148736.html原文链接:https://javaforall.cn