首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

SparkStreaming-1

1、概述

Spark Streaming是Spark API核心的扩展,支持可扩展,高吞吐量,实时数据流的容错流处理。数据可以从像kafka,flume,,Kinesis或TCP Socket许多来源摄入,并且可以使用与像高级别功能表达复杂的算法来处理map,reduce,join和window。最后,处理的数据可以推送到文件系统,数据库和实时(dashborad)仪表板。事实上,您可以将Spark的 机器学习(ML)和 图形处理(Graphx)算法应用于数据流。

park Streaming提供了一个高层次的抽象,称为离散流或DStream,它代表连续的数据流。DStream可以通过Kafka,Flume和Kinesis等来源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建。在内部,一个DStream被表示为一系列RDD。

2、快速示例

以下示例,及后面部分示例都将使用IDEA。同时请在本地安装好Maven环境,Maven版本3.x以上。

2.1、通过spark-submit提交任务步1:创建一个Maven项目添加spark-streaming的依赖

org.apache.spark

spark-streaming_2.11

2.1.2

步2:开发以下源代码

packagecn.spark.streaming

importorg.apache.spark.SparkConf

importorg.apache.spark.storage.StorageLevel

importorg.apache.spark.streaming.dstream.DStream

importorg.apache.spark.streaming.

/**

*收集时时从网络的传递来的数据进行分析

*/

objectDemo01_NetworkWordCount {

defmain(args: Array[String]): Unit = {

valconf: SparkConf =newSparkConf().setAppName("Streaming_NetworkWordCount");

valssc: StreamingContext =newStreamingContext(conf,Seconds(2));

//声明读取的网络地址和端口

vallines: DStream[String] = ssc.socketTextStream("hadoop201",9999, StorageLevel.MEMORY_AND_DISK);

//处理数据

valwordCount: DStream[(String, Int)] = lines.flatMap(line => line.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _);

wordCount.map(kv => kv._1 +"\t"+ kv._2).print(10);//输出前10行

//启动

ssc.start();

ssc.awaitTermination();

}

}

步3:打包

使用IDEA打包

步4:运行测试

在Linux上安装nc软件,使用命令:

$ sudo yum install -y nc

(关于nc的参数,请自行baidu)

现在启动nc并绑定9999端口:

$nc -lk hadoop2019999

将打好的jar包上传到linux并使用spark-submit提交任务:

-------------------------------------------

Time: 1515499200000 ms

-------------------------------------------

默认情况下,每将收集数据时,都会输出一个时间,如上面所示。

现在在nc端输入一行字符串,并用空格分开:

Jack Mary Rose

查看spark端收集并显示的数据:

-------------------------------------------

Time: 1515499196000 ms

-------------------------------------------

Mary 1

Rose 1

Jack 1

要停止Spartstreaming和nc请按下CTRL+C即可。

2.2、本地调试

Spark-Stream也可以在IDEA中进行测试,只要被监听的服务器地址在同一个网内可见即可。

本地调试,必须要设置master(...)可以是local也可以是standalone的集群或是yarn。以下代码:

packagecn.spark.streaming

importorg.apache.log4j.

importorg.apache.spark.SparkConf

importorg.apache.spark.storage.StorageLevel

importorg.apache.spark.streaming.dstream.DStream

importorg.apache.spark.streaming.

/**

*收集时时从网络的传递来的数据进行分析

*/

objectDemo01_NetworkWordCount {

defmain(args: Array[String]): Unit = {

//设置日志的级别,否则会输出很多INFO的信息

Logger.getLogger("org").setLevel(Level.WARN);

valconf: SparkConf =newSparkConf().setAppName("Streaming_NetworkWordCount");

//在编程过程中必须要设置master,如果在通过spark-submit提交则此项目可以忽略

//通过--master ..参数设置即可

conf.setMaster("local[2]");

//声明StreamingContext设置每2秒执行一次数据读取

valssc: StreamingContext =newStreamingContext(conf,Seconds(2));

//声明读取的网络地址和端口

vallines: DStream[String] = ssc.socketTextStream("hadoop201",9999, StorageLevel.MEMORY_AND_DISK);

//处理数据

valwordCount: DStream[(String, Int)] =

lines.flatMap(line => line.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _);

wordCount.map(kv => kv._1 +"\t"+ kv._2).print(10);//输出前10行

//启动

ssc.start();

ssc.awaitTermination();

}

}

然后直接以本地方式运行,由于在代码中设置了日志的级别为:WARN所以,很多INFO信息将不再显示:

查看IDEA的后台,请保证在Liunx上nc已经先启动,然后启动spartstreaming以后再在nc端输出一些测试的数据,以下是显示的结果:

-------------------------------------------

Time: 1515499824000 ms

-------------------------------------------

ALex1

Hello1

Mike1

Rose1

Jack2

查看4040端口UI界面,您将会发现大量已经完成的Job

2.3、运行在spark://集群上TODO3、基本概念3.1、初始化StreamingContext

要创建Spark Streaming Application,必须要创建StreamingContext对象。可以通过SparkConf对象来创建StreamingContext对象:

valconf: SparkConf =

newSparkConf().setAppName("Streaming_NetworkWordCount");

valssc: StreamingContext =newStreamingContext(conf,Seconds(2));

也可以通过SparkContext对象来创建StreamingContext对象:

valsc:SparkContext =newSparkContext(conf);

valssc2:StreamingContext =newStreamingContext(sc,Seconds(2));

3.2、定义上下文之后,您必须执行以下操作。

1.通过创建输入DStreams来定义输入源。

2.通过将转换和输出操作应用于DStream来定义流式计算。

3.开始接收数据并使用它进行处理streamingContext.start()。

4.等待处理停止(手动或由于任何错误)使用streamingContext.awaitTermination()。

5.处理可以手动停止使用streamingContext.stop()。

3.3、要记住的要点:

1.一旦StreamingContext已经开始,就不能建立或添加新的流式计算。

2.一旦StreamingContext被停止,它就不能被重新启动。

3.一个JVM中只能有一个StreamingContext同时处于活动状态。

4.停止StreamingContext的stop()也停止了SparkContext。要仅停止StreamingContext,请将可选参数的stop()调用设置stopSparkContext为false。

5.只要先前的StreamingContext在下一个StreamingContext创建之前停止(不停止SparkContext),就可以重新使用SparkContext来创建多个StreamingContext。

3.4、离散流(DStreams)

离散流或DStream是Spark Streaming提供的基本抽象。它表示连续的数据流,或者是从源接收的输入数据流,或者是通过转换输入流而生成的处理过的数据流。在内部,DStream由连续的RDD系列表示,这是Spark对不可变的分布式数据集的抽象(详见Spark Programming Guide)。DStream中的每个RDD都包含一定间隔的数据,如下图所示。

DStream上应用的任何操作都会转换为对基础RDD的操作。例如,在前面将线路流转换为字的示例中,该flatMap操作应用于linesDStream中的每个RDD,以生成DStream的wordsRDD。如下图所示。

这些基础的RDD转换由Spark引擎计算。DStream操作隐藏了大部分这些细节,并为开发人员提供了一个更高级别的API。

参考地址:

http://spark.apache.org/docs/latest/streaming-programming-guide.html

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180112G073Z900?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券