SparkStreaming入门

黄文辉同学第二篇,请大家支持!

1.SparkStreaming简介

Spark Streaming属于核心Spark API的扩展,支持实时数据流的可扩展、高吞吐、容错的流处理。可以接受来自Kafka、Flume、ZeroMQ、Kinesis、Twitter或TCP套接字的数据源,也可以使用map、reduce、join、window等高级函数表示的复杂算法进行处理。最后,处理的结果数据可以输出到hdfs,redis,数据库(如hbase)等。

2.工作原理

Spark Streaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。工作原理如下图所示,Spark Streaming接受实时传入的数据流后,将数据划分成批Spark中的RDD,然后传入到Spark Engine进行处理,按批次生成最后的结果数据。

下面以wordcount简单的例子(Java语言)来理解流式计算。

SparkConf conf = new SparkConf().setAppName("SparkStream-TestJob");
     //每10秒处理一次
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    //监听数据流地址端口 
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("192.168.191.200", 9999);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterator<String> call(String t) throws Exception {
            return Arrays.asList(t.split(" ")).iterator();
        }
    });
    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String t) throws Exception {
            return new Tuple2<String, Integer>(t, 1);
        }
    });
   //统计单词数
    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            }
    );
    wordCounts.print();//打印处理结果到控制台
    //开始执行接收和处理数据
    jssc.start();
jssc.awaitTermination();   // 等待流计算结束,防止应用退出

在spark上执行上面代码,然后在对应的ip上打开9999端口实时进行数据传输(nc -lk 9999)

然后在spark上可以看实时看出统计结果

3. Spark Streaming核心类

3.1 StreamingContext

StreamingContext是流计算功能的主要入口。StreamingContext会在底层创建出SparkContext,用来处理数据。从上面代码中还发现,创建StreamingContext时,还需要指定多长时间来处理一次新数据的批次间隔。创建StreamingContext后,再按下列步骤执行:

1).通过输入源创建InputDStream

2).对DStreaming进行transformation和output操作

3).通过StreamContext.start()方法启动接收和处理数据

4).调用StreamingContext.awaitTermination()来等待流计算完成,来防止应用退出

5).可以调用StreamingContext.stop()方法结束程序的运行

使用时注意:

1).一个StreamStreamingContextContext只能启动一次,所以只有在配置好所有的DStream以及所需要输出操作之后才能启动。所以启动后,新的操作将不起作用

2).StreamingContext停止后,不能重新启动.。

3).在同一时间段内,一个JVM只能有一个active状态的StreamingContext

4).调用StreamingContext的stop方法时,SparkContext也将被stop掉,如果希望StreamingContext关闭时,保留SparkContext,则需要在stop方法中传入参数false去设置stopSparkContext=false

5).SparkContext对象可以被多个StreamingContexts重复使用,但需要前一个StreamingContexts停止后再创建下一个StreamingContext对象。

3.2 DStream

Spark Streaming提供一种称为DStream(Discretized Stream,离散流)的高级抽象数据流。

DStream的创建

可以从数据源(kafka、flume)的输入数据流创建,也可以在其他DStream上应用一些高级操作来创建,一个DStream可以看作是一个RDDs的序列。

DStream的核心思想

将计算作为一系列较小时间间隔的、状态无关、确定批次的任务,每个时间间隔内接收到的输入数据被可靠储存在集群中,作为它的数据集。然后进行一系列的操作。

Input DStream和Receivers

Input DStream是DStream的一种,它是从流式数据源中获取的原始数据流。上面的例子中, jssc.socketTextStream("192.168.191.200", 9999)就是接收过来的Input Stream。除了文件流外,每个Input DStream都关联一个Recevier对象,该对象接收数据源传来的数据并将其保持在内存中提供给spark使用。

Spark Streaming有两种数据类型的流式输入数据源:

1).基本输入源:能够直接应用于StreamingContext API的输入源。例如:文件系统、套接字连接,以及Akka Actor

2).高级输入源:能够应用于特定工具类的输入源。例如:Kafka、Flume、Kinnesis等,这些就需要导入一些额外的依赖包。

每个Input DStream对应一个接收器接收数据流。在Streaming应用中,可以创建多个Input DStream并行接收多个数据流。但请注意,每个接收器是一个长期运行在Worker或者Executor上的任务,因此它会占用分配给Spark Streaming应用程序的一个核(core)。

非常重要的一点是,为了保证一个或者多个接收器能够接收数据,需要分配给Spark Streaming应用程序足够多的核数。

记住要点:

1).在本地运行spark Streaming时,master URL不能使用“local”或“local[1]”。因为当Input DStream与receiver(如:sockets,Kafka,Flume等)关联时,receiver需要一个线程来运行,那么就没有多的线程去处理接收到的数据。所以,在本地运行SparkStreaming程序时,要使用“local[n]”作为master URL,其中n要大于接收器的数量。

2).在集群上运行Spark Streaming,分配给Spark Streaming程序的cpu核数也必须大于接收器的数量,否则,只会接收数据,而不会去处理数据。

4.遇到的问题

当sparkStreaming在local模式运行时,只有一个core的情况下,只会接收数据,而不能做处理,具体是会出现这样情况

提交命令:

spark-submit --class cn.test.job.TestJob --master local[1] /data/test.jar

在上文提过,在本地运行spark Streaming时,core数量必须大于接收的数量,所以此时只会接收数据而不会处理数据。所以解决方法是:将core的数量设置2以上

spark-submit --class cn.test.job.TestJob --master local[2] /data/test.jar

疑问:

1.spark-submit --class cn.test.job.TestJob --master local[2] /data/test.jar 最后使用这种方法时,运行几分钟后就又会出现先前那种问题。

2.使用集群模式运行时,也会出现这种情况

上述两种情况,我虚拟机的core已经有3个了,只有一台虚拟机

知道答案的同学帮忙留言回复。谢谢!

本文分享自微信公众号 - 大数据和云计算技术(jiezhu2007),作者:黄文辉

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2016-12-03

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 知识图谱扫盲

    近两年来,随着Linking Open Data等项目的全面展开,语义Web数据源的数量激增,大量RDF数据被发布。互联网正从仅包含网页和网页之间超链接的文档万...

    大数据和云计算技术
  • 速度比你想的重要

    效率高的明显好处是:单位时间内能完成更多的工作。但这只是冰山一角,假如工作速度快,你就会倾向于低估做事的成本,因此乐于完成更多的工作。 举个例子,假设你每写一...

    大数据和云计算技术
  • Spark这是要一统江湖的节奏

    Spark创始人Matei最近在spark submmit上做了一次演讲,看了内容会发现spark这是要一统江湖的架势,一起来看看都介绍了什么内容。 Spark...

    大数据和云计算技术
  • 简谈Spark Streaming的实时计算整合

    基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、ML...

    企鹅号小编
  • PostgreSQL 代价模型

    对于ORACLE ,SQL SERVER 这样的数据库的代价模型一般是不会透露给外部的,所以我们看到一些COST 也是一头雾水,摸不清头脑。

    AustinDatabases
  • Laya 中缩放的实现

    在 Laya 中, Event 是事件类型的集合。包含了常见的鼠标事件、键盘事件。

    用户2434869
  • [iOS源码笔记]·第三方网络下载处理框架:AFNetworking网络下载处理(调用栈与缓存策略篇)

    看到这里,或许可以看manager对象的AFHTTPSessionManager类型更深层定义。

    陈满iOS
  • JSON的解析

    使用时主要会涉及到json格式的互转,有对象,数组,集合,map等等。即使会了过一过眼也是好的,加深印象。

    聚沙成塔
  • 腾讯运维技术专家集结,揭秘高效智能运维 | 沙龙报名中

    活动信息 ? 收获多多 收获与腾讯、行业技术大咖面对面交流机会 收获机器学习算法在运维领域的应用经验 收获腾讯数字化转型中,海量业务上云实践经验 收获研发运...

    腾讯大讲堂
  • 腾讯运维技术专家集结,揭秘高效智能运维 | 沙龙报名中

    ? 活动信息 ? 收获多多 收获与腾讯、行业技术大咖面对面交流机会 收获机器学习算法在运维领域的应用经验 收获腾讯数字化转型中,海量业务上云实践经验 收获研...

    腾讯技术工程官方号

扫码关注云+社区

领取腾讯云代金券