Spark机器学习模块开发环境部署与实例

-欢迎

加入AI技术专家社群>>

分布式计算一直是大数据的一个核心部分,为了应对TB甚至PB级别传统关系型数据库无法处理的数据计算量,必须有一套专用的技术框架去实现TB,PB级别的数据运算。比如目前用的最多的hadoop的mapreduce框架。但是随着大数据行业的不断发展,hadoop的mapreduce已经不再是分布式计算的第一选择,从目前的趋势来看,spark(内存分布式计算框架)极有可能取代hadoop的mapreduce计算框架,成为分布式计算的新宠。

首先来聊聊什么是spark

什么是Spark?

Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。其架构如下图所示:

Spark与Hadoop的对比

Spark的中间数据放到内存中,对于迭代运算效率更高。

Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。

Spark比Hadoop更通用

Spark 提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join,cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。

这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。

不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。

容错性

在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。

可用性

Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。

Spark与Hadoop的结合

Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。

Spark的适用场景

Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小(大数据库架构中这是是否考虑使用Spark的重要因素)

由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。总的来说Spark的适用面比较广泛且比较通用。

运行模式

本地模式

Standalone模式

Mesoes模式

yarn模式

Spark生态系统

Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlanexecution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark 通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。

Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部 分数据。SparkStreaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它 处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。

Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。

在eclipse下集成Scala

接下来将介绍如何用eclipse开发spark项目,首先下载mars版本的eclipse。打开eclipse之后,在Help->Eclipse Marketplace中,下载scala IDE 4.2.X

然后在windows中下载sbt并且安装,sbt是scala的项目管理工具,所以必须在windows中安装SBT

安装好以后,在cmd命令行中键入sbt

这样,sbt就被初始化,并且下载和安装很多需要的插件。

初始化完毕后则会出现.sbt文件夹

点击进去之后需要在C:\Users\****\.sbt\0.13\下新建一个plugins文件夹,再在plugins目录下面添加一个plugins.sbt文件

然后编辑此文件,添加两行代码

addSbtPlugin("com.eed3si9n"% "sbt-assembly" % "0.13.0")

这两行代码第一个是添加eclipse项目编译插件,第二行代码则是添加一个assembly的打包工具插件。这两个插件在之后的内容中会被用到,是非常重要的两个插件。

然后再在命令行中输入一次sbt,这样就能成功下载这两个插件需要的组件。

然后我们返回eclipse开发环境,首先我们新建一个scala项目,然后我们再这个scala项目的根目录下新建一个build.sbt文件,然后编辑此文件:

添加以下内容:

name :="Kmeans"

version :="1.0"

scalaVersion :="2.10.4"

autoScalaLibrary:= false

resolvers ++=Seq(

"osc" at"http://maven.oschina.net/content/groups/public/",

"typesafe" at"http://repo.typesafe.com/typesafe/ivy-releases/",

"Sonatype OSS Snapshots" at"https://oss.sonatype.org/content/repositories/snapshots",

"Sonatype Releases" at"https://oss.sonatype.org/content/repositories/releases/",

"JBoss Repository" at"http://repository.jboss.org/nexus/content/repositories/releases/",

"Cloudera Repository" at"https://repository.cloudera.com/artifactory/cloudera-repos/",

"Akka Repository" at"http://repo.akka.io/releases/"

)

然后我们打开命令行,用户命令把当前目的指向到scala项目的所在目录然后键入SBT命令并按下回车,界面跳入SBT命令行。

在SBT命令行中键入eclipse

这时系统会下载很多在本机中没有的该项目的依赖包(如果以前下载过就不会再下载),下载完成后。这时,你的eclipse项目就被整合成一个可以用sbt管理的eclipse项目,并且这个项目已经包含了所有的你所需要的jar包,你无需像开发JAVA项目那样,自己去导入JAR包。

回到Eclipse IDE中写你的代码,本文中展示的实例是一个简单的kmeans算法程序,

写完你的代码后,回到cmd命令行窗口的sbt命令行中,运行assmebly命令,此命令是用于编译和最终打包的命令,会根据你build.sbt中的name和version属性命名你所生成的jar包,并存放在当面项目的根目录下的target/scala-2.10/目录下面。

这样一个scala的项目就完成了,然后拖入运行环境中,输入spark-submit --class com.Kmean.KmeansRunner --queue zyx2--master yarn-cluser --num-executors 5 --driver-memory 4G --executor-cores 15--executor-memory 6G --driver-class-path/usr/share/java/mysql-connector-java.jar /app/Kmeans-assembly-1.0.jar/fakeData/kmeans.txt命令去运行该项目就大功告成了。

代码解释:

Kmeans是一个相对简单的聚类算法,首先来介绍一下这个算法,这个算法可以把n个点按照需求聚成K个类,当然k

。这里需要提前指出的就是kmeans,由3大部分组成:

1.元数据

2.K---就是你要把元数据分成几类

3.迭代次数---就是你要整个公式迭代几次,然后慢慢收敛,最终获取上图公式中的J,然后进行分类。

在spark中提供了一个mllib包,这个包大大提高了开发算法的效率,回过头来看上面这段代码。

这里parsedData是数据源,numClusters就是需要分类的数量也就是上面说的K,而numIterations则是定义需要循环的次数。

最重要的是clusters,这个变量可以理解成为一个根据输入参数而最终产生的,可用的模型。这里mllib提供了KMeans对象的train方法去训练模型,只要传入parsedData、numClusters和numIterations这3个参数就行了,这样大大减少了代码量,提升的开发效率。

上图中我们可以看到,mllib还提供了predict方法和Vectors对象的dense方法来对每组数据按照clusters这个已生成的模型进行分类结果返回。

上图中的wssse变量对应的就是上面公式中的J(平方集)在这个实例中的结果数值。

好,现在我们把这段程序放到spark集群中运行一下看看会产生怎么样的结果呢:

我们来看看运行结果:第二张图中wssse结果为9.3333333371。这是个参考值,数值越小越好,目前看上去是整个模型到了9.3333333371这个值为最优。

结合numClusters的数值为3,那么就是要把这5组向量数据分为三类,我们看到第一组数据为1,第2,4,5组数据被分类为,第3组数据分类为2。

好,这里我们又用了两组元数据外的数据来验证一下这个模型。分别为图一中的

1.1 2.2 3.9

5.5 7.5 8.8

这两组数据得出的结论为:

新增第一组向量数据离训练数据源的第一组数据最接近,也被分到了同一组

新增第二组向量数据离训练数据源的第二组数据最接近,所以被被分到了一组。

测试结果证明这个KMeans模型训练及运行成功。

PS:聚类有N多种,KMeans只是其中一种,也有自身的缺陷,比如对线性的点无法聚类和可能会陷入局部最优。具体项目中用哪种聚类方法好,这个要看实际情况。

  • 发表于:
  • 原文链接:http://kuaibao.qq.com/s/20171228A00IN600?refer=cp_1026

同媒体快讯

相关快讯

扫码关注云+社区