在Apache Spark上跑Logistic Regression算法

本文旨在介绍使用机器学习算法,来介绍Apache Spark数据处理引擎。我们一开始会先简单介绍一下Spark,然后我们将开始实践一个机器学习的例子。我们将使用Qualitative Bankruptcy数据集,来自UCI机器学习数据仓库。虽然Spark支持同时Java,Scala,Python和R,在本教程中我们将使用Scala作为编程语言。不用担心你没有使用Scala的经验。练习中的每个代码段,我们都会详细解释一遍。

APACHE SPARK

Apache Spark是一个开源的集群计算框架,用Spark编写的应用程序可以比Hadoop MapReduce范式的速度高100倍以上。Spark的一个主要的特点,基于内存,运行速度快,不仅如此,复杂应用在Spark系统上运行,也比基于磁盘的MapReduce更有效。Spark还旨在更通用,因此它提供了以下库:

  • Spark SQL,处理结构化数据的模块
  • MLlib,可扩展的机器学习库
  • GraphX,图和图的并行计算API
  • Spark Streaming,可扩展的,可容错的流式计算程序

正如已经提到的,Spark支持Java,Scala,Python和R编程语言。它还集成了其他大数据工具。特别是,Spark可以运行在Hadoop集群,可以访问任何数据源,包括Hadoop Cassandra。

Spark核心概念

在一个高的抽象层面,一个Spark的应用程序由一个驱动程序作为入口,在一个集群上运行各种并行操作。驱动程序包含了你的应用程序的main函数,然后将这些应用程序分配给集群成员执行。驱动程序通过SparkContext对象来访问计算集群。对于交互式的shell应用,SparkContext默认可通过sc变量访问。

Spark的一个非常重要的概念是RDD–弹性分布式数据集。这是一个不可改变的对象集合。每个RDD会分成多个分区,每个分区可能在不同的群集节点上参与计算。RDD可以包含任何类型的Java,Scala对象,Python或R,包括用户自定义的类。RDDS的产生有两种基本方式:通过加载外部数据集或分配对象的集合如,list或set。

在创建了RDDs之后,我们可以对RDDs做2种不同类型的操作:

  • Transformations - 转换操作,从一个RDD转换成另外一个RDD
  • Actions - 动作操作,通过RDD计算结果

RDDs通过lazy的方式计算 - 即当RDDs碰到Action操作时,才会开始计算。Spark的Transformations操作,都会积累成一条链,只有当需要数据的时候,才会执行这些Transformations操作。每一次RDD进行Action操作时,RDD都会重新生成。如果你希望某些中间的计算结果能被其他的Action操作复用,那么你需要调用Spark的RDD.persist()来保存中间数据。

Spark支持多种运行模式,你可以使用交互式的Shell,或者单独运行一个standalone的Spark程序。不管哪一种方式,你都会有如下的工作流:

  • 输入数据,用于生成RDD
  • 使用Transformations操作转换数据集
  • 让Spark保存一些中间计算结果,用于复用计算
  • 使用Action操作,让Spark并行计算。Spark内部会自动优化和运行计算任务。

安装Apache Spark

为了开始使用Spark,需要先从官网下载。选择“Pre-built for Hadoop 2.4 and later”版本然后点击“Direct Download”。如果是Windows用户,建议将Spark放进名字没有空格的文件夹中。比如说,将文件解压到:C:\spark。

正如上面所说的,我们将会使用Scala编程语言。进入Spark的安装路径,运行如下命令:

// Linux and Mac users

bin/spark-shell

// Windows users

bin\spark shell

然后你可以在控制台中看到Scala:

scala>

QUALITATIVE破产分类

现实生活中的问题是可以用机器学习算法来预测的。我们将试图解决的,通过一个公司的定性信息,预测该公司是否会破产。数据集可以从UCI机器学习库https://archive.ics.uci.edu/ml/datasets/qualitative_bankruptcy下载。在Spark的安装文件夹中,创建一个新的文件夹命名为playground。复制qualitative_bankruptcy.data.txt文件到这里面。这将是我们的训练数据。

数据集包含250个实例,其中143个实例为非破产,107个破产实例。

每一个实例数据格式如下:

  • 工业风险
  • 管理风险
  • 财务灵活性
  • 信誉
  • 竞争力
  • 经营风险

这些被称为定性参数,因为它们不能被表示为一个数字。每一个参数可以取下以下值:

  • P positive
  • A average
  • N negative

数据集的最后一个列是每个实例的分类:B为破产或NB非破产。

鉴于此数据集,我们必须训练一个模型,它可以用来分类新的数据实例,这是一个典型的分类问题。

解决问题的步骤如下:

  • 从qualitative_bankruptcy.data.txt文件中读取数据
  • 解析每一个qualitative值,并将其转换为double型数值。这是我们的分类算法所需要的
  • 将数据集划分为训练和测试数据集
  • 使用训练数据训练模型
  • 计算测试数据的训练误差

SPARK LOGISTIC REGRESSION

我们将用Spark的逻辑回归算法训练分类模型。如果你想知道更多逻辑回归算法的原理,你可以阅读以下教程http://technobium.com/logistic-regression-using-apache-mahout。

在Spark的Scala Shell中粘贴以下import语句:

import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.linalg.{Vector, Vectors}

这将导入所需的库。

接下来我们将创建一个Scala函数,将数据集中的qualitative数据转换为Double型数值。键入或粘贴以下代码并回车,在Spark Scala Shell。

def getDoubleValue( input:String ) : Double = {

    var result:Double = 0.0

    if (input == "P")  result = 3.0 

    if (input == "A")  result = 2.0

    if (input == "N")  result = 1.0

    if (input == "NB") result = 1.0

    if (input == "B")  result = 0.0

    return result

   }

如果所有的运行都没有问题,你应该看到这样的输出:

getDoubleValue: (input: String)Double

现在,我们可以读取到qualitative_bankruptcy.data.txt文件中的数据。从Spark的角度来看,这是一个Transformation操作。在这个阶段,数据实际上不被读入内存。如前所述,这是一个lazy的方式执行。实际的读取操作是由count()引发,这是一个Action操作。

val data = sc.textFile("playground/Qualitative_Bankruptcy.data.txt")

data.count()

用我们val关键字声明一个常量data。它是一个包含输入数据所有行的RDD。读操作被SC或sparkcontext上下文变量监听。count操作应返回以下结果:

res0: Long = 250

现在是时候为逻辑回归算法准备数据,将字符串转换为数值型。

val parsedData = data.map{line =

    val parts = line.split(",")

    LabeledPoint(getDoubleValue(parts(6)), Vectors.dense(parts.slice(0,6).map(x =getDoubleValue(x))))

}

在这里,我们声明了另外一个常量,命名为parsedData。对于data变量中的每一行数据,我们将做以下操作:

  • 使用“,”拆分字符串,并获得一个向量,命名为parts
  • 创建并返回一个LabeledPoint对象。每个LabeledPoint包含标签和值的向量。在我们的训练数据,标签或类别(破产或非破产)放在最后一列,数组下标0到6。这是我们使用的parts(6)。在保存标签之前,我们将用getDoubleValue()函数将字符串转换为Double型。其余的值也被转换为Double型数值,并保存在一个名为稠密矢量的数据结构。这也是Spark的逻辑回归算法所需要的数据结构。

Spark支持map()转换操作,Action动作执行时,第一个执行的就是map()。

我们来看看我们准备好的数据,使用take():

parsedData.take(10)

上面的代码,告诉Spark从parsedData数组中取出10个样本,并打印到控制台。一样的,take()操作之前,会先执行map()。输出结果如下:

res5: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((1.0,[3.0,3.0,2.0,2.0,2.0,3.0]), (1.0,[1.0,1.0,2.0,2.0,2.0,1.0]), (1.0,[2.0,2.0,2.0,2.0,2.0,2.0]), (1.0,[3.0,3.0,3.0,3.0,3.0,3.0]), (1.0,[1.0,1.0,3.0,3.0,3.0,1.0]), (1.0,[2.0,2.0,3.0,3.0,3.0,2.0]), (1.0,[3.0,3.0,2.0,3.0,3.0,3.0]), (1.0,[3.0,3.0,3.0,2.0,2.0,3.0]), (1.0,[3.0,3.0,2.0,3.0,2.0,3.0]), (1.0,[3.0,3.0,2.0,2.0,3.0,3.0]))

接着我们划分一下训练数据和测试数据,将parsedData的60%分为训练数据,40%分为测试数据。

val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)

val trainingData = splits(0)

val testData = splits(1)

训练数据和测试数据也可以像上面一样,使用take()者count()查看。

激动人心的时刻,我们现在开始使用Spark的LogisticRegressioinWithLBFGS()来训练模型。设置好分类个数,这里是2个(破产和非破产):

val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)

当模型训练完,我们可以使用testData来检验一下模型的出错率。

val labelAndPreds = testData.map { point =

  val prediction = model.predict(point.features)

  (point.label, prediction)

}

val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testData.count

变量labelAndPreds保存了map()转换操作,map()将每一个行转换成二元组。二元组包含了testData的标签数据(point.label,分类数据)和预测出来的分类数据(prediction)。模型使用point.features作为输入数据。

最后一行代码,我们使用filter()转换操作和count()动作操作来计算模型出错率。filter()中,保留预测分类和所属分类不一致的元组。在 Scala中_1和_2可以用来访问元组的第一个元素和第二个元素。最后用预测出错的数量除以testData训练集的数量,我们可以得到模型出错率:

trainErr: Double = 0.20430107526881722

总结

在这个教程中,你已经看到了Apache Spark可以用于机器学习的任务,如logistic regression。虽然这只是非分布式的单机环境的Scala shell demo,但是Spark的真正强大在于分布式下的内存并行处理能力。

在大数据领域,Spark是目前最活跃的开源项目,在过去几年已迅速获得关注和发展。在过去的几年里。采访了超过2100受访者,各种各样的使用情况和环境。

[参考资料]

“Learning Spark” by HoldenKarau, Andy Konwinski, Patrick Wendell and Matei Zaharia, O’Reilly Media 2015

Lichman, M. (2013). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science

https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression

https://spark.apache.org/docs/1.1.0/mllib-data-types.html

https://archive.ics.uci.edu/ml/datasets/Qualitative_Bankruptcy

https://databricks.com/blog/2015/01/27/big-data-projects-are-hungry-for-simpler-and-more-powerful-tools-survey-validates-apache-spark-is-gaining-developer-traction.html

原文来自:LOGISTIC REGRESSION USING APACHE SPARK(译者/施聪羽 审校/朱正贵 责编/仲浩) 

关于译者: 施聪羽,浩渺科技服务端研发工程师,修炼中的码农。

原文发布于微信公众号 - CSDN技术头条(CSDN_Tech)

原文发表时间:2015-07-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏极客编程

Apache spark 的一些浅见。

分布并行计算和几个人一起搬砖的意思是一致的,一个资源密集型的任务(搬砖或计算),需要 一组资源(小伙伴或计算节点),并行地完成:

1332
来自专栏pydata

Matlab C混合编程

在MATLAB中可调用的C或Fortran语言程序称为MEX文件。MATLAB可以直接把MEX文件视为它的内建函数进行调用。MEX文件是动态链接的子例程,MAT...

1022
来自专栏fangyangcoder

基于交通灯数据集的端到端分类

抓住11月的尾巴,这里写上昨天做的一个DL的作业吧,作业很简单,基于交通灯的图像分类,但这确是让你从0构建深度学习系统的好例子,很多已有的数据集都封装好了,直接...

3243
来自专栏专知

【干货】TensorFlow中那些鲜为人知却又极其实用的知识

TensorFlow的生态圈极其强大,覆盖了科研、工程中的各种流程,其中一些特别好用的模块和技巧可以使你的工作效率大幅度提升,也可以让你的产品变得非常稳定。本文...

1461
来自专栏听雨堂

Pandas对行情数据的预处理

库里是过去抓取的行情数据,间隔6秒,每分钟8-10个数据不等,还有开盘前后的一些数据,用Pandas可以更加优雅地进行处理。 ? 需要把当前时间设置为index...

21810
来自专栏图形学与OpenGL

Endnote的Output Style文件

我使用Endnote已经有一些日子,苦于Endnote无法输出符合中文期刊要求的参考文献格式,用得不多. 这次写论文想试一下,便在研究英文Output Styl...

1542
来自专栏恰同学骚年

设计模式的征途—8.桥接(Bridge)模式

在现实生活中,我们常常会用到两种或多种类型的笔,比如毛笔和蜡笔。假设我们需要大、中、小三种类型的画笔来绘制12中不同的颜色,如果我们使用蜡笔,需要准备3*12=...

1323
来自专栏青玉伏案

算法与数据结构(六) 迪杰斯特拉算法的最短路径(Swift版)

上篇博客我们详细的介绍了两种经典的最小生成树的算法,本篇博客我们就来详细的讲一下最短路径的经典算法----迪杰斯特拉算法。首先我们先聊一下什么是最短路径,这个还...

2485
来自专栏图形学与OpenGL

机械版CG 实验1 像素点的生成

注:本博客实验教程的配套教材为《计算机图形学》(徐文鹏编)已由机械工业出版社于2009年2月出版。

923
来自专栏企鹅号快讯

从图灵机开始

说到图灵机,我们首先要说说图灵这个人。笔者觉得我们这种搞计算机的人都应该知道并记得这个人。 图灵,1912年6月23日生于英国帕丁顿。是数学家、密码破译专家,当...

2128

扫码关注云+社区

领取腾讯云代金券