Apache Spark:大数据时代的终极解决方案

Apache Spark是基于Hadoop MapReduce的数据分析引擎,它有助于快速处理大数据。它克服了Hadoop的限制,正在成为最流行的大数据分析框架。

原文作者:Jitendra Bhatia

原文地址:https://opensourceforu.com/2017/01/apache-spark-the-ultimate-panacea-for-the-big-data-era/

随着新技术的出现,社交媒体、网络日志、物联网等各种数据源产生的数据正在以PB级增长。传统的算法和存储系统并不足以应对如此庞大的数据量,因此,我们有必要高效的解决这个问题。

Apache Spark引擎简介

Apache Spark是基于Apache Hadoop构建的集群计算框架。它扩展了MapReduce模型,并且允许在内存中直接快速处理大量数据。它具有容错性和数据并行功能,同时也支持许多库,如GraphX(用于图形处理),MLlib(用于机器学习)等。这些功能使Spark成为大数据分析最流行的平台。Spark的使用者包括eBay、Amazon和Yahoo等科技巨头,这些都预示着了Spark的未来具有光明的前景。

Spark由加州大学伯克利分校的AMP实验室(Algorithms, Machines and People Lab)的Matei Zaharia于2009年创建,作为轻量级的快速集群计算框架。2010年,Spark以BSD许可的形式被捐赠给Apache软件基金会,自此,全世界的开发者都为Spark做了贡献。2014年11月,Zaharia(即前文提到的Spark作者)的企业Databricks通过使用Spark引擎以打破了大型数据集排序时间的世界纪录。Spark 2.0.0是2016年7月26日发布的最新版本。(译者注:当前Spark版本已经是2.3.0,后面的安装部分的命令和代码也会有一些差异)

Hadoop由于其可扩展性、灵活性和MapReduce模型而得到了广泛应用,但Spark得益于内存计算速度比Hadoop快100倍,磁盘计算速度也提高了10倍,因此Hadoop在和Spark的竞争中逐渐力不从心。在Hadoop中,数据存储在磁盘上,而在Spark中则存储在内存中,这可以极大地降低IO成本。Hadoop的MapReduce只能通过将数据写入外部存储并在需要时再次通过IO获取数据来重用数据。迭代型和交互式作业需要快速响应,但由于数据的复制、磁盘IO和序列化,MapReduce的性能并不令人满意。Spark的独特之处在于它使用了RDD(弹性分布式数据集,Resilient Distributed Dataset),因而Spark比经常复制数据的Hadoop具有更好的容错能力。虽然Spark是从Hadoop派生的,但Spark不是Hadoop的一个修改版本。Hadoop是实现Spark的基础方法,Spark有自己的集群管理系统,可以独立运行(standalone模式),因此Hadoop并不是Spark运行所必须的。从内部实现看,Hadoop仅仅给Spark提供了两个函数——一个是通过MapReduce进行处理,另一个是使用Hadoop分布式文件系统(HDFS)进行存储。由于二者之间并不相互排斥,因此Spark不会取代Hadoop。相反,它们相辅相成,形成了一个非常强大的模型。

图1:Spark引擎的体系结构

Apache Spark的力量

速度:Spark在内存中直接进行数据的集群化处理,这意味着它减少了迭代算法的I/O操作,之前生成的中间数据直接存储内存中,而不需要将其回写到磁盘。数据可以存储在服务器机器的RAM中,因此,与Hadoop相比,它在内存中运行速度提高了100倍,磁盘操作运行速度提高了10倍。而且,由于其自下而上的工程设计和RDD的使用,Spark的基本数据结构允许在内存中将数据“透明存储”,并且仅在需要时才将其存储到磁盘。“懒惰运算”(Lazy evaluation)是Spark的另一个特征,引擎会延迟对任何表达式和操作的运算,直到另一个表达式需要该结果值,从而有助于Spark的速度。这避免了对同一表达式的重复运算,并允许定义控制流和潜在的无限集。

库:除了简单的MapReduce功能,Spark还配备了标准的内置高级库,包括SQL查询(SparkSQL)、机器学习(MLlib)以及流式数据和图形处理(GraphX)的兼容性。Spark让开发人员以更少的代码量调用这些功能,这些功能在提高了开发人员的生产力的同时,也可以创建复杂的工作流。Spark与实时处理应用程序兼容。

多语言:当使用熟悉的语言时,开发人员编写代码更具优势。因此,Spark为Java、Scala、Python、R和SQL都提供了稳定的API。Spark SQL组件允许导入结构化数据并将其与其他来源的非结构化数据相整合。Spark拥有超过100个高阶操作,除了简单的MapReduce功能,Spark还配备了标准的内置高级库,包括SQL查询(SparkSQL)、机器学习(MLlib)以及流式数据和图形处理(GraphX)的兼容性。它可以用于实时处理应用程序,其方法是将转换应用于半结构化数据,并允许在Spark shell中进行交互式查询。这种动态特性使Spark比Hadoop更受欢迎。

支持Hadoop:大数据和云是协同作用的,Spark对云技术的支持是其最大优势之一。它与HDFS、Apache Cassandra、Apache HBase、Apache Mesos和Amazon S3等广泛使用的大数据框架兼容。Spark没有自己的存储系统,通过三种可能的方式实现对Hadoop堆栈的强化:1)独立模式,2)通过YARN,3)SIMR(在MapReduce中的Spark,Spark in MapReduce)。它还可以支持现有的纯Hadoop生态系统。

MapReduce的替代方法: Spark可以用来代替MapReduce,因为它可以在短时间内执行作业,而且只需5秒或更短的时间。与基于Hadoop的框架(如Twitter Storm)进行实时处理相比,Spark框架在批处理和迭代算法上更快。

在Ubuntu上配置Apache Spark

在Ubuntu上安装和配置Apache Spark非常简单。本地Linux系统是首选的安装方式,因为它提供了最佳的部署环境。当然,也可以使用虚拟操作系统,但与原生版本相比,在性能上会受到一些影响。双操作系统也是非常好的选择。可以选择使用独立版本或使用为Hadoop预先构建的版本,该版本利用现有的Hadoop组件(如HDFS)或构建在YARN上的版本。以下部分将介绍如何在Ubuntu 14.04或更高版本上安装单机模式的Spark 2.0.0。(译者注:这个版本的Spark和Java都不是最新版本,推荐读者安装更高版本的Spark)

安装Java:要安装和配置Spark,您的机器需要先安装Java。可以在终端中使用以下命令来自动下载和更新Java:

$ sudo apt-add-repository ppa:webupd8team / java  
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer

您可以输入以下内容来检查现有版本:

$ java -version
图2:Spark引擎的几个可行部署方案

安装Scala: Spark是用Scala编写的; 所以我们需要Scala环境来安装Spark。从http://www.scala-lang.org/可以下载2.10.4或更高版本,并使用以下命令解压该文件:

$ sudo tar xvf scala-2.10.4.tgz

下面,在.bashrc文件中添加一个Scala条目,如下所示:

$ nano ~/.bashrc

在文件末尾,将Scala文件的路径添加到环境变量:

export SCALA_HOME=<解压后的scala的路径> 
export PATH=$SCALA_HOME/bin:$ PATH

然后我们需要使用下面给出的命令,令已更改的.bashrc文件使配置的环境变量生效:

$ source ~/.bashrc

我们可以使用以下命令验证Scala安装:

$ scala -version

安装Spark:首先,从Spark 的官方网站http://spark.apache.org/downloads.html下载Spark的独立集群版(standalone version)。 然后通过在终端中键入以下命令来提取文件:

$ tar xvf spark-2.0.0-bin-hadoop2.6.tgz

通过nano修改.bashrc

$ nano ~/.bashrc 

通过以下命令将指定位置的行添加到~/.bashrc

(译者注:spana为原作者的用户名,修改为自己的即可)

export SPARK_HOME=/home/sapna/spark-2.0.0-bin-hadoop2.6/
export PATH=$ PATH$ SPARK_HOME/bin

然后我们需要使用下面给出的命令,令已更改的.bashrc文件使配置的环境变量生效:

$ source ~/.bashrc

启动Spark服务和shell。然后,进入Spark文件夹并使用下面的命令手动启动主集群:

 $ cd spark-2.0.0-bin-hadoop2.6
 $ ./sbin/start-master.sh 运行此操作后,您可以通过在浏览器中键入以下命令来查看主节点的用户界面:http://localhost:8080

您可以通过以下命令启动从节点:

$ ./sbin/start-slave.sh <要启动的从节点name>

要检查节点是否正在运行,请执行以下操作:

$ Jps

Apache Spark引擎的体系结构

Spark使用主/从(master/worker)结构。Spark Context对象是程序执行的入口,它与master进行协调式的交互管理,而master管理着worker执行的工作。

Spark基于两个主要概念 - RDD(弹性分布式数据集)和DAG(有向无环图)执行引擎。RDD是一个只读的不可变对象集合,是Spark的基本数据结构。RDD的数据分块存储,每个RDD可以在不同的节点上计算,并且可以用多种语言编程。在工作时,它将内存的状态作为对象存储,并且对象可以在作业之间共享。RDD可以通过映射(map)或过滤(filter)来转换数据,也可以执行运算并返回值。RDD可以并行化,并且本质上是容错的。可以通过两种方法创建它们 - 通过在应用程序中获取现有集合并通过Spark Context将其并行化或通过从HDFS,HBase,AWS等外部存储系统中创建引用。有向无环图(DAG)有助于消除MapReduce的多阶段模型,因而提供了数据处理上的优势。

Spark可以通过三种流行的方式进行部署,以迎合不同的场景。第一种方法是使用独立模式。在该模式下,Spark放置在HDFS上方并手动为其分配内存。集群上的所有Spark作业都是在Spark和MapReduce同时运行的情况下执行的。第二种方法是使用Hadoop YARN(另一个资源管理器,Yet Another Resource Manager)等集群管理系统,该系统不需要任何预安装或root访问权限即可与Hadoop堆栈或Hadoop生态系统相集成;其他组件可以从外部添加到其架构顶部,以增加整个系统的功能。第三种方法是使用SIMR(Spark In MapReduce),除了管理功能外,它还可以执行Spark作业。Spark shell可以在没有任何管理员权限的情况下使用。

构成Spark的主要元素是:Spark Core,MLlib,GraphX,Spark Streaming和Spark SQL。Spark Core是基础平台引擎,可以作为构建其他功能的基础。Spark SQL组件在次基础上提供了SchemaRDD的抽象类,它允许加载、分析和处理半结构化和结构化的数据集。Spark Streaming允许实时流式传输和分析以小批量方式(mini-batch)加载到RDD中的数据。MLlib是一个大型库,用在大数据集上实现机器学习方法,是由来自世界各地的程序员建造的。GraphX是一个分布式图形处理框架,它提供了一个表示图形计算的API,该API可以使用Pregel抽象API对用户定义的图形进行建模。此外,GraphX包含越来越多的图形算法和构建器,以优化图形分析任务。Spark应用程序独立运行在由驱动程序中的SparkContext对象管理的一组集群上。SparkContext实例可以与Mesos或YARN等管理器连接,并将资源分配给不同的商用硬件,以获得最佳性能。分配后,每个作业的执行者会收到用于执行作业的应用程序代码及其任务。每个Spark应用程序都有自己的可多线程的执行程序。数据需要存储在不同的Spark应用程序的外部存储中以便共享。Spark应用程序独立运行在由驱动程序中的SparkContext对象管理的一组集群上。SparkContext实例可以与Mesos或YARN等管理器连接,并可以将资源分配给不同的商品机器以获得最佳性能。分配后,每个作业的执行者会收到用于执行作业的应用程序代码和任务。每个Spark应用程序都有自己的可执行多线程的执行程序。数据需要存储在不同的Spark应用程序的外部存储中以便共享。Spark应用程序独立运行在由驱动程序中的SparkContext对象管理的一组集群上。SparkContext实例可以与Mesos或YARN等管理器连接,并可以将资源分配给不同的商品机器以获得最佳性能。分配后,每个作业的执行者会收到用于执行作业的应用程序代码和任务。每个Spark应用程序都有自己的可多线程运行执行程序。因此,为了方便共享,数据需要存储在不同的Spark应用程序的外部存储中。

图3:Spark引擎的内部架构

Apache Spark引擎初探

以下部分将探讨如何启动Spark引擎及其服务。下面将演示如何执行现有程序,如何启动客户端、服务器以及如何启动Spark Shell。

启动Spark服务和shell

首先,进入Spark的文件夹,并使用以下命令手动启动主集群:

cd spark-2.0.0-bin-hadoop2.6
./sbin/start-master.sh

运行此操作后,您可以通过在浏览器中键入以下命令来查看主节点的用户界面:

http://localhost:8080

您可以使用以下命令启动从节点:

./sbin/start-slave.sh <要运行的从节点的名称>

要检查节点是否正在运行,请执行以下操作:

jps

运行Spark Shell

您可以使用以下命令运行Scala的Spark Shell:

(译者注:由于之前配置了环境变量,可以在终端直接输入spark-shellpyspark命令,即可启动Spark Shell)

$ bin/spark-shell

您可以使用以下命令运行Python的Spark shell:

$ bin/pyspark

Spark运行一个现有的程序

首先,我们可以编译一个包含程序代码的文件,该程序稍后将在Spark中运行:

$ scalac -classpath "spark-core_2.10-2.0.0.jar:/usr/local/spark/lib/spark-assembly-2.0.0-hadoop2.6.0.jar" <file name>

然后,可以通过编译的文件创建一个JAR文件,以wordcount程序为例,如下所示:

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

现在,将JAR文件提交给Spark,以运行该应用程序,如下所示:

$ spark-submit --class <application name> --master local <jar file name>

在Apache Spark引擎中编写和执行基本脚本

我们已经学习了如何启动Shell,如何创建和编译JAR文件并提交任务。现在让我们在Scala中编写并执行一个简单的WordCount示例,以便部署到Spark上。

首先,从下面给出的句子中创建一个简单的input.txt文件,并将其放入包含所有其他jar文件和程序代码的Spark应用程序文件夹中:

This is my first small word count program using Spark. I will use a simple MapReduce program made in Scala to count the frequency of each word.

(这是我第一个使用Spark的小字数计数程序。我将使用一个在Scala中制作的简单MapReduce程序来计算每个单词的频率。)

接下来,打开Spark shell:

$ spark-shell

然后建立一个RDD,它将从我们的input.txt文件中读取数据。sc是一个SparkContext对象,它是所有RDD的管理器:

scala> val inputfile = sc.textFile(“input.txt”)

我们通过将每行分成单独的单词的方法来进行数据转换。在之前的文本中,每一行是一个实体,但现在,我们需要让每个词都是一个实体,再对词粒度进行统计。接下来,让我们计算每个单词出现的次数。可以通过其键(Key)将相同Key的实例合并,然后将其频次相加,以对每个不同单词出现的次数进行计数。

scala> val counts = inputfile.flatMap(line => line.split(“ “)).map(word => (word, 1)).reduceByKey(_+_);

我们可以缓存输出以保持它,如下所示:

scala> counts.cache()

或者我们可以将它存储到外部文本文件中,如下所示:(文件名为output)

scala> counts.saveAsTextFile(“output”)

我们可以检查输出如下:

$ cd output/ 
$ ls -1

使用下面的命令在Spark屏幕上打印输出:

$ cat part-00000
$ cat part-00001

使用Apache Spark引擎分析大数据

随着技术的进步,Web服务器、机器的日志文件、物联网、社交媒体、用户点击、网络流媒体等,每天都会产生PB级的数据,其中大部分是半结构化或非结构化的。这种大数据的特点是速度快、体积大、可变性高;因此,传统算法和处理技术无法应对。MapReduce能够使用商用硬件集群令人满意地处理这些数据。但由于前面提到的原因,日益增多的数据量正在超越MapReduce的处理能力。因此,Spark作为解决MapReduce限制的方案,被设计了出来。它为共享数据和内存计算提供了内存抽象,而RDD也可以被保留下来并重新用于其他计算。Spark的多平台支持、与Hadoop的集成能力以及它与云的兼容性使它成为为大数据量身定做的解决方案。

在现实世界中,Spark被用于许多应用程序。银行分析来自社交媒体、电子邮件、投诉日志、通话记录等来源的大量数据,以获取信用风险评估、客户细分或有定向广告方面的信息,甚至信用卡欺诈都可以通过它来检查。电子商务网站使用流式聚类算法来分析实时交易来进行广告宣传,或者通过获取来对论坛、评论、社交媒体的洞察力向顾客推荐产品。如Shopify、阿里巴巴和eBay都使用了这些技术。由于Spark能够快速诊断并过滤出具有健康风险状态的个人,医疗行业可从Spark数据分析中受益。MyFitnessPal使用Spark来处理其所有活动用户的数据。生物医学方面,由于数百万条染色体链必须匹配,因此Spark被广泛用于基因组测序和DNA分析;这项任务之前需要数周时间,但现在只需数小时。娱乐行业(如Pinterest,Netflix和雅虎新闻)也将Spark用于个性化和推荐系统。

使用Apache Spark引擎进行大数据处理

让我们来看看一个适合初学者学习的可以处理大数据的简洁的应用程序。让我们加载美国流行电视节目“Five Thirty Eight”的数据集,并执行简单的聚合功能。可以通过https://github.com/fivethirtyeight/data/blob/master/daily-show-guests/下载过去50年的数据daily_show_guests.csv

创建一个RDD,读取数据并使用以下代码打印前五行。

raw_data = sc.textFile("daily_show_guests.csv")
raw_data.take(5)

然后,使用map函数分割所有单词,如下所示:

daily_show = raw_data.map(lambda line: line.split(‘,’))daily_show.take(5)

接下来,定义一个代码段来计算每年访客的次数,如下所示:

(译者注:该代码在第二行的for循环处会报错,可以将daily_show改为daily_show.toLocalIterator(),使RDD转为可迭代的数据结构)

tally = dict()
for line in daily_show:
    year = line[0]  
    if year in tally.keys():
        tally[year] = tally[year] + 1
    else:
        tally[year] = 1

通过使用Reduce转换来执行该函数,如下所示:

tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
print(tally)
tally.take(tally.count())

现在使用一个过滤器函数,根据职业进行隔离,从现有的RDD创建一个RDD:

def filter_year(line):
    if line[0] == ‘YEAR’:
        return False
    else:
        return True
filtered_daily_show = daily_show.filter(lambda line: filter_year(line))

现在,通过执行reduce变换来执行此过滤器:

filtered_daily_show.filter(lambda line: line[1] != ‘’) \
                        .map(lambda line: (line[1].lower(), 1))\
                        .reduceByKey(lambda x,y: x+y) \
                        .take(5)

这样就完成了大数据领域最有前途的技术之一的概述。Spark的特性和体系结构使其在诸如Hadoop等流行框架面前具有优势。Spark可以在Hadoop上实现,并且由于两种技术协同使用,整体效率也会提高。由于其多种集成和适配器,Spark也可以与其他技术结合使用。例如,我们可以同时使用Spark,Kafka和Apache Cassandra —— Kafka可用于流式数据传输,Spark用于计算,Cassandra NoSQL数据库用于存储结果数据。但是,Spark仍在进一步开发中,它还是一个相对不太成熟的生态系统,有很多领域需要改进,比如安全和业务集成工具。不过,Spark将在很长一段时间内继续在此停留。

本文的版权归 不高不富不帅的陈政_ 所有,如需转载请联系作者。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏我是攻城师

Pig0.15集成Tez,让猪飞起来

39460
来自专栏数据科学与人工智能

【Hadoop研究】YARN:下一代 Hadoop计算平台

Apache Hadoop 是最流行的大数据处理工具之一。它多年来被许多公司成功部署在生产中。尽管 Hadoop 被视为可靠的、可扩展的、富有成本效益的解决方案...

31660
来自专栏重庆的技术分享区

Hadoop和大数据分析简介

原文地址:https://opensourceforu.com/2013/12/introduction-tohadoop-big-data-analysis/

32140
来自专栏灯塔大数据

塔说 | 常见Hadoop面试题及答案解析

导读:Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和...

38550
来自专栏Albert陈凯

YARN & Mesos,论集群资源管理所面临的挑战

在国内,大部分的Spark用户都是由Hadoop过渡而来,因此YARN也成了大多Spark应用的底层资源调度保障。而随着Spark应用的逐渐加深,各种问题也随之...

31350
来自专栏数据派THU

手把手教你入门Hadoop(附代码资源)

作者:GETINDATA公司创始人兼大数据顾问彼得亚·雷克鲁斯基(Piotr Krewski)和GETINDATA公司首席执行官兼创始人亚当·卡瓦(Adam K...

16440
来自专栏数据科学与人工智能

【Spark研究】用Apache Spark进行大数据处理之入门介绍

什么是Spark Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于20...

30390
来自专栏包子铺里聊IT

五分钟深入 Hadoop 输入优化

当面试公司问起 Hadoop 经验时,我们当然不能只停留在 Mapper 干了什么、Reducer 干了什么。没有 Performance Tuning 怎么...

27570
来自专栏斑斓

【大数据】Spark的硬件配置

从MapReduce的兴起,就带来一种思路,就是希望通过大量廉价的机器来处理以前需要耗费昂贵资源的海量数据。这种方式事实上是一种架构的水平伸缩模式——真正的以量...

60050
来自专栏加米谷大数据

Spark核心谈

在大数据领域,Spark平台因计算模型涵盖MapReduce,Streaming,SQL,Machine Learning,Graph等,为大数据计算提供一栈式...

15510

扫码关注云+社区

领取腾讯云代金券