Spark Streaming入门

本文将帮助您使用基于HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一个扩展,支持连续的数据流处理。

什么是Spark Streaming?

首先,什么是流(streaming)?数据流是连续到达的无穷序列。流处理将不断流动的输入数据分成独立的单元进行处理。流处理是对流数据的低延迟处理和分析。Spark Streaming是Spark API核心的扩展,可实现实时数据的快速扩展,高吞吐量,高容错处理。Spark Streaming适用于大量数据的快速处理。实时处理用例包括:

  • 网站监控,网络监控
  • 欺诈识别
  • 网页点击
  • 广告
  • 物联网传感器

Spark Streaming支持如HDFS目录,TCP套接字,Kafka,Flume,Twitter等数据源。数据流可以用Spark 的核心API,DataFrames SQL,或机器学习的API进行处理,并且可以被保存到HDFS,databases或Hadoop OutputFormat提供的任何文件系统中去。

Spark Streaming输入输出

Spark Straming如何工作

Spark Streaming将数据流每X秒分作一个集合,称为Dstreams,它在内部是一系列RDD。您的Spark应用程序使用Spark API处理RDD,并且批量返回RDD操作的结果。

示例应用程序的体系结构

Spark Streaming示例代码执行以下操作:

  • 读取流式数据。
  • 处理流数据。
  • 将处理后的数据写入HBase表。

其他Spark示例代码执行以下操作:

  • 读取流媒体代码编写的HBase Table数据
  • 计算每日汇总的统计信息
  • 将汇总统计信息写入HBase表

示例数据集

油泵传感器数据文件放入目录中(文件是以逗号为分隔符的CSV)。Spark Streaming将监视目录并处理在该目录中创建的所有文件。(如前所述,Spark Streaming支持不同的流式数据源;为简单起见,此示例将使用CSV。)

以下是带有一些示例数据的csv文件示例:

我们使用Scala案例类来定义与传感器数据csv文件相对应的传感器模式,并使用parseSensor函数将逗号分隔值解析到传感器案例类中。

HBase表格模式

流数据的HBase表格模式如下:

  • 泵名称日期和时间戳的复合行键
  • 可以设置报警列簇,来监控数据。请注意,数据和警报列簇可能会设为在一段时间后失效。

日常统计汇总的模式如下所示:

  • 泵名称和日期的复合行键
  • 列簇统计
  • 最小值,最大值和平均值。

下面的函数将Sensor对象转换为HBase Put对象,该对象用于将数据行插入到HBase中。

写HBase表的配置

您可以使用Spark 的TableOutputFormat类写入HBase表,这与您从MapReduce写入HBase表的方式类似。下面我们使用TableOutputFormat类设置HBase的配置。

Spark Streaming示例代码

这些是Spark Streaming代码的基本步骤:

  1. 初始化Spark StreamingContext对象。
  2. 将转换和输出操作应用于DStream。
  3. 开始接收数据并使用streamingContext.start()处理它。
  4. 等待streamingContext.awaitTermination()的返回从而停止处理。

我们将通过示例应用程序代码完成这些步骤。

初始化StreamingContext

首先,我们创建一个StreamingContext,这是流式传输的主要入口点(2秒间隔时间

)。

val sparkConf =  new  SparkConf ( ) . setAppName ( "HBaseStream" )
// 创建 StreamingContext, 流式函数的主要入口
val ssc =  new  StreamingContext ( sparkConf ,  Seconds ( 2 ) )

接下来,我们使用StreamingContext textFileStream(directory)方法创建一个输入流,该输入流监视Hadoop兼容的文件系统以获取新文件,并处理在该目录中创建的所有文件。

// 创建代表数据 DStream对象
val linesDStream = ssc . textFileStream ( "/user/user01/stream" )

linesDStream代表数据流,每个记录都是一行文本。内部DStream是一系列RDD,每个批处理间隔一个RDD。

将转换和输出操作应用于DStream

接下来,我们将数据行解析为Sensor对象,并使用DStream行上的map操作。

// 把lineDSream的每一行解析为Sensor对象
val sensorDStream = linesDStream . map ( Sensor . parseSensor )

map操作在linesDStream中的RDD上使用Sensor.parseSensor函数,从而生成Sensor对象(RDD)。

接下来,我们使用DStream foreachRDD方法将处理应用于此DStream中的每个RDD。我们过滤低psi传感器对象以创建警报,然后我们通过将传感器和警报数据转换为Put对象并使用PairRDDFunctions saveAsHadoopDataset(https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/rdd/PairRDDFunctions.html#saveAsHadoopDataset(org.apache.hadoop.mapred.JobConf%29)方法将传感器和警报数据写入HBase ,该方法使用Hadoop将RDD输出到任何支持Hadoop的存储系统,该存储系统的配置对象(请参阅上面的HBase的Hadoop配置)。

// 对每一个RDD. 
sensorRDD . foreachRDD { rdd =>
       // 低psi的传感器过滤器 
     val alertRDD = rdd . filter ( sensor => sensor . psi <  5.0 )
      // 把传感器数据转为对象并写入HD
      rdd . map ( Sensor . convertToPut ) . saveAsHadoopDataset ( jobConfig )
     // 把警报转为对象并写入HD
     rdd . map ( Sensor . convertToPutAlert ) . saveAsHadoopDataset ( jobConfig )
}

sensorRDD对象被转换并写入HBase。

开始接收数据

要开始接收数据,我们必须在StreamingContext上显式调用start(),然后调用awaitTermination来等待计算完成。

     // 开始计算
    ssc . start ( )
    // 等待计算完成
    ssc . awaitTermination ( )

Spark R写入HBase

现在我们要读取HBase传感器表数据,计算每日摘要统计信息并将这些统计信息写入。

以下代码读取HBase表,传感器表,psi列数据,使用StatCounter计算此数据的统计数据,然后将统计数据写入传感器统计数据列。

 // HBase的读取设置 
    val conf = HBaseConfiguration . create ( )
    conf . set ( TableInputFormat . INPUT_TABLE , HBaseSensorStream . tableName )
    // 扫描数据
    conf . set ( TableInputFormat . SCAN_COLUMNS ,  "data:psi" ) 
// 加载RDD (row key, row Result)元组
    val hBaseRDD = sc . newAPIHadoopRDD ( conf , classOf [ TableInputFormat ] ,
      classOf [ org . apache . hadoop . hbase . io . ImmutableBytesWritable ] ,
      classOf [ org . apache . hadoop . hbase . client . Result ] )
    // 把(row key, row Result) 元组为RDD
    val resultRDD = hBaseRDD.map(tuple => tuple._2)
    // 转为 RDD (RowKey, ColumnValue), 移除Time
    val keyValueRDD = resultRDD.
              map(result => (Bytes.toString(result.getRow()).
              split(" ")(0), Bytes.toDouble(result.value)))
    // 分组,得到统计数据
    val keyStatsRDD = keyValueRDD.
             groupByKey().
             mapValues(list => StatCounter(list))
    // 转码rowkey,统计信息放入并写入hbase
    keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

下图显示newAPIHadoopRDD的输出。PairRDDFunctions saveAsHadoopDataset将Put对象保存到HBase。

软件

运行程序

您可以将代码作为独立应用程序运行,如“MapR Sandbox上的Spark入门教程”中所述。

以下是总的步骤:

  1. 按照MapR沙箱入门Spark中的介绍,用户ID user01,密码mapr。
  2. 使用maven构建应用程序。
  3. 使用scp将jar文件和数据文件复制到沙盒主目录/ user / user01。
  4. 运行应用程序:/ opt / mapr / spark / spark- <version> / bin / spark-submit --driver-class -pathhbase classpath --class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jar
  5. 将流式数据文件复制到流目录中:cp sensordata.csv /user/user01/stream/
  6. 读取数据并计算一列的数据/ opt / mapr / spark / spark- <version> / bin / spark-submit --driver-class -path hbase classpath - --class examples.HBaseReadWrite sparkstreamhbaseapp-1.0.jar
  7. 计算整行的统计信息/ opt / mapr / spark / spark- <version> / bin / spark-submit --driver-class -path hbase classpath - --class examples.HBaseReadRowWriteStats sparkstreamhbaseapp-1.0.jar

总结

这就结束了关于使用HBase进行Spark Streaming的教程。您可以在参考资料部分找到更多信息。

参考文献和更多信息:

本文的版权归 轻吻晴雯 所有,如需转载请联系作者。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Albert陈凯

1.1.3 Spark架构与单机分布式系统架构对比

传统的单机系统,虽然可以多核共享内存、磁盘等资源,但是当计算与存储能力无法满足大规模数据处理的需要时,面对自身CPU与存储无法扩展的先天限制,单机系统就力不从心...

3945
来自专栏加米谷大数据

Spark核心技术原理透视二(Spark运行模式)

上一章节详细讲了Spark的运行原理,没有关注的童鞋可以关注加米谷大数据查看上一章节的详细内容。通过Spark运行原理的讲解大家了解了Spark在底层的运行,那...

6627
来自专栏java达人

自学Apache Spark博客(节选)

作者:Kumar Chinnakali 译者:java达人 来源:http://dataottam.com/2016/01/10/self-learn-yo...

2479
来自专栏恰童鞋骚年

Hadoop学习笔记—21.Hadoop2的改进内容简介

Hadoop2相比较于Hadoop1.x来说,HDFS的架构与MapReduce的都有较大的变化,且速度上和可用性上都有了很大的提高,Hadoop2中有两个重要...

952
来自专栏PPV课数据科学社区

干货 | 98道常见Hadoop面试题及答案解析(一)

这是一篇hadoop的测试题及答案解析,题目种类挺多,一共有98道题,题目难度不大,对于高手来说,90分以上才是你的追求。 1 单选题 1.1 下面哪个程序负责...

3534
来自专栏星汉技术

原 Spark Shuffle

5695
来自专栏CSDN技术头条

大数据技术Hadoop面试题,看看你能答对多少?答案在后面

单项选择题 1. 下面哪个程序负责 HDFS 数据存储。 a)NameNode b)Jobtracker c)Datanode d)secondaryNameN...

30010
来自专栏大数据智能实战

spark 2.0.1(技术预览版)的编译与测试(附一些新特性的介绍)

spark 2.0的预览版在前几个月已经吵得沸沸扬扬,趁着今天一起编译了下这个版本,还是非常方便的。 这回采用MVN来进行编译,具体见官网的编译帮助。 Bui...

1976
来自专栏Spark学习技巧

Spark设计理念和基本架构

2156
来自专栏数据库

爱炫耀的数据库老头儿

作者:刘欣 1 数据库老头儿 我们这个世界很大, 生活着很多人,形形色色,各怀绝技。但是被公认为最拽的一个却是数据库老头儿,年龄挺大,每天都要炫耀几遍他那关系型...

1980

扫码关注云+社区

领取腾讯云代金券