Apache Spark大数据分析入门(一)

Apache Spark的出现让普通人也具备了大数据及实时数据分析能力。鉴于此,本文通过动手实战操作演示带领大家快速地入门学习Spark。本文是Apache Spark入门系列教程(共四部分)的第一部分。

全文共包括四个部分:

  • 第一部分:Spark入门,介绍如何使用Shell及RDDs
  • 第二部分:介绍Spark SQL、Dataframes及如何结合Spark与Cassandra一起使用
  • 第三部分:介绍Spark MLlib和Spark Streaming
  • 第四部分:介绍Spark Graphx图计算

本篇讲解的便是第一部分

关于全部摘要和提纲部分,请登录我们的网站 Apache Spark QuickStart for real-time data-analytics进行访问。

在网站上你可以找到更多这方面的文章和教程,例如: Java Reactive Microservice Training, Microservices Architecture | Consul Service Discovery and Health For Microservices Architecture Tutorial。还有更多的其它内容,感兴趣的可以去查看。

Spark 概述

Apache Spark是一个正在快速成长的开源集群计算系统,正在快速的成长。Apache Spark生态系统中的包和框架日益丰富,使得Spark能够进行高级数据分析。Apache Spark的快速成功得益于它的强大功能和易于使用性。相比于传统的MapReduce大数据分析,Spark效率更高、运行时速度更快。Apache Spark 提供了内存中的分布式计算能力,具有Java、 Scala、Python、R四种编程语言的API编程接口。Spark生态系统如下图所示:

整个生态系统构建在Spark内核引擎之上,内核使得Spark具备快速的内存计算能力,也使得其API支持Java、Scala,、Python、R四种编程语言。Streaming具备实时流数据的处理能力。Spark SQL使得用户使用他们最擅长的语言查询结构化数据,DataFrame位于Spark SQL的核心,DataFrame将数据保存为行的集合,对应行中的各列都被命名,通过使用DataFrame,可以非常方便地查询、绘制和过滤数据。MLlib为Spark中的机器学习框架。Graphx为图计算框架,提供结构化数据的图计算能力。以上便是整个生态系统的概况。

Apache Spark的发展历史

  • 最初由加州伯克利大学(UC Berkeley) AMP lab实验室开发并于2010年开源,目前已经成为阿帕奇软件基金会(Apache Software Foundation)的顶级项目。
  • 已经有12,500次代码提交,这些提交来自630个源码贡献者(参见 Apache Spark Github repo)
  • 大部分代码使用 Scala语言编写。
  • Apache Spark的Google兴趣搜索量( Google search interests)最近呈井喷式的增长,这表明其关注度之高(Google广告词工具显示:仅七月就有多达108,000次搜索,比Microservices的搜索量多十倍)
  • 部分Spark的源码贡献者(distributors)分别来自IBM、Oracle、DataStax、BlueData、Cloudera……
  • 构建在Spark上的应用包括:Qlik、Talen、Tresata、atscale、platfora……
  • 使用Spark的公司有: Verizon Verizon、 NBC、Yahoo、 Spotify……

大家对Apache Spark如此感兴趣的原因是它使得普通的开发具备Hadoop的数据处理能力。较之于Hadoop,Spark的集群配置比Hadoop集群的配置更简单,运行速度更快且更容易编程。Spark使得大多数的开发人员具备了大数据和实时数据分析能力。鉴于此,鉴于此,本文通过动手实战操作演示带领大家快速地入门学习Apache Spark。

下载Spark并河演示如何使用交互式Shell命令行

动手实验Apache Spark的最好方式是使用交互式Shell命令行,Spark目前有Python Shell和Scala Shell两种交互式命令行。

可以从 这里下载Apache Spark,下载时选择最近预编译好的版本以便能够立即运行shell。

目前最新的Apache Spark版本是1.5.0,发布时间是2015年9月9日。

tar -xvzf ~/spark-1.5.0-bin-hadoop2.4.tgz

运行Python Shell

cd spark-1.5.0-bin-hadoop2.4 ./bin/pyspark

在本节中不会使用Python Shell进行演示。

Scala交互式命令行由于运行在JVM上,能够使用java库。

运行Scala Shell

cd spark-1.5.0-bin-hadoop2.4 ./bin/spark-shell

执行完上述命令行,你可以看到下列输出:

Scala Shell欢迎信息

Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.5.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_25) Type in expressions to have them evaluated. Type :help for more information. 15/08/24 21:58:29 INFO SparkContext: Running Spark version 1.5.0

下面是一些简单的练习以便帮助使用shell。也许你现在不能理解我们做的是什么,但在后面我们会对此进行详细分析。在Scala Shell中,执行下列操作:

在Spark中使用README 文件创建textFileRDD

val textFile = sc.textFile("README.md")

获取textFile RDD的第一个元素

textFile.first() res3: String = # Apache Spark

对textFile RDD中的数据进行过滤操作,返回所有包含“Spark”关键字的行,操作完成后会返回一个新的RDD,操作完成后可以对返回的RDD的行进行计数

筛选出包括Spark关键字的RDD然后进行行计数

val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark.count() res10: Long = 19

要找出RDD linesWithSpark单词出现最多的行,可以使用下列操作。使用map方法,将RDD中的各行映射成一个数,然后再使用reduce方法找出包含单词数最多的行。

找出RDD textFile 中包含单词数最多的行

textFile.map(line => line.split(" ").size) .reduce((a, b) => if (a > b) a else b) res11: Int = 14

返回结果表明第14行单词数最多。

也可以引入其它java包,例如 Math.max()方法,因为map和reduce方法接受scala函数字面量作为参数。

在scala shell中引入Java方法

import java.lang.Math textFile.map(line => line.split(" ").size) .reduce((a, b) => Math.max(a, b)) res12: Int = 14

我们可以很容易地将数据缓存到内存当中。

将RDD linesWithSpark 缓存,然后进行行计数

linesWithSpark.cache() res13: linesWithSpark.type = MapPartitionsRDD[8] at filter at <console>:23 linesWithSpark.count() res15: Long = 19

上面简要地给大家演示的了如何使用Spark交互式命令行。

弹性分布式数据集(RDDs)

Spark在集群中可以并行地执行任务,并行度由Spark中的主要组件之一——RDD决定。弹性分布式数据集(Resilient distributed data, RDD)是一种数据表示方式,RDD中的数据被分区存储在集群中(碎片化的数据存储方式),正是由于数据的分区存储使得任务可以并行执行。分区数量越多,并行越高。下图给出了RDD的表示:

想像每列均为一个分区(partition ),你可以非常方便地将分区数据分配给集群中的各个节点。

为创建RDD,可以从外部存储中读取数据,例如从Cassandra、Amazon简单存储服务(Amazon Simple Storage Service)、HDFS或其它Hadoop支持的输入数据格式中读取。也可以通过读取文件、数组或JSON格式的数据来创建RDD。另一方面,如果对于应用来说,数据是本地化的,此时你仅需要使用parallelize方法便可以将Spark的特性作用于相应数据,并通过Apache Spark集群对数据进行并行化分析。为验证这一点,我们使用Scala Spark Shell进行演示:

通过单词列表集合创建RDD thingsRDD

val thingsRDD = sc.parallelize(List("spoon", "fork", "plate", "cup", "bottle")) thingsRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24

计算RDD thingsRDD 中单的个数

thingsRDD.count() res16: Long = 5

运行Spark时,需要创建Spark Context。使用Spark Shell交互式命令行时,Spark Context会自动创建。当调用Spark Context 对象的parallelize 方法后,我们会得到一个经过分区的RDD,这些数据将被分发到集群的各个节点上。

使用RDD我们能够做什么?

对RDD,既可以进行数据转换,也可以对进行action操作。这意味着使用transformation可以改变数据格式、进行数据查询或数据过滤操作等,使用action操作,可以触发数据的改变、抽取数据、收集数据甚至进行计数。

例如,我们可以使用Spark中的文本文件README.md创建一个RDD textFile,文件中包含了若干文本行,将该文本文件读入RDD textFile时,其中的文本行数据将被分区以便能够分发到集群中并被并行化操作。

根据README.md文件创建RDD textFile

val textFile = sc.textFile("README.md")

行计数

textFile.count() res17: Long = 98

README.md 文件中有98行数据。

得到的结果如下图所示:

然后,我们可以将所有包含Spark关键字的行筛选出来,完成操作后会生成一个新的RDDlinesWithSpark:

创建一个过滤后的RDD linesWithSpark

val linesWithSpark = textFile.filter(line => line.contains("Spark"))

在前一幅图中,我们给出了 textFile RDD的表示,下面的图为RDD linesWithSpark的表示:

值得注意的是,Spark还存在键值对RDD(Pair RDD),这种RDD的数据格式为键/值对数据(key/value paired data)。例如下表中的数据,它表示水果与颜色的对应关系:

对表中的数据使用groupByKey()转换操作将得到下列结果:

groupByKey() 转换操作

pairRDD.groupByKey() Banana [Yellow] Apple [Red, Green] Kiwi [Green] Figs [Black]

该转换操作只将键为Apple,值为Red和Green的数据进行了分组。这些是到目前为止给出的转换操作例子。

当得到一个经过过滤操作后的RDD,可以collect/materialize相应的数据并使其流向应用程序,这是action操作的例子。经过此操作后, RDD中所有数据将消失,但我们仍然可以在RDD的数据上进行某些操作,因为它们仍然在内存当中。

Collect 或 materializelinesWithSpark RDD中的数据

linesWithSpark.collect()

值得一提的是每次进行Spark action操作时,例如count() action操作,Spark将重新启动所有的转换操作,计算将运行到最后一个转换操作,然后count操作返回计算结果,这种运行方式速度会较慢。为解决该问题和提高程序运行速度,可以将RDD的数据缓存到内存当中,这种方式的话,当你反复运行action操作时,能够避免每次计算都从头开始,直接从缓存到内存中的RDD得到相应的结果。

缓存RDDlinesWithSpark

linesWithSpark.cache()

如果你想将RDD linesWithSpark从缓存中清除,可以使用unpersist()方法。

将linesWithSpark从内存中删除

linesWithSpark.unpersist()

如果不手动删除的话,在内存空间紧张的情况下,Spark会采用最近最久未使用(least recently used logic,LRU)调度算法删除缓存在内存中最久的RDD。

下面总结一下Spark从开始到结果的运行过程:

  • 创建某种数据类型的RDD
  • 对RDD中的数据进行转换操作,例如过滤操作
  • 在需要重用的情况下,对转换后或过滤后的RDD进行缓存
  • 在RDD上进行action操作,例如提取数据、计数、存储数据到Cassandra等。

下面给出的是RDD的部分转换操作清单:

  • filter()
  • map()
  • sample()
  • union()
  • groupbykey()
  • sortbykey()
  • combineByKey()
  • subtractByKey()
  • mapValues()
  • Keys()
  • Values()

下面给出的是RDD的部分action操作清单:

  • collect()
  • count()
  • first()
  • countbykey()
  • saveAsTextFile()
  • reduce()
  • take(n)
  • countBykey()
  • collectAsMap()
  • lookup(key)

关于RDD所有的操作清单和描述,可以参考 Spark documentation

结束语

本文介绍了Apache Spark,一个正在快速成长、开源的集群计算系统。我们给大家展示了部分能够进行高级数据分析的Apache Spark库和框架。对 Apache Spark为什么会如此成功的原因进行了简要分析,具体表现为 Apache Spark的强大功能和易用性。给大家演示了 Apache Spark提供的内存、分布式计算环境,并演示了其易用性及易掌握性。

在本系列教程的第二部分,我们对Spark进行更深入的介绍。

相关链接:

原文地址: Introduction to Big Data Analytics w/ Apache Spark Pt. 1

(译者/牛亚真 审校/朱正贵 责编/仲浩)

译者简介:牛亚真,本科,2010年毕业于西南大学计算机与信息科学学院信息管理与信息系统专业;研究生,2013年毕业于中国科学院大学文献情报中心情报学专业,计算机信息处理与检索方向。

原文发布于微信公众号 - CSDN技术头条(CSDN_Tech)

原文发表时间:2015-11-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏CSDN技术头条

Databircks连城:Spark SQL结构化数据分析

数据科学家们早已熟悉的R和Pandas等传统数据分析框架虽然提供了直观易用的API,却局限于单机,无法覆盖分布式大数据场景。在Spark 1.3.0以Spark...

23190
来自专栏安富莱嵌入式技术分享

【二代示波器教程】第12章 示波器设计—DAC信号发生器的实现

本章节为大家讲解二代示波器中信号发生器的实现。这个功能还是比较实用的,方便为二代示波器提供测试信号。实现了正弦波,方波和三角波的频率,幅度以及占空比设置。

11420
来自专栏一英里广度一英寸深度的学习

流式计算

spark是一个大数据分布式的计算框架,有一些并行计算的基础会更容易理解分布式计算框架的概念。对比并行计算,谈三个概念:

1.4K20
来自专栏行者悟空

Hive基本概念

28440
来自专栏华章科技

SparkR:数据科学家的新利器

摘要:R是数据科学家中最流行的编程语言和环境之一,在Spark中加入对R的支持是社区中较受关注的话题。作为增强Spark对数据科学家群体吸引力的最新举措,最近发...

7420
来自专栏cloudskyme

hadoop使用(三)

安装hbase 首先下载hbase的最新稳定版本 http://www.apache.org/dyn/closer.cgi/hbase/ 安装到本地目录中,我安...

42560
来自专栏祝威廉

Spark Streaming 的玫瑰与刺

说人话:其实就是讲Spark Streaming 的好处与坑。好处主要从一些大的方面讲,坑则是从实际场景中遇到的一些小细节描述。

8830
来自专栏Linux驱动

STM32-正弦波可调(50HZ~20KHZ可调、峰峰值0~3.3V可调)

1.原理: 通过定时器每隔一段时间触发一次DAC转换,然后通过DMA发送正玄波码表值给DAC. 当需要改变频率HZ时,只需要修改定时器频率即可(最高只能达到20...

94390
来自专栏Albert陈凯

Spark系列课程-0020Spark RDD图例讲解

我们从这节课开始,讲Spark的内核,英文叫做Spark Core,在讲Spark Core之前我们先讲一个重要的概念,RDD, ? image.png 我们S...

27570
来自专栏null的专栏

hive学习笔记——Hive表的创建

初衷:以前看过Hadoop方面的材料,但是一直以来都是与实际应用脱轨,现在有机会接触到真正的Hadoop集群,还是被他的性能所震撼,利用这个机会认真重新学习下...

53830

扫码关注云+社区

领取腾讯云代金券