SparkStreaming入门

黄文辉同学第二篇,请大家支持!

1.SparkStreaming简介

Spark Streaming属于核心Spark API的扩展,支持实时数据流的可扩展、高吞吐、容错的流处理。可以接受来自Kafka、Flume、ZeroMQ、Kinesis、Twitter或TCP套接字的数据源,也可以使用map、reduce、join、window等高级函数表示的复杂算法进行处理。最后,处理的结果数据可以输出到hdfs,redis,数据库(如hbase)等。

2.工作原理

Spark Streaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。工作原理如下图所示,Spark Streaming接受实时传入的数据流后,将数据划分成批Spark中的RDD,然后传入到Spark Engine进行处理,按批次生成最后的结果数据。

下面以wordcount简单的例子(Java语言)来理解流式计算。

SparkConf conf = new SparkConf().setAppName("SparkStream-TestJob");
     //每10秒处理一次
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    //监听数据流地址端口 
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("192.168.191.200", 9999);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterator<String> call(String t) throws Exception {
            return Arrays.asList(t.split(" ")).iterator();
        }
    });
    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String t) throws Exception {
            return new Tuple2<String, Integer>(t, 1);
        }
    });
   //统计单词数
    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            }
    );
    wordCounts.print();//打印处理结果到控制台
    //开始执行接收和处理数据
    jssc.start();
jssc.awaitTermination();   // 等待流计算结束,防止应用退出

在spark上执行上面代码,然后在对应的ip上打开9999端口实时进行数据传输(nc -lk 9999)

然后在spark上可以看实时看出统计结果

3. Spark Streaming核心类

3.1 StreamingContext

StreamingContext是流计算功能的主要入口。StreamingContext会在底层创建出SparkContext,用来处理数据。从上面代码中还发现,创建StreamingContext时,还需要指定多长时间来处理一次新数据的批次间隔。创建StreamingContext后,再按下列步骤执行:

1).通过输入源创建InputDStream

2).对DStreaming进行transformation和output操作

3).通过StreamContext.start()方法启动接收和处理数据

4).调用StreamingContext.awaitTermination()来等待流计算完成,来防止应用退出

5).可以调用StreamingContext.stop()方法结束程序的运行

使用时注意:

1).一个StreamStreamingContextContext只能启动一次,所以只有在配置好所有的DStream以及所需要输出操作之后才能启动。所以启动后,新的操作将不起作用

2).StreamingContext停止后,不能重新启动.。

3).在同一时间段内,一个JVM只能有一个active状态的StreamingContext

4).调用StreamingContext的stop方法时,SparkContext也将被stop掉,如果希望StreamingContext关闭时,保留SparkContext,则需要在stop方法中传入参数false去设置stopSparkContext=false

5).SparkContext对象可以被多个StreamingContexts重复使用,但需要前一个StreamingContexts停止后再创建下一个StreamingContext对象。

3.2 DStream

Spark Streaming提供一种称为DStream(Discretized Stream,离散流)的高级抽象数据流。

DStream的创建

可以从数据源(kafka、flume)的输入数据流创建,也可以在其他DStream上应用一些高级操作来创建,一个DStream可以看作是一个RDDs的序列。

DStream的核心思想

将计算作为一系列较小时间间隔的、状态无关、确定批次的任务,每个时间间隔内接收到的输入数据被可靠储存在集群中,作为它的数据集。然后进行一系列的操作。

Input DStream和Receivers

Input DStream是DStream的一种,它是从流式数据源中获取的原始数据流。上面的例子中, jssc.socketTextStream("192.168.191.200", 9999)就是接收过来的Input Stream。除了文件流外,每个Input DStream都关联一个Recevier对象,该对象接收数据源传来的数据并将其保持在内存中提供给spark使用。

Spark Streaming有两种数据类型的流式输入数据源:

1).基本输入源:能够直接应用于StreamingContext API的输入源。例如:文件系统、套接字连接,以及Akka Actor

2).高级输入源:能够应用于特定工具类的输入源。例如:Kafka、Flume、Kinnesis等,这些就需要导入一些额外的依赖包。

每个Input DStream对应一个接收器接收数据流。在Streaming应用中,可以创建多个Input DStream并行接收多个数据流。但请注意,每个接收器是一个长期运行在Worker或者Executor上的任务,因此它会占用分配给Spark Streaming应用程序的一个核(core)。

非常重要的一点是,为了保证一个或者多个接收器能够接收数据,需要分配给Spark Streaming应用程序足够多的核数。

记住要点:

1).在本地运行spark Streaming时,master URL不能使用“local”或“local[1]”。因为当Input DStream与receiver(如:sockets,Kafka,Flume等)关联时,receiver需要一个线程来运行,那么就没有多的线程去处理接收到的数据。所以,在本地运行SparkStreaming程序时,要使用“local[n]”作为master URL,其中n要大于接收器的数量。

2).在集群上运行Spark Streaming,分配给Spark Streaming程序的cpu核数也必须大于接收器的数量,否则,只会接收数据,而不会去处理数据。

4.遇到的问题

当sparkStreaming在local模式运行时,只有一个core的情况下,只会接收数据,而不能做处理,具体是会出现这样情况

提交命令:

spark-submit --class cn.test.job.TestJob --master local[1] /data/test.jar

在上文提过,在本地运行spark Streaming时,core数量必须大于接收的数量,所以此时只会接收数据而不会处理数据。所以解决方法是:将core的数量设置2以上

spark-submit --class cn.test.job.TestJob --master local[2] /data/test.jar

疑问:

1.spark-submit --class cn.test.job.TestJob --master local[2] /data/test.jar 最后使用这种方法时,运行几分钟后就又会出现先前那种问题。

2.使用集群模式运行时,也会出现这种情况

上述两种情况,我虚拟机的core已经有3个了,只有一台虚拟机

知道答案的同学帮忙留言回复。谢谢!

原文发布于微信公众号 - 大数据和云计算技术(jiezhu2007)

原文发表时间:2016-12-03

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spark学习技巧

Kafka源码系列之实现自己的kafka监控

一,基本思路介绍 Kafka作为一个好用的且应用很广泛的消息队列,在大数据处理系统中基本是必不可少的。当然,作为缓存消息的消息队列,我们对其进行流量监控及消费滞...

3755
来自专栏Java3y

Java锁机制了解一下

1826
来自专栏Java3y

【Java】几道让你拿offer的面试题

之前在刷博客的时候,发现一些写得比较好的博客都会默默收藏起来。最近在查阅补漏,有的知识点比较重要的,但是在之前的博客中还没有写到,于是趁着闲整理一下。

960
来自专栏分布式系统进阶

Kafka的日志清理-LogCleanerKafka源码分析-汇总

724
来自专栏扎心了老铁

java使用spark/spark-sql处理schema数据

1、spark是什么? Spark是基于内存计算的大数据并行计算框架。 1.1 Spark基于内存计算 相比于MapReduce基于IO计算,提高了在大数据环境...

3225
来自专栏CDA数据分析师

专栏 | Learning Spark (Python版) 学习笔记(二)----键值对、数据读取与保存、共享特性

本来应该上周更新的,结果碰上五一,懒癌发作,就推迟了 = =。以后还是要按时完成任务。废话不多说,第四章-第六章主要讲了三个内容:键值对、数据读取与保存与Spa...

1839
来自专栏Albert陈凯

Spark详解06容错机制Cache 和 Checkpoint Cache 和 Checkpoint

Cache 和 Checkpoint 作为区别于 Hadoop 的一个重要 feature,cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应...

39612
来自专栏个人分享

SparkStreaming(源码阅读十二)

  要完整去学习spark源码是一件非常不容易的事情,但是咱可以积少成多嘛~那么,Spark Streaming是怎么搞的呢?

612
来自专栏AndroidTv

Exception 和 Error 有什么区别么声明提问正文

1235
来自专栏Jed的技术阶梯

Spark-RDD持久化

使用不同参数的组合构造的实例被预先定义为一些值,比如MEMORY_ONLY代表着不存入磁盘,存入内存,不使用堆外内存,不进行序列化,副本数为1,使用persis...

513

扫描关注云+社区