前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkStreaming入门

SparkStreaming入门

作者头像
大数据和云计算技术
发布2018-03-08 15:11:02
9750
发布2018-03-08 15:11:02
举报

黄文辉同学第二篇,请大家支持!

1.SparkStreaming简介

Spark Streaming属于核心Spark API的扩展,支持实时数据流的可扩展、高吞吐、容错的流处理。可以接受来自Kafka、Flume、ZeroMQ、Kinesis、Twitter或TCP套接字的数据源,也可以使用map、reduce、join、window等高级函数表示的复杂算法进行处理。最后,处理的结果数据可以输出到hdfs,redis,数据库(如hbase)等。

2.工作原理

Spark Streaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。工作原理如下图所示,Spark Streaming接受实时传入的数据流后,将数据划分成批Spark中的RDD,然后传入到Spark Engine进行处理,按批次生成最后的结果数据。

下面以wordcount简单的例子(Java语言)来理解流式计算。

代码语言:java
复制
SparkConf conf = new SparkConf().setAppName("SparkStream-TestJob");
     //每10秒处理一次
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    //监听数据流地址端口 
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("192.168.191.200", 9999);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterator<String> call(String t) throws Exception {
            return Arrays.asList(t.split(" ")).iterator();
        }
    });
    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String t) throws Exception {
            return new Tuple2<String, Integer>(t, 1);
        }
    });
   //统计单词数
    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            }
    );
    wordCounts.print();//打印处理结果到控制台
    //开始执行接收和处理数据
    jssc.start();
jssc.awaitTermination();   // 等待流计算结束,防止应用退出

在spark上执行上面代码,然后在对应的ip上打开9999端口实时进行数据传输(nc -lk 9999)

然后在spark上可以看实时看出统计结果

3. Spark Streaming核心类

3.1 StreamingContext

StreamingContext是流计算功能的主要入口。StreamingContext会在底层创建出SparkContext,用来处理数据。从上面代码中还发现,创建StreamingContext时,还需要指定多长时间来处理一次新数据的批次间隔。创建StreamingContext后,再按下列步骤执行:

1).通过输入源创建InputDStream

2).对DStreaming进行transformation和output操作

3).通过StreamContext.start()方法启动接收和处理数据

4).调用StreamingContext.awaitTermination()来等待流计算完成,来防止应用退出

5).可以调用StreamingContext.stop()方法结束程序的运行

使用时注意:

1).一个StreamStreamingContextContext只能启动一次,所以只有在配置好所有的DStream以及所需要输出操作之后才能启动。所以启动后,新的操作将不起作用

2).StreamingContext停止后,不能重新启动.。

3).在同一时间段内,一个JVM只能有一个active状态的StreamingContext

4).调用StreamingContext的stop方法时,SparkContext也将被stop掉,如果希望StreamingContext关闭时,保留SparkContext,则需要在stop方法中传入参数false去设置stopSparkContext=false

5).SparkContext对象可以被多个StreamingContexts重复使用,但需要前一个StreamingContexts停止后再创建下一个StreamingContext对象。

3.2 DStream

Spark Streaming提供一种称为DStream(Discretized Stream,离散流)的高级抽象数据流。

DStream的创建

可以从数据源(kafka、flume)的输入数据流创建,也可以在其他DStream上应用一些高级操作来创建,一个DStream可以看作是一个RDDs的序列。

DStream的核心思想

将计算作为一系列较小时间间隔的、状态无关、确定批次的任务,每个时间间隔内接收到的输入数据被可靠储存在集群中,作为它的数据集。然后进行一系列的操作。

Input DStream和Receivers

Input DStream是DStream的一种,它是从流式数据源中获取的原始数据流。上面的例子中, jssc.socketTextStream("192.168.191.200", 9999)就是接收过来的Input Stream。除了文件流外,每个Input DStream都关联一个Recevier对象,该对象接收数据源传来的数据并将其保持在内存中提供给spark使用。

Spark Streaming有两种数据类型的流式输入数据源:

1).基本输入源:能够直接应用于StreamingContext API的输入源。例如:文件系统、套接字连接,以及Akka Actor

2).高级输入源:能够应用于特定工具类的输入源。例如:Kafka、Flume、Kinnesis等,这些就需要导入一些额外的依赖包。

每个Input DStream对应一个接收器接收数据流。在Streaming应用中,可以创建多个Input DStream并行接收多个数据流。但请注意,每个接收器是一个长期运行在Worker或者Executor上的任务,因此它会占用分配给Spark Streaming应用程序的一个核(core)。

非常重要的一点是,为了保证一个或者多个接收器能够接收数据,需要分配给Spark Streaming应用程序足够多的核数。

记住要点:

1).在本地运行spark Streaming时,master URL不能使用“local”或“local[1]”。因为当Input DStream与receiver(如:sockets,Kafka,Flume等)关联时,receiver需要一个线程来运行,那么就没有多的线程去处理接收到的数据。所以,在本地运行SparkStreaming程序时,要使用“local[n]”作为master URL,其中n要大于接收器的数量。

2).在集群上运行Spark Streaming,分配给Spark Streaming程序的cpu核数也必须大于接收器的数量,否则,只会接收数据,而不会去处理数据。

4.遇到的问题

当sparkStreaming在local模式运行时,只有一个core的情况下,只会接收数据,而不能做处理,具体是会出现这样情况

提交命令:

代码语言:javascript
复制
spark-submit --class cn.test.job.TestJob --master local[1] /data/test.jar

在上文提过,在本地运行spark Streaming时,core数量必须大于接收的数量,所以此时只会接收数据而不会处理数据。所以解决方法是:将core的数量设置2以上

代码语言:javascript
复制
spark-submit --class cn.test.job.TestJob --master local[2] /data/test.jar

疑问:

1.spark-submit --class cn.test.job.TestJob --master local[2] /data/test.jar 最后使用这种方法时,运行几分钟后就又会出现先前那种问题。

2.使用集群模式运行时,也会出现这种情况

上述两种情况,我虚拟机的core已经有3个了,只有一台虚拟机

知道答案的同学帮忙留言回复。谢谢!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-12-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据和云计算技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档