Scala语言开发Spark应用程序
本来这篇文章早就应该写了,拖到现在都有点不好意思了,今天就简单写点 算抛砖吧 ,砸不砸到人 ,请各位看官自行躲避。闲话少说步入正题。
Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情。如果你对Scala语言还不太熟悉,没关系,大家一起学习,反正我也不会。我会在后续的文章中继续介绍scala.
本章的重点是如何利用scala实现spark,先简单说说spark吧,
上图就清晰多了。
介绍我也就不多说了 。因为不是我这一期的重点,如果大家有兴趣可以给我留言 ,需要了解这方面的详细知识,我可以在后续的文章中详细介绍。
我为什么要用scala,而不用java实现呢,你只需要记住两点 ,1.FP泛型支持,2类型系统支持。
本篇我简单介绍scala spark 编程WordCount, Flume与spark 的结合;
1. WordCount
WordCount是一个最简单的分布式应用实例,主要功能是统计输入目录中所有单词出现的总次数。
一般我们写Spark程序时,需要包含以下两个头文件:
importorg.apache.spark.SparkConf
importorg.apache.spark.SparkContext
步骤1:创建一个SparkContext对象,该对象有四个参数:Spark master位置、应用程序名称,Spark安装目录和jar存放位置,对于Spark On YARN而言,最重要的是前两个参数,第一个参数指定为“yarn-standalone”,第二个参数是自定义的字符串,举例如下:
val sc = new SparkContext(args(0),"WordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
因为我这是在本地写的可能没有涉及这些参数。
val sc = new SparkContext(conf)
步骤2:读取输入数据。我们要从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用的是
例如源码HdfsWordCount.scala
Hadoop中的TextInputFormat解析输入数据,举例如下
val lines = ssc.textFileStream(args(0))
当然,Spark允许你采用任何Hadoop InputFormat,比如二进制输入格式SequenceFileInputFormat,此时你可以使用SparkContext中的hadoopRDD函数,举例如下:
valinputFormatClass=classOf[SequenceFileInputFormat[Text,Text]]
varhadoopRdd= sc.hadoopRDD(conf,inputFormatClass,classOf[Text],classOf[Text]
步骤3:通过RDD转换算子操作和转换RDD,对于WordCount而言,首先需要从输入数据中每行字符串中解析出单词,然后将相同单词放到一个桶中,最后统计每个桶中每个单词出现的频率,举例如下:
valline= hadoopRdd.flatMap{
case(key,value) => value.toString().split("\\s+");
}.map(word => (word,1)). reduceByKey (_ + _)
其中,flatMap函数可以将一条记录转换成多条记录(一对多关系),map函数将一条记录转换为另一条记录(一对一关系),reduceByKey函数将key相同的数据划分到一个桶中,并以key为单位分组进行计算,
步骤4:将产生的RDD数据集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈数将数据集保存到HDFS目录下,默认采用Hadoop提供的TextOutputFormat,每条记录以“(key,value)”的形式打印输出,你也可以采用saveAsSequenceFile函数将数据保存为SequenceFile格式等,举例如下:
例子:
result.saveAsSequenceFile(args(2))
需要注意的是,指定输入输出文件时,需要指定hdfs的URI,其中,“hdfs://hadoop”是由Hadoop配置文件core-site.xml中参数fs.default.name指定的,具体按照你的配置指定就ok。
2:spark与flume结合实例
Spark Streaming是一个新的实时计算的利器,而且还在快速的发展。它将输入流切分成一个个的DStream转换为RDD,从而可以使用Spark来处理。它直接支持多种数据源:Kafka, Flume, Twitter, ZeroMQ , TCP sockets等,有一些可以操作的函数:map
, reduce
, join
, window等。
用flume做数据收集,spark做数据分析,
源代码
简单写个实例
代码分析这块,我先简单的写道这块 ,肯定有不足的地方 ,下回我会多注意 。