让你真正明白spark streaming

spark streaming介绍

Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等特点。我们可以从kafka、flume、witter、 ZeroMQ、Kinesis等源获取数据,也可以通过由 高阶函数map、reduce、join、window等组成的复杂算法计算出数据。最后,处理后的数据可以推送到文件系统、数据库、实时仪表盘中

为什么使用spark streaming 很多大数据应用程序需要实时处理数据流。思考: 我们知道spark和storm都能处理实时数据,可是spark是如何处理实时数据的,spark包含比较多组件:包括

  • spark core
  • Spark SQL
  • Spark Streaming
  • GraphX
  • MLlib

spark core中包含RDD、DataFrame和DataSet等,因此spark sql是为了兼容hive而产生的sql语句,GraphX提供的分布式图计算框架,MLlib提供的机器学习框架。因此spark所谓的实时处理数据则是通过spark streaming来实现的。 那么spark有哪些应用 如网站监控

欺诈检测

实时准确数据转移

反作弊 + 计费

什么是StreamingContext 为了初始化Spark Streaming程序,一个StreamingContext对象必需被创建,它是Spark Streaming所有流操作的主要入口。一个StreamingContext 对象可以用SparkConf对象创建。StreamingContext这里可能不理解,其实跟SparkContext也差不多的。(可参考让你真正理解什么是SparkContext, SQLContext 和HiveContext)。同理也有hadoop Context,它们都是全文对象,并且会获取配置文件信息。那么配置文件有哪些?比如hadoop的core-site.xml,hdfs-site.xml等,spark如spark-defaults.conf等。这时候我们可能对StreamingContext有了一定的认识。下面一个例子 为了初始化Spark Streaming程序,一个StreamingContext对象必需被创建,它是Spark Streaming所有流操作的主要入口。 一个StreamingContext 对象可以用SparkConf对象创建。

[Scala] 纯文本查看 复制代码

?

import org.apache.spark._
impoty org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc=new StreamingContext(conf,Seconds(1))

appName表示你的应用程序显示在集群UI上的名字,master 是一个Spark、Mesos、YARN集群URL 或者一个特殊字符串“local

  • ”,它表示程序用本地模式运行。当程序运行在集群中时,你并不希望在程序中硬编码 master ,而是希望用 sparksubmit启动应用程序,并从 spark-submit 中得到 master 的值。对于本地测试或者单元测试,你可以传递“local”字符串在同 一个进程内运行Spark Streaming。需要注意的是,它在内部创建了一个SparkContext对象,你可以通过 ssc.sparkContext访问这个SparkContext对象。 批时间片需要根据你的程序的潜在需求以及集群的可用资源来设定,你可以在性能调优那一节获取详细的信息.可以利用已经存在的 SparkContext 对象创建 StreamingContext 对象。 [Scala] 纯文本查看 复制代码 ? import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1)) 当一个上下文(context)定义之后,你必须按照以下几步进行操作
  • 定义输入源;
  • 准备好流计算指令;
  • 利用 streamingContext.start() 方法接收和处理数据;
  • 处理过程将一直持续,直到 streamingContext.stop() 方法被调用。 几点需要注意的地方:
  • 一旦一个context已经启动,就不能有新的流算子建立或者是添加到context中。
  • 一旦一个context已经停止,它就不能再重新启动
  • 在JVM中,同一时间只能有一个StreamingContext处于活跃状态
  • 在StreamingContext上调用 stop() 方法,也会关闭SparkContext对象。如果只想仅关闭StreamingContext对象,设
  • 置 stop() 的可选参数为false
  • 一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面
  • StreamingContext创建之前关闭(不关闭SparkContext)。 什么是DStream Spark Streaming支持一个高层的抽象,叫做离散流( discretized stream )或者 DStream ,它代表连续的数据流。DStream既可以利用从Kafka, Flume和Kinesis等源获取的输入数据流创建,也可以 在其他DStream的基础上通过高阶函数获得。在内部,DStream是由一系列RDDs组成。 举例: 一个简单的基于Streaming的workCount代码如下: [Scala] 纯文本查看 复制代码 ? package com.debugo.example import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkConf object WordCountStreaming { def main(args: Array[String]): Unit ={ val sparkConf = new SparkConf().setAppName("HDFSWordCount").setMaster("spark://172.19.1.232:7077") //create the streaming context val ssc = new StreamingContext(sparkConf, Seconds(30)) //process file when new file be found. val lines = ssc.textFileStream("file:///home/spark/data") val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } } 这段代码实现了当指定的路径有新文件生成时,就会对这些文件执行wordcount,并把结果print。具体流程如下:

代码诠释: 使用Spark Streaming就需要创建StreamingContext对象(类似SparkContext)。创建StreamingContext对象所需的参数与SparkContext基本一致,包括设定Master节点(setMaster),设定应用名称(setAppName)。第二个参数Seconds(30),指定了Spark Streaming处理数据的时间间隔为30秒。需要根据具体应用需要和集群处理能力进行设置。 val lines = ssc.textFileStream("file:///home/spark/data")为创建lines Dstream val words = lines.flatMap(_.split(" "))为通过flatMap转换为words Dstream 我们在引一例,比如创建Twitter val tweets=ssc.twitterStream()

原文发布于微信公众号 - about云(wwwaboutyuncom)

原文发表时间:2017-03-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏LhWorld哥陪你聊算法

【Spark篇】---Spark初始

Spark是基于内存的计算框架,性能要优于Mapreduce,可以实现hadoop生态圈中的多个组件,是一个非常优秀的大数据框架,是Apache的顶级项目。On...

1563
来自专栏Jed的技术阶梯

Spark-RDD持久化

使用不同参数的组合构造的实例被预先定义为一些值,比如MEMORY_ONLY代表着不存入磁盘,存入内存,不使用堆外内存,不进行序列化,副本数为1,使用persis...

2303
来自专栏个人分享

SparkStreaming(源码阅读十二)

  要完整去学习spark源码是一件非常不容易的事情,但是咱可以积少成多嘛~那么,Spark Streaming是怎么搞的呢?

1362
来自专栏肖力涛的专栏

Spark踩坑记:Spark Streaming+kafka应用及调优

本文首先对spark streaming嵌入kafka的方式进行归纳总结,之后简单阐述Spark streaming+kafka 在舆情项目中的应用,最后将自己...

5.6K3
来自专栏Java技术栈

Tomcat集群session复制与Oracle的坑。。

问题描述 公司某个系统使用了tomcat自带的集群session复制功能,然后后报了一个oracle驱动包里面的连接不能被序列化的异常。 01-Nov-2017...

3869
来自专栏行者悟空

Spark DAG调度

1543
来自专栏Albert陈凯

Spark对比Hadoop MapReduce 的优势

与Hadoop MapReduce相比,Spark的优势如下: ❑ 中间结果:基于MapReduce的计算引擎通常将中间结果输出到磁盘上,以达到存储和容错的目...

3114
来自专栏Albert陈凯

Spark系列课程-00xxSpark任务调度疑问,生成有向无环图的这个东西叫什么名字?

下面我们一起来看一下Spark的任务调度 ? Spark任务调度.png 首先最左边的叫做RDD Object就是一个一个的RDD对象 一个一个的RDD对象,...

45114
来自专栏知识分享

串口通信DMA中断

这是以前学32的时候写的,那时候学了32之后感觉32真是太强大了,比51强的没影。关于dma网上有许多的资料,亲们搜搜,这里只贴代码了,其实我也想详详细细地叙述...

3267
来自专栏个人分享

Shuffle相关分析

 Shuffle描述是一个过程,表现出的是多对多的依赖关系。Shuffle是连接map阶段和Reduce阶段的纽带,每个Reduce Task都会从Map Ta...

1074

扫码关注云+社区

领取腾讯云代金券