Scala语言开发Spark应用程序

Scala语言开发Spark应用程序

本来这篇文章早就应该写了,拖到现在都有点不好意思了,今天就简单写点 算抛砖吧 ,砸不砸到人 ,请各位看官自行躲避。闲话少说步入正题。

Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情。如果你对Scala语言还不太熟悉,没关系,大家一起学习,反正我也不会。我会在后续的文章中继续介绍scala.

本章的重点是如何利用scala实现spark,先简单说说spark吧,

上图就清晰多了。

介绍我也就不多说了 。因为不是我这一期的重点,如果大家有兴趣可以给我留言 ,需要了解这方面的详细知识,我可以在后续的文章中详细介绍。

我为什么要用scala,而不用java实现呢,你只需要记住两点 ,1.FP泛型支持,2类型系统支持。

本篇我简单介绍scala spark 编程WordCount, Flume与spark 的结合;

1. WordCount

WordCount是一个最简单的分布式应用实例,主要功能是统计输入目录中所有单词出现的总次数。

一般我们写Spark程序时,需要包含以下两个头文件:

importorg.apache.spark.SparkConf

importorg.apache.spark.SparkContext

步骤1:创建一个SparkContext对象,该对象有四个参数:Spark master位置、应用程序名称,Spark安装目录和jar存放位置,对于Spark On YARN而言,最重要的是前两个参数,第一个参数指定为“yarn-standalone”,第二个参数是自定义的字符串,举例如下:

val sc = new SparkContext(args(0),"WordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

因为我这是在本地写的可能没有涉及这些参数。

val sc = new SparkContext(conf)

步骤2:读取输入数据。我们要从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用的是

例如源码HdfsWordCount.scala

Hadoop中的TextInputFormat解析输入数据,举例如下

val lines = ssc.textFileStream(args(0))

当然,Spark允许你采用任何Hadoop InputFormat,比如二进制输入格式SequenceFileInputFormat,此时你可以使用SparkContext中的hadoopRDD函数,举例如下:

valinputFormatClass=classOf[SequenceFileInputFormat[Text,Text]]

varhadoopRdd= sc.hadoopRDD(conf,inputFormatClass,classOf[Text],classOf[Text]

步骤3:通过RDD转换算子操作和转换RDD,对于WordCount而言,首先需要从输入数据中每行字符串中解析出单词,然后将相同单词放到一个桶中,最后统计每个桶中每个单词出现的频率,举例如下:

valline= hadoopRdd.flatMap{

case(key,value) => value.toString().split("\\s+");

}.map(word => (word,1)). reduceByKey (_ + _)

其中,flatMap函数可以将一条记录转换成多条记录(一对多关系),map函数将一条记录转换为另一条记录(一对一关系),reduceByKey函数将key相同的数据划分到一个桶中,并以key为单位分组进行计算,

步骤4:将产生的RDD数据集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈数将数据集保存到HDFS目录下,默认采用Hadoop提供的TextOutputFormat,每条记录以“(key,value)”的形式打印输出,你也可以采用saveAsSequenceFile函数将数据保存为SequenceFile格式等,举例如下:

例子:

result.saveAsSequenceFile(args(2))

需要注意的是,指定输入输出文件时,需要指定hdfs的URI,其中,“hdfs://hadoop”是由Hadoop配置文件core-site.xml中参数fs.default.name指定的,具体按照你的配置指定就ok。

2:spark与flume结合实例

Spark Streaming是一个新的实时计算的利器,而且还在快速的发展。它将输入流切分成一个个的DStream转换为RDD,从而可以使用Spark来处理。它直接支持多种数据源:Kafka, Flume, Twitter, ZeroMQ , TCP sockets等,有一些可以操作的函数:map, reduce, join, window等。

用flume做数据收集,spark做数据分析,

源代码

简单写个实例

代码分析这块,我先简单的写道这块 ,肯定有不足的地方 ,下回我会多注意 。

本文分享自微信公众号 - 我是攻城师(woshigcs)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2015-01-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏伦少的博客

SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

19660
来自专栏岑玉海

Spark源码系列(八)Spark Streaming实例分析

这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照《Spark Streaming编程指南》。 Example代码分析 val ...

35870
来自专栏数据科学与人工智能

【Spark研究】Spark编程指南(Python版)

Spark编程指南 译者说在前面:最近在学习Spark相关的知识,在网上没有找到比较详细的中文教程,只找到了官网的教程。出于自己学习同时也造福其他初学者的目的,...

1.8K50
来自专栏Small Code

【Python】自动生成命令行工具 - fire 简介

Python 中用于生成命令行接口(Command Line Interfaces, CLIs)的工具已经有一些了,例如已经成为 Python 标准库的 arg...

53790
来自专栏编程

如何正确并快速理解MapReduce

什么是MapReduce?Map本意可以理解为地图,映射(面向对象语言都有Map集合),这里我们可以理解为从现实世界获得或产生映射。Reduce本意是减少的意思...

21860
来自专栏王小雷

Spark学习之Spark调优与调试(7)

Spark学习之Spark调优与调试(7) 1. 对Spark进行调优与调试通常需要修改Spark应用运行时配置的选项。 当创建一个SparkContext时就...

27570
来自专栏我是攻城师

Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据

54880
来自专栏张浩的专栏

Hive 创建自定义函数(UDF)

当Hive中的内置函数不满足我们需求的时候,我们可以自定义我们自己的Hive函数,来满足我们的需求。

62310
来自专栏祝威廉

Structured Streaming如何实现Parquet存储目录按时间分区

StreamingPro现在支持以SQL脚本的形式写Structured Streaming流式程序了: mlsql-stream。不过期间遇到个问题,我希望按...

17810
来自专栏Jed的技术阶梯

SparkStreaming 写数据到 HBase,由于共用连接造成的数据丢失问题

有如下程序,SparkStreaming 读取 Kafka 中的数据,经过处理后,把数据写入到 Hbase 中

1.2K20

扫码关注云+社区

领取腾讯云代金券