前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Scala语言开发Spark应用程序

Scala语言开发Spark应用程序

作者头像
我是攻城师
发布2018-05-11 16:47:32
1.3K0
发布2018-05-11 16:47:32
举报
文章被收录于专栏:我是攻城师我是攻城师

Scala语言开发Spark应用程序

本来这篇文章早就应该写了,拖到现在都有点不好意思了,今天就简单写点 算抛砖吧 ,砸不砸到人 ,请各位看官自行躲避。闲话少说步入正题。

Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情。如果你对Scala语言还不太熟悉,没关系,大家一起学习,反正我也不会。我会在后续的文章中继续介绍scala.

本章的重点是如何利用scala实现spark,先简单说说spark吧,

上图就清晰多了。

介绍我也就不多说了 。因为不是我这一期的重点,如果大家有兴趣可以给我留言 ,需要了解这方面的详细知识,我可以在后续的文章中详细介绍。

我为什么要用scala,而不用java实现呢,你只需要记住两点 ,1.FP泛型支持,2类型系统支持。

本篇我简单介绍scala spark 编程WordCount, Flume与spark 的结合;

1. WordCount

WordCount是一个最简单的分布式应用实例,主要功能是统计输入目录中所有单词出现的总次数。

一般我们写Spark程序时,需要包含以下两个头文件:

importorg.apache.spark.SparkConf

importorg.apache.spark.SparkContext

步骤1:创建一个SparkContext对象,该对象有四个参数:Spark master位置、应用程序名称,Spark安装目录和jar存放位置,对于Spark On YARN而言,最重要的是前两个参数,第一个参数指定为“yarn-standalone”,第二个参数是自定义的字符串,举例如下:

代码语言:javascript
复制
val sc = new SparkContext(args(0),"WordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

因为我这是在本地写的可能没有涉及这些参数。

代码语言:javascript
复制
val sc = new SparkContext(conf)

步骤2:读取输入数据。我们要从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用的是

例如源码HdfsWordCount.scala

Hadoop中的TextInputFormat解析输入数据,举例如下

代码语言:javascript
复制
val lines = ssc.textFileStream(args(0))

当然,Spark允许你采用任何Hadoop InputFormat,比如二进制输入格式SequenceFileInputFormat,此时你可以使用SparkContext中的hadoopRDD函数,举例如下:

valinputFormatClass=classOf[SequenceFileInputFormat[Text,Text]]

varhadoopRdd= sc.hadoopRDD(conf,inputFormatClass,classOf[Text],classOf[Text]

步骤3:通过RDD转换算子操作和转换RDD,对于WordCount而言,首先需要从输入数据中每行字符串中解析出单词,然后将相同单词放到一个桶中,最后统计每个桶中每个单词出现的频率,举例如下:

valline= hadoopRdd.flatMap{

case(key,value) => value.toString().split("\\s+");

}.map(word => (word,1)). reduceByKey (_ + _)

其中,flatMap函数可以将一条记录转换成多条记录(一对多关系),map函数将一条记录转换为另一条记录(一对一关系),reduceByKey函数将key相同的数据划分到一个桶中,并以key为单位分组进行计算,

步骤4:将产生的RDD数据集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈数将数据集保存到HDFS目录下,默认采用Hadoop提供的TextOutputFormat,每条记录以“(key,value)”的形式打印输出,你也可以采用saveAsSequenceFile函数将数据保存为SequenceFile格式等,举例如下:

例子:

result.saveAsSequenceFile(args(2))

需要注意的是,指定输入输出文件时,需要指定hdfs的URI,其中,“hdfs://hadoop”是由Hadoop配置文件core-site.xml中参数fs.default.name指定的,具体按照你的配置指定就ok。

2:spark与flume结合实例

Spark Streaming是一个新的实时计算的利器,而且还在快速的发展。它将输入流切分成一个个的DStream转换为RDD,从而可以使用Spark来处理。它直接支持多种数据源:Kafka, Flume, Twitter, ZeroMQ , TCP sockets等,有一些可以操作的函数:map, reduce, join, window等。

用flume做数据收集,spark做数据分析,

源代码

简单写个实例

代码分析这块,我先简单的写道这块 ,肯定有不足的地方 ,下回我会多注意 。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2015-01-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 我是攻城师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档