本文是对初始接触 Spark 开发的入门介绍,说明如何搭建一个比较完整的 Spark 开发环境,如何开始应用相关工具,基于如下场景:
其中前两项属于 Spark 计算环境搭建,后两项属于 Scala 编程。文中如有错误或者不当之处,敬请指正。
分布式计算有两个基础性问题:计算的并行调度与数据的分布存储,我们使用 Spark 来解决计算并行调度的问题,使用 Hadoop HDFS 解决分布式存储的问题。简述下原因:
首先搭建 Hadoop HDFS, HDFS 是 Hadoop 项目中的一个组件,本文中说明的部署方式仅为在单机上搭建完整开发环境(只包含一个 NameNode 和一个 DataNode,无 HA 支持),生产环境会复杂一些,请参阅 HDFS 官方文档。搭建过程如下:
先确认已安装 JDK(JRE 以能保证程序运行需要,但开发环境还是需要安装 JDK),如果没有,请从 oracle 站点下载安装商业版本,不要使用公司主机 yum install jdk 安装的 OpenJDK。如果是直接解压 tar 包安装,安装后请设置环境变量:
# 1. add below line to ~/.bashrc
export JAVA_HOME=/data/spark/java/
export PATH=$PATH:$JAVA_HOME/bin
# 2. 生效配置
$ source ~/.bashrc
本例中,使用 "spark" 用户进行操作,spark 用户目录为 /data/spark。
~/hadoop/ | ------ bin (工具程序目录)
------ etc/hadoop (配置文件目录)
------ sbin (服务程序目录,主要为服务程序启停脚本)
------ ... (其它暂不关心)
如果我们只需使用 HDFS,有如下几个配置配置文件需要关注:
1、hadoop-env.sh: 配置 hadoop 进程运行时的相关环境变量,对于搭建开发环境,只需要设置一个配置项:
export JAVA_HOME=/data/spark/java
在准备工作中,我们已经将其加入到 shell 环境变量中了,但在运行 shell 脚本时,这个环境变量并不能带给脚本程序。
2、core-site.xml: 配置 hadoop 服务公共配置项,目前也只需要配置一项:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://127.0.0.1:10000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/data/spark/hadoop-data</value>
</property>
</configuration>
fs.defaultFS 和 hadoop.tmp.dir 均是全局的,影响 hdfs 服务、管理工具以及其它 hadoop 组件。通过这两个参数,可以配置 HDFS 对外服务地址以及数据的存储路径,存在如下推导关系:
* fs.defaultFS -> dfs.namenode.rpc-address (hdfs-site.xml)
* hadoop.tmp.dir -> dfs.name.dir (hdfs-site.xml, ${hadoop.tmp.dir}/dfs/name)
* hadoop.tmp.dir -> dfs.data.dir (hdfs-site.xml, ${hadoop.tmp.dir}/dfs/data)
3、hdfs-site.xml: 保存 HDFS 专有配置
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
开发环境只启动了一个 DataNode 实例,因此每个数据块只能有一个副本。
4、slaves: 配置 datanode 主机列表, hadoop 安装包已经将 "localhost" 节点加入了。
5、log4j.properties: log4j 日志配置,开发环境可以将日志设置为 DEBUG 级别。
hadoop.root.logger=DEBUG,console
配置 OK 了,很兴奋,就可以将服务跑起来了。且慢,还需要做两个准备操作。
1、HDFS NameNode 初始化
$ bin/hdfs namenode -format
初始化过程中会自动创建所需要的目录。初始化完成后,已经可以启动 HDFS 服务了,但为了操作更方便些,还需要处理下面步骤。
2、SSH 公钥免密登录授权
hdfs 是一个集群服务,我们可以在 NameNode 节点上操作所有的 slave 节点(DataNode),hadoop 是通过封装 ssh 远程 shell 实现的 (sbin/slaves.sh 内通过 ssh 远程起停 slave 节点上的服务)。虽然我们的开发集群只是一个单机节点,但任然需要开通本机(DataNode) 对本地(NameNode) 的 ssh 免密登录,方便集群管理,具体设置方式这里不再赘述,如不了解,可自行搜索相关资料。
$ sbin/start-dfs.sh
启动集群只需上面一行命令,如果没有什么端口冲突,应该是一切顺利了。万一有端口冲突,也没关系, 这里可以查询所有 HDFS 服务端口配置项,结合日志,更改下冲突项目就行。
现在,我们可以看到运行了如下 hdfs 相关服务进程:
$ jps
11939 SecondaryNameNode
11591 NameNode
11743 DataNode
除了 NameNode, DataNode 外,另外还多出一个 SecondaryNameNode 进程,这个名字容易让人误解,它并非是 NameNode 的备份,而是为了更可靠维护 HDFS 元数据信息而提供的服务实例,定期将修改合并到元数据存储文件,目前我们可以忽略它。
类似的,停止集群也是一行命令:
$ sbin/stop-dfs.sh
$ bin/hdfs dfs -mkdir /input
$ bin/hdfs dfs -put README.txt /input/
$ bin/hdfs dfs -ls /input
Found 1 items
-rw-r--r-- 1 spark supergroup 1366 2017-05-19 16:37 /input/README.txt
$ bin/hdfs dfs -cat /input/README.txt
For the latest information about Hadoop, please visit our website at:
... ...
通过上面列出的操作,我们在 hdfs 建立了目录 "/input", 并将本地文件系统的 "README.txt" 文件上传到了 HDFS(如果集群中存在多个 DataNode, 则文件数据将会分布在多个主机上)。 bin/hdfs 工具的使用方式与 shell 类似,其帮助信息有对用法的详细说明,这里不再赘述。
上面通过 bin/hdfs 工具的方式访问 HDFS 有两个弊端:
对于上述问题,hadoop 已提供了解决方案。hadoop 提供了 HDFS NFS Gateway, 可以将 HDFS 以 NFS 方式挂接到本地文件系统中,以支持常规 sell 命令的访问,由于 NFS Gateway 服务是常驻服务,也就避免了反复启动 Java 虚拟机,大大提升了临时操作的效率。下面简述下设置过程,更多的信息可以参考 官方文档。当然,如果您想快点开始 spark 编程,也可以略过此节。
基础功能只需在 etc/hadoop/core-site.xml 加入:
<property>
<name>hadoop.proxyuser.$user-nfs.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.$user-nfs.hosts</name>
<value>*</value>
</property>
这里配置 nfs 用户授权,需要注意的是 $user-nfs 需要替换为当下启动 nfs gateway 的 unix 用户名(本例中为 spark),上面配置表示允许运行在所有主机上的、以$user-nfs 用户启动的网关访问任意 hadoop 用户组下的文件(HDFS 文件)。
# 1. restart hdfs (core-site.xml changed)
$ sbin/stop-dfs.hs
$ sbin/start-dfs.hs
# 2. stop current nfs service (if exist)
$ sudo service nfs stop
$ sudo rpcbind stop
# 3. start hadoop port map (must be in root)
$ sudo sbin/hadoop-daemon.sh --script bin/hdfs start portmap
# 4. start nfs proxy server (in hadoop user)
$ sbin/hadoop-daemon.sh --script bin/hdfs start nfs3
# 5. mount
$ mkdir /mnt/hdfs
$ sudo mount -t nfs -o vers=3,proto=tcp,nolock,noacl 127.0.0.1:/ /mnt/hdfs/
# 6. test
$ ls /mnt/hdfs
Total 99
-rw-r--r-- 1 spark 2584148964 99253 5 月 19 17:29 LICENSE.txt
-rw-r--r-- 1 spark 2584148964 1366 5 月 19 16:37 README.txt
# 7. stop nfs server
$ sudo sbin/hadoop-daemon.sh --script bin/hdfs stop portmap
$ sbin/hadoop-daemon.sh --script bin/hdfs stop nfs3
到这里,我们已经搭建好了一个最简化的 HDFS 集群,可以支持进行开发测试,下面介绍 spark 的搭建与编程。
部署一个单机环境的 spark 服务很简便,这里简单介绍下,更多关于 spark 的部署介绍可以查看官网:https://spark.apache.org.
首先在官网下载最新稳定版本,目前是 2.1,解压到目标目录即完成安装,本文中安装目录为 /data/spark/spark. spark 解压后主要包含如下子目录:
/data/spark/spark
-----------------
|
bin/ (工具程序目录)
conf/ (配置文件目录)
jars/ (scala Jar 包目录)
python/ (python package 目录)
sbin/ (服务程序管理脚本目录)
不做任何配置,此时已可以启动 Spark 服务:
$ sbin/start-all.sh
$ jps
29584 Master
29670 Worker
如果没有端口冲突,一般都能启动成功。本例中这种运行模式 spark 称之为 Standalone(独立模式,不依赖其它服务构成集群),这种模式一般包括一个 Master 实例和多个 Worker 实例,能以最简单的方式建立起一个集群,方便开发和构建小规模集群。Spark 还支持 Local 和基于通用资源管理器(mesos, YARN) 的集群两种运行模式,分别适用于开发调试与大规模集群部署两种场景。关于运行模式的更详细说明参见官网。
虽然可以零配置启动服务,但为了开发时对系统有更多控制,简单说明下开发中可能会修改的几个基础配置。
1、日志级别: conf/log4j.properties
spark 预装了配置模板: conf/log4j.properties.template, 将其拷贝为 conf/log4j.properties,即可修改日志配置。
# 日志设置为 debug 级别
log4j.rootCategory=DEBUG, console
2、系统配置:conf/spark-defaults.conf
该文件为系统主要配置文件,服务和工具程序都可能会使用到,在初步使用时,可能会配置到如下参数:
# spark master 服务绑定地址
spark.master spark://127.0.0.1:7077
# 配置执行器占用内存(默认 1g),executor 存在于 Worker 进程中
# 内存总量/spark.executor.memory 为系统最大并行存在执行器数目。
# 开发时可能修改改值,以获得适当的执行器数目
spark.executor.memory 512m
交互式 spark 编程环境,使用 scala 语言。spark-shell 启动时,会导入相关依赖库,并创建名称为 "sc" 的 SparkContext 对象,这个对象是通向 spark 世界的向导,我们已经可以在交互环境开始第一次 Spark 分布式计算之旅了。
$ bin/spark-shell
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext[@1e38e8b6][3]
scala> val rdd1 = sc.textFile("file:///data/spark/spark/README.md")
rdd: org.apache.spark.rdd.RDD[String] = file:///data/spark/spark/README.md MapPartitionsRDD[13] at textFile at <console>:24
scala> val rdd2 = rdd1.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ _)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:26
scala> rdd2.collect()
res2: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), ...
scala> rdd2.count()
res3: Long = 289
scala> :quit
$
Spark 2.0 后提供了新的切入点 SparkSession 类, 在 Shell 启动时会创建名称为 "spark" SparkSession 对象,sc = spark.sparkSession,关于 SparkSession, SparkContext, RDD 等 Spark 编程核心概念这里不做展开,在网络上很容易获得相关介绍资料。
spark-shell 中输入 ":quit" 可以退出 shell, 输入":help" 可以获取帮助。
上面例子中,对本地的 README.md 文件使用 spark 做了单词计数。如果 README.md 规模巨大,难以在单台服务器对其进行单词计数,我们只需增加服务器,将 HDFS 和 Spark 扩展为一个多服务器集群,先将数据导入的 HDFS,就可执行分布式并行计算了。对于复杂的数据与计算的分布管理,则交给 HDFS 和 spark 去处理,我们在编程上,与本地计算代码几乎没有区别。下面是分布式集群环境计算的代码:
$ $HADOOP_ROOT/bin/hdfs dfs -put very-large-file-path /input/tmp.dat
$ bin/spark-shell
scala> val1 rdd1 = sc.textFile("hdfs://hdfs-namenode-addr/input/tmp.dat")
scala> val2 rdd2 = rdd1.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ _)
scala> rdd2.saveAsTextFile("hdfs://hdfs-namenode-addr/output")
hdfs-namenode-addr: 需要替换为多主机集群环境下,实际 HDFS NameNode 服务访问地址。
功能与 bin/spark-shell 相同,提供支持 Python 交互式编程环境。我们可以通过设置环境变量 "PYSPARK_PYTHON" 启用习惯的 python shell,譬如 ipython。
$ export PYSPARK_PYTHON=ipython
$ bin/pyspark
... ...
Using Python version 2.7.5 (default, Nov 20 2015 02:00:19)
SparkSession available as 'spark'.
In [1]: sc
Out[1]: <pyspark.context.SparkContext at 0x7f18db589590>
同样的,python shell 也会在启动时预建名称为 "sc" SparkContext 对象,作为调用 Spark 集群功能入口。
spark 在 bin 目录下还提供了其它一些核心工具,这里简单列举下,进入到 spark 的世界后,自然也会掌握它们的用法。
* bin/spark-submit: 提交 Job 到 spark 执行
* bin/spark-sql: Sql 交互查询工具,spark 支持以 SQL 语句描述数据处理过程
* bin/sparkR: R 语言交互编程环境
本节中,我们搭建了一个最简单的单机独立模式集群,并通过 spark 提供的交互编程环境执行了单词计数的任务,感受到了 spark 对分布式计算优雅简洁的描述。Spark 自身主要采用 Scala 进行开发,提供 Scala, Java, Python, R 等语言编程接口。一般而言,使用与系统实现语言相同的 scala 语言进行应用开发,在保障最大化运行时性能的同时(Scala, Java 程序会被编译直接在 JVM 上运行的代码,Python, R 程序运行时存在虚拟机之间的交互),也能获得很好的开发效率,另外,掌握 scala 编程,也有助于对 spark 进行更深入的学习理解。下一节简单介绍下 scala 编程环境的搭建。
如果要开发正式的应用,一个好用的构建工具是必须的,不然光是管理 jar 包繁琐依赖就会耗费大量时间,另外,各个版本的 scala 运行时库可能不兼容,支持多目标版本编译也需要专业工具支持才行。
所谓搭建 scala 开发环境,也就是选出这个工具,并安装配置好。scala 开发可选则的构建工具主要有 sbt, maven, gradle 这三个。我这里选择 sbt,原因是这三者虽然功能上难分伯仲,但 sbt 与 scala 具备天然的亲和性,它自身是使用 scala 编写的,其工程定义文件实际也是一个 scala 程序,使用它构建 scala 项目更加简洁纯粹。
sbt 官网: http://www.scala-sbt.org, 在这上面有有很详细的 中文文档。
sbt 从官网下载最新版本,开箱即可使用,其安装说名这里不再赘述。sbt 解压后的主要内容如下:
sbt
-----
|
--- bin/ # 执行工具路径
--- conf/ # 配置目录, sbt 全局工作选项以及 sbt 启动 java vm 参数
--- lib/ # 预装 jar 包
将上面在交互模式下运行的单词计数使用独立的 scala 程序实现。
1、首先创建 sbt 工程, 建立如下结构的目录与文件:
wordcount [project root directory]
----------
|
----- build.sbt [sbt 工程定义文件]
----- project/ [sbt 选项与编译扩展插件目录,当前留空]
----- src/main/scala/WordCount.scala [ 源代码]
关于更多 sbt 工程目录结构信息,可以查看官网文档。
2、配置 build.sbt
lazy val root = (project in file("."))
.settings(
name := "wordcount",
version := "1.0",
scalaVersion := "2.11.8"
)
上面语句实际就是一行 scala 代码, (project in file(".")) 语句生成了一个 sbt 工程对象,之后调用其 settings() 函数,设置工程属性。使用程序语言定义工程会非常简洁灵活,具备非常好的可扩展性。
重要: scalaVersion 必须与当前 spark 使用的 scala 版本一致,否则生成的 jar 包不一定能在 spark 环境中运行,这个版本可以通过查看 $spark_root/jars/scala-library-$version.jar 文件名称获取到。
3、编写 WordCount.scala
import org.apache.spark.sql.SparkSession
object WordCount {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("WordCount")
.getOrCreate()
val sc = spark.sparkContext
sc.textFile("hdfs://127.0.0.1:10000/input/README.txt")
.flatMap(line => line.split(" "))
.map(x => (x, 1))
.reduceByKey(_ _)
.saveAsTextFile("hdfs://127.0.0.1:10000/output")
spark.stop()
}
}
4、关联 spark 本地 jar 包依赖
sbt 工程依赖分为托管依赖(managed dependency) 与非托管依赖(unmanaged dependency)。托管依赖指在远程组件仓库(maven, ivy 等)管理的依赖包,工程中定义声明下使用的版本,编译时直接从远程下载。非托管依赖只存在于本地的依赖包,默认为工程根目录下 "lib" 子目录。wordcount 工程依赖 spark 的 jar 包,已存在于 spark 安装目录下,因此直接在工程目录下建立如下软连接是最便捷的完成依赖包设定的方式:
ln -s /data/spark/spark/jars lib
sbt 会首先从本地库中寻找寻找被引用组件包。
5、编译与打包
sbt package
执行上述命令,完成编译打包,生成 jar 文件,到这里,第一个独立打包的 spark app 已孵出了。
6、提交运行
终于可以 run 了~~~~
# 之前已经通过 nfs 将 hdfs 挂载到本地文件系统中,先删除 output 目录,避免程序结束时保存结果冲突
$ rm -rf /mnt/hdfs/output
# 提交 App
$ spark-submit --class WordCount target/scala-2.11/wordcount_2.11-1.0.jar
# 查看结果
$ ls /mnt/hdfs/output
part-00000 part-00001 _SUCCESS
$ head /mnt/hdfs/output/part-00000
(under,1)
(this,3)
(distribution,2)
(Technology,1)
... ...
到这里,我们已经走完了从开发环境搭建到应用工程建立与测试的历程,在 Spark/Scala 之海的浅滩处小游了一下:
但毕竟还是在浅滩,要真实使用 spark 解决比较大规模的计算任务,我们还要持续向 Spark/scala 之海的深水区探索:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。