前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《从0到1学习Spark》—Spark Streaming

《从0到1学习Spark》—Spark Streaming

作者头像
程序员小强
发布2019-06-03 16:18:22
9200
发布2019-06-03 16:18:22
举报
文章被收录于专栏:小强的进阶之路

最近要做关于实时数据的处理,需要用到SparkStreaming,于是乎把SparkStreaming拿出来在看看。

前言

我们先来看看Spark官方文档对于Spark Streaming的定义:Spark Streaming是对Spark核心API的扩展,并且是一个具有可伸缩,高吞吐,容错特性的实时数据流处理框架。它支持多种数据源作为数据,比如Kafka,Flume,Kinesis或者TCP sockets,并且可以使用RDD高等函数,比如map, reduce, join和window,来实现复杂的数据处理算法。最终,处理后的数据可以输出到文件系统。数据库或者实时图表中。实际上,你还可以使用Spark的机器学习包和图处理包来对数据进行处理。

Spark Streaming内部是这样工作的。Spark Streaming接收实时流数据,然后把数据切分成一个一个的数据分片。最后每个数据分片都会通过Spark引擎的处理生成最终的数据文件。

提供了一个高等级的抽象,名为discretized stream或DStream,来表示一个连续的数据流。DStrem可以从一个输入流数据源创建,比如Kafka,Flume,Kinesis,或者通过对其他DStream应用一些高等操作来获得。实际上在Spark内部DStream就是一系列的RDD分片。

小强手把手教学

通过maven或者sbt来管理Spark Streaming的依赖包。

代码语言:javascript
复制
<dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-streaming_2.11artifactId>
        <version>2.1.0version>
dependency>
代码语言:javascript
复制
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"

如果你使用Kafka,Flume和Kinesis作为你的数据源,你必须引入相对应的依赖包park-streaming-xyz_2.11,因为Spark Streaming的核心API并不支持这些数据源。

数据源

依赖包

Kafka

spark-streaming-kafka-0-8_2.11

Flume

spark-streaming-flume_2.11

Kinesis

spark-streaming-kinesis-asl_2.11 [Amazon Software License]

在真正开始接触Spark Streaming程序细节之前,我们先看一看一个Spark Streaming的简单例子长成什么样子,我们需要统计一下文本中单词的词频WordCount,数据来源为TCP Socket。接下来,小强和大家一起看下。

  首先我们引入了Spark Stream执行过程中需要用到的类和一些来自于StreamingContext的隐式装换。其中StreamingContext是Spark中所有流处理功能的入口点,我们使用两个本地线程,同时定义了数据分片时间间隔为1秒。

代码语言:javascript
复制
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

其中appName是应用名称,这个名字会显示在Spark的Web UI界面上。而master则是Spark,Mesos或者Yarn集群的URL地址,当然你也可以使用"local[*]"来启动本地模式运行。不过使用集群方式运行的话,我们一般不推荐使用setMaster方法来把设置写死在代码中,而是在spark-submit的时候使用--master参数来动态指定。但是在本地调试的时候,可以直接使用这种方式。(注意:在创建StreamingContext实例的时候,会自动创建一个SparkContext,我们可以使用ssc.sparkContext来访问)

在context被初始化后,你还需要做如下几点:

  1. 通过input DStream来定义输入源
  2. 通过DStream的转换操作和输出操作来定义流数据处理规则
  3. 开始接受数据并处理:streamingContext.start()
  4. 等待流处理终止(或者出现异常退出): streamingContext.awaitTermination()
  5. 手动终止流处理:streamingContext.stop()

通过这个StreamingContext,我们可以从一个TCP数据源接收流式数据,在这里我们需要指定主机和端口。

代码语言:javascript
复制
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

这个名为lines的DStream对象从数据服务器接收数据,DStream中的每一条数据都是一行文本,接下来我们使用空格来对数据进行分割,形成一个一个的单词。

代码语言:javascript
复制
// Split each line into words
val words = lines.flatMap(_.split(" "))

我们应用的flatMap这个DStream操作会把每一行数据切分成一个一个的单词,然后把所有DStream中所有行切分成的单词形成一个新的words DStream。接下来我们要对words进行词频统计。

代码语言:javascript
复制
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

这个words DStream通过一个map(一对一)操作生成一个新的(word, 1) DStream,接下来通过reduce方法,我们可以得到每个数据分片的词频数据,然后通过wordCounts.print()方法打印出来。

  注意,这个时候Spark Stream并没有启动,我们只是定义了DStream数据源以及要对DStream做什么操作。想要启动Spark Stream,我们需要执行StreamingContext的start方法。

代码语言:javascript
复制
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

下面我们来运行一下这个例子,保证你已经成功部署Spark环境。首先我们启动netcat向端口发送数据。

代码语言:javascript
复制
$ nc -lk 9999

接下来启动NetworkWordCount实例,在Spark的根目录下运行下面命令。

代码语言:javascript
复制
$ ./bin/run-example streaming.NetworkWordCount localhost 9999

流处理程序接受9999端口发送的数据,每秒形成一个数据分片,然后进行处理,并打印。

代码语言:javascript
复制
# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world
代码语言:javascript
复制
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

完整代码可以从NetworkWordCount获取。

今天大家跟着小强手把手搭建了Spark Streaming的workcount例子,相信你对Streaming功能很感兴趣,它是如何实现分片和流式的数据处理,后续小强会娓娓道来。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-04-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 MoziInnovations 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档