作者:Kumar Chinnakali
译者:java达人
来源:http://dataottam.com/2016/01/10/self-learn-yourself-apache-spark-in-21-blogs-3/(点击文末阅读原文前往)
一、
Spark项目最初由加州大学伯克利分校AMP实验室的Matei在2009年发起,并在2010年根据BSD协议开源。2013年,该项目捐献给Apache软件基金会,转为Apache2.0 协议。2014年二月,Spark成为Apache重点项目。2014年11月,Databricks的工程团队通过使用Spark,在大规模分类领域刷新了世界纪录。 而Apache Spark的最新版本是1.6,具有许多新特性(译者:目前是2.2.0)。
Spark系统背后的许多思想都从各种研究论文中孕育产生的。
那么Spark如何与Hadoop关联,Spark是与Hadoop数据兼容的快速通用处理引擎,可以通过YARN或Spark的独立模式在Hadoop集群中运行。 它可以处理HDFS,HBase,Cassandra,Hive及任何Hadoop 输入格式的数据。 它旨在执行类似于MapReduce的批处理和其他新的工作任务,如流处理,交互式查询和机器学习。
但是在Apache Spark之前,我们如何解决大数据问题,使用了哪些工具。 我们必须使用20多种工具在生产环境部署大数据应用程序。
Apache Spark,一个统一的大数据平台,如何帮助解决大数据问题。
Apache Spark最适合跨越平台,数据源,应用程序和用户的并行内存处理。 Apache Spark还有少量在OLAP Analytics, Operational Analytics, Complex Data Pipelining等方面的用例。
Apache有许多组件,包括Spark Core,负责任务调度、内存管理、故障恢复,并与存储系统交互。
SparkSQL > Structured Data > Querying with SQL/HQL
Spark Streaming > Processing of live streams > Micro batching
MLlib > Machine Learning > Multiple types of ML algorithms
GraphX > Graph Processing >Graph Parallel computations
现在我们已经清楚Apache Spark的理论知识,现在通过动手实战,开始我们的游戏。 第一步是让Apache Spark环境启动运行起来。(译者:以下为在AWS建立Spark集群的操作,选读)
下列步骤创建了一个安装有Spark的集群。
在云上搭建Apache Spark环境后,我们准备开发Spark大数据应用程序。在开始构建Spark应用程序之前,我们来看看可用于开发Apache Spark应用程序的语言。它提供多种API,如Scala,Hive,R,Python,Java和Pig。
Scala - 这是用来开发Apache Spark本身的语言。Scala设计初衷是实现可伸缩语言。
Java - 用于开发许多大数据Spark应用程序。Spark甚至支持Java 8。
Python - Spark还支持Python API,通过它,许多MLlib应用程是用它开发的。
R - 从Spark 1.4版本开始,Apache Spark支持R API,这是许多数据科学家使用的主要统计语言。
可见,在Apache Spark大数据谱系中,使用了很多语言。
Hello World,Apache Spark的粉丝!将首先动手实践。
Spark带有交互式shell,称为REPL - 读取,计算,打印和循环。在REPL Spark的帮助下,可以在大数据中进行交互式查询。它有助于快速和交互地构建代码。
现在让我们给出以下命令,
C:\ Users \ dataottam> spark-shell
Scala>
首先要注意的是,Spark shell为你创建了两个值,一个是sc,另一个是sqlcontext。Sqlcontext用于执行Spark SQL库中的程序。而Sc是Spark Context,它是Spark应用程序的核心引擎。所有的Spark job都起始于sc的创建,它用于控制分布式应用程序
上述命令用于为README.md文件创建RDD。一旦我们立即触发上述命令,我们将为该文件创建RDD。RDD是Spark的基本抽象。RDD表示弹性分布式数据集。
Spark核心操作分为两种,即转化和行动。转化是惰性计算;而行动是在执行时就计算结果。
Apache Spark有许多优势,如果它不是惰性计算,那么我们将加载整个文件,而这是不必要的,惰性计算提升了Spark的性能。
上述命令是Apache Spark单词计数程序。
嘿,亲爱的朋友,在深入理解之前,我们了解下Spark Core维护者--Mate Core,Reynold,Patrick和Josh,他们是Spark Core的核心开发者。现在我们来讨论一下RDD的Apache Spark的核心方法。它有两种类型的功能,数据转化操作和数据行动操作。
先了解Spark的内部工作原理。所有Apache Spark应用程序和系统都通过驱动器节点管理。而驱动器节点是根据标记和配置的对工作节点进行管理。在驱动程序中,任何应用程序都在SparkContext中启动。并且所有的Spark应用程序都围绕着这个核心驱动程序和SparkContext进行构建。Driver/ SparkContext的重要的任务实体是Task Creator, Data locality, Scheduler, 还有Fault tolerance。虽然我们能够在同一个处理器中创建多个SparkContext,但基于最佳实践和拇指规则,我们不应该在处理器中创建多个SparkContext。SparkContext表示为sc更简洁,易于使用。
代替命令行操作,请查看Spark word count程序。
一旦我们准备好jar包,那么我们可以如下方式提交我们的应用程序,
现在我们来了解下RDD。RDD是分配在集群中多个节点的可以并行操作的元素集合。RDD即是弹性分布式数据集。RDD是在构建时考虑到了失败,所以如果一个失败,其他的将会计算给出结果。这导致Apache Spark中的大部分方法都是惰性的。指令以DAG(有向无环图)的形式存储供以后使用。这些DAG将继续变化,并提供map, filter等转化操作,这些操作都是惰性计算的。在Apache Spark中,失败被正常处理。惰性操作很棒,但是我们需要像collect, count, 和reduce等操作来触发DAG执行,并计算出结果值,然后它将值返回给驱动程序,或者持久化存储。我们再介绍下RDD的一个知识点,RDD是不可变的,即它一旦被创建,我们就不能再改变它了。
在基本的RDD(弹性分布式数据集),如果内存中的数据丢失,可以重新创建,跨越Spark集群存储在内存中,初始数据来自文件或通过编程方式创建。 RDD是Spark数据基本单位,大部分的Spark编程工作包含了一系列的RDD操作。
我们有三种方法创建RDD,
从一个文件或一组文件创建 从内存数据创建 从另一个RDD创建 以下是基于文件RDD的代码片段,我们使用SparkContext对象来创建。 它接受一个文件,如果我们想要接收文件列表,那么我们就要使用通配符表示的或逗号分隔的文件列表来创建。
SparkContext.textFile(“dataottamfile.txt”)或sc.textFile(“dataottafile.txt”)
SparkContext.textFile(dataottam / * . log)或sc.textFile(“dataottam / * /日志”)
SparkContext.textFile(“dataottam1。 txt,dataottam2.txt”)或sc.textFile(“dataottam1。 txt,dataottam2.txt”)
请注意文件中的每一行都是RDD中的独立记录而且每一个文件都被绝对或相对路径引用。
以下是基于文件RDD的快照,
dataottamRDD = sc.textFile(“dataottam.txt”)
count()
RDD有两种类型的操作;
1、行动-返回值
一些常见的操作 count(), take(n), collect(), saveAsTextFile(file), first(), foreach(), reduce()。
2、 转换 - 根据当前的RDD定义新的RDD。而转换可以链接在一起。
几个常见的转化是map(func), filter(), flatMap(), sample(), union(), distinct(), join()
并且这些RDD并不真正处理,直到行为操作触发,其中许多RDD操作需要传递函数参数进行计算。
Apache Spark可以从任何输入源如HDFS,S3,Casandra,RDBMS,Parquet,Avro,以及内存中加载数据。我们来看看我们如何在命令行中使用它,
内存加载方式
parallelizemakeRDD
range
外部加载方式
TextFileswholeTextFiles
sequenceFile
objectFile
hadoopFile
newAPIHadoopFile
hadoopRDDFile
hadoopRDD
现在让我们讨论一下什么是Lambdas表达式,这在以上几个例子中已经使用过。而这在以后的例子中也是如此。lambda表达式也称为匿名函数。下面就是Lambda表达式,
rdd.flatMap(line => line.split(“”))
现在展示如何将命名方法转换为lambda表达式,
def addOne(item: Int) = {
item+1}
Val intList = List(1,2)
For(item <- intList) yield {
addOne(item)
}Lambda:def addOne(item: Int) = {
item+1}
Val intList = List(1,2)
intList.map(X => {
addOne(x)
})
我们再微调一下:
Val intList = List(1,2)
intList.map(item => item+1)
转化和行动操作: