Spark-2

上次给大家讲了Spark local模式的启动安装和使用,现在给大家分享一下Standalone模式下的使用和安装。这个讲完以后,还有yarn和mesos下集群的安装和使用。

Spark on local Cluster伪分布式

即Spark Standalone模式。此时Spark会使用Standalone的集群管理器(Cluster Manager)启动Spark。

这种模式,也可以称为Spark的伪分布式。

Standalone集群管理器是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。其中Driver既可以运行在Master节点上中,也可以运行在本地Client端。当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;当使用spark-submit工具提交Job或者在Eclipse、IDEA等开发平台上使用new SparkConf.setManager(“spark://master:7077”)方式运行Spark任务时,Driver是运行在本地Client端上的。

使用$SPARK_HOME/sbin下的start-all.sh可以启动集群,使用stop-all.sh可以停止集群。

我们可以在一台机器上模拟集群,也可以在多台机上上运行Spark Standalone集群。如果是在多台机器上,请保证Master(哪一台调用start-master.sh哪一台就是master)向worker节点的SSH免密码登录。(关于如何实现SSH免密码登录,请查看LInux相关教程)。同时,需要说明的是,如果Worker和master在同一台主机上,也必须要配置SSH向自己的免密码登录。

为了便于学习,我们先在一台机器上启动Spark Standalone模式。

1、单机Standalone步1:下载、解压Spark

请参考之前的步骤。

步2:配置Spark环境变量

可选。本人配置环境变量,一般习惯于创建一个独立的环境变量文件如spark.sh放到/etc/profile.d/目录下。

export SPARK_HOME=/spark/spark

export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

生效:

$ source /etc/profile

步3:配置slaves

在$SPARK_HOME/conf目录下,将slaves.template重命名为slaves。配置只有一个Worker节点。

#配置一个主机名称,或是localhost或是ip地址。建议使用主机名称

hadoop201

步4:启动Spark集群

在$SPARK_HOME/sbin目录下,拥有启动和停止Spark集群的脚本:

start-slave.sh stop-master.sh stop-slaves.sh

start-master.sh start-slaves.sh stop-all.sh start-all.sh stop-slave.sh ....

使用start-all.sh即可以启动spark集群。

[wangjian@hadoop201 sbin]$ start-all.sh

starting org.apache.spark.deploy.master.Master, logging to /spark/spark/logs/spark-wangjian-org.apache.spark.deploy.master.Master-1-hadoop201.out

hadoop201: starting org.apache.spark.deploy.worker.Worker, logging to /spark/spark/logs/spark-wangjian-org.apache.spark.deploy.worker.Worker-1-hadoop201.out

启动以后,通过jps会发现两个进程:Master和Worker。

[wangjian@hadoop201 sbin]$ jps

1206 Worker

1146 Master

1276 Jps

步5:访问MasterUI

在启动过程中,master会将启动过程的日志输出到mater-1-host.out文件中去,现在打开这个文件查看启动日志:

1.[wangjian@hadoop201 sbin]$ cat /spark/spark/logs/spark-wangjian-org.apache.spark.deploy.master.Master-1-hadoop201.out

2.Spark Command: /usr/jdk1.8.0_144/bin/java -cp /spark/spark/conf/:/spark/spark/jars/* -Xmx1g org.apache.spark.deploy.master.Master --host hadoop201 --port 7077 --webui-port 8080

3.========================================

4.Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

5.17/12/08 02:16:06 INFO Master: Started daemon with process name: 1146@hadoop201

6.17/12/08 02:16:06 INFO SignalUtils: Registered signal handler for TERM

7.17/12/08 02:16:06 INFO SignalUtils: Registered signal handler for HUP

8.17/12/08 02:16:06 INFO SignalUtils: Registered signal handler for INT

9.17/12/08 02:16:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

10.17/12/08 02:16:07 INFO SecurityManager: Changing view acls to: wangjian

11.17/12/08 02:16:07 INFO SecurityManager: Changing modify acls to: wangjian

12.17/12/08 02:16:07 INFO SecurityManager: Changing view acls groups to:

13.17/12/08 02:16:07 INFO SecurityManager: Changing modify acls groups to:

14.17/12/08 02:16:07 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(wangjian); groups with view permissions: Set(); users with modify permissions: Set(wangjian); groups with modify permissions: Set()

15.17/12/08 02:16:08 INFO Utils: Successfully started service 'sparkMaster' on port 7077.

16.17/12/08 02:16:09 INFO Master: Starting Spark master at spark://hadoop201:7077

17.17/12/08 02:16:09 INFO Master: Running Spark version 2.1.2

18.17/12/08 02:16:10 INFO Utils: Successfully started service 'MasterUI' on port 8080.

19.17/12/08 02:16:10 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://192.168.56.201:8080

20.17/12/08 02:16:10 INFO Utils: Successfully started service on port 6066.

21.17/12/08 02:16:10 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066

22.17/12/08 02:16:11 INFO Master: I have been elected leader! New state: ALIVE

23.17/12/08 02:16:13 INFO Master: Registering worker 192.168.56.201:43874 with 1 cores, 1024.0 MB RAM

通过上面的日志,分析得到以下信息:

1:行2,可以知启动Spart就是通过java命令,启动一个Java类,这个类叫Master,且指定了最大使用的内存。及端口号。

2:行16,可知spark的端口号为7077。

3:行18,可知WebUI的端口号为8080.

4:行20,可知启动一个REST的service在6066端口。

5:行23,可知worker所使用的端口及服务器地址。

有兴趣的朋友,可以再去查看worker的日志文件,从中你会知道worker节点的端口为8081。

现在访问8080的WebUI:

步6:开启一个RDD

开启一个RDD会启动所有Worker上的Executor即:CoarseGrainedExecutorBackend。

现在我们可以执行一个spark-shell连接这个集群。通过输出以下的命令:

$ spark-shell - -master spark://hadoop201:7077

[wangjian@hadoop201 spark]$ spark-shell --master spark://hadoop201:7077

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

17/12/08 02:36:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Spark context Web UI available at http://192.168.56.201:4040

Spark context available as 'sc' (master = spark://hadoop201:7077, app id = app-20171208023624-0000).

Spark session available as 'spark'.

Welcome to

____ __

/ __/__ ___ _____/ /__

_\ \/ _ \/ _ `/ __/ '_/

/___/ .__/\_,_/_/ /_/\_\ version 2.1.2

/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)

Type in expressions to have them evaluated.

Type :help for more information.

启动以后,检查8080 WebUI你会了发现多了一个Application:

使用jps检查进程:

[wangjian@hadoop201 ~]$ jps

1472 SparkSubmit会在master节点上,启动一个SparkSubmit用于处理分配任务

1555 CoarseGrainedExecutorBackend #启动所有worker节点上的executor后台进程

1587 Jps

1206 Worker

1146 Master

再次加载一个本地或是hdfs上的文件,进行行统计,你会发现有一个计算的过程如下:

由于是集群运算,所以,会显示一个进度。

进行单词统计的示例:

scala> var rdd2 = sc.textFile("file:///spark/a.txt");

rdd2: org.apache.spark.rdd.RDD[String] = file:///spark/a.txt MapPartitionsRDD[6] at textFile at :24

scala> rdd2.flatMap(str=>str.split("\\s+")).map(word=>(word,1)).reduceByKey(_+_).collect().foreach(println);

(Hello,3)

(Alex,1)

(Mary,1)

(Jack,1)

或是使用以下表达式:

scala> rdd2.flatMap(str=>str.split("\\s+")).map(word=>(word,1)).reduceByKey(_+_).collect().foreach(kv=>(println(kv._1+","+kv._2)));

Hello,3

Alex,1

Mary,1

Jack,1

如果太长的话,回回车也是可以的:四车之前先输入.(点)

scala> rdd2.flatMap(str=>str.split("\\s+")).

map(word=>(word,1)).

reduceByKey(_+_).

collect().

foreach(kv=>(println(kv._1+" "+kv._2)));

Hello 3

Alex 1

Mary 1

Jack 1

步7:代码连接集群

通过Java或是通过Scala代码连接集群,只需要设置master为spart://ip:7077即可。

建议使用spark-submit方式来执行,在foreach中输出的数据会输出到stdout中。

代码:

packagecn.wang

importorg.apache.spark.rdd.RDD

importorg.apache.spark.

/**

*使用submit方式提交到集群

*读取完成文件以后,保存到stdout中

*/

objectSpark05_Standalone {

defmain(args: Array[String]): Unit = {

if(args.length ==) {

println("请输入参数");

return;

}

valpath = args.apply();

//声明config

valconf: SparkConf =newSparkConf();

conf.setAppName("Standalone");

varsc: SparkContext =newSparkContext(conf);

valtextFile: RDD[String] = sc.textFile(path);

vallineCount: Long = textFile.count();

println("行数据:"+ lineCount);

//单词统计

textFile.flatMap(_.split("\\s+"))//

.map((_,1)).reduceByKey(_ + _)

.collect()

.foreach(kv => {

println("结果:"+ kv._1 +","+ kv._2);

});

sc.stop();

}

}

开发一个Shell脚本文件,且添加执行权限,假设文件名称为:submit.sh

#!/bin/bash

if [ $# -eq 0 ]; then

echo "请输入读取的文件"

else

spark-submit \

--class cn.wang.Spark05_Standalone \

--master spark://192.168.56.201:7077 \

SparkDemo01.jar $1

fi

现在,就可以通过向shell脚本传递不同的文件方式,来执行这个程序:

如:

读取本地文件请执行:

$ submit.shfile:///spark/a.txt

读取hdfs上的文件:

$ submit.sh hdfs://hadoop201:8020/wangjian/a.txt

上面的脚本执行以下,都可以输出正确的结果:

结果:Hello,3

结果:Alex,1

结果:Mary,1

结果:Jack,1

当然也可以将结果,保存到HDFS上。

2、集群Standalone模式

Spark集群非常简单,只要修改$SPARK_HOME/conf目录下的slaves文件即可。建议在所有节点相同的目录下,安装Spark,同时配置环境变量。

步1:修改slaves文件

将spark_home目录下的的slavles.template文件,重命名为:slaves。并添加worker节点:

#this is master node of Spark

hadoop101

#this is slave Node of Spark

hadoop102

步2:配置ssh免密码登录

哪台机器上执行start-all.sh/start-master.sh即哪一台为master主机,将拥有master节。所有配置到slaves中的节点,都是worker节点。所以,需要配置从master到worker的免密码登录。

在master节点上执行:

$ ssh-keygen -t rsa

$ ssh-copy-id hadoop102

步3:scp拷贝文件

使用scp -r将文件拷贝到其他节点。

步4:使用start-all.sh启动集群

$ ./spark/sbin/start-all.sh

[wangjian@hadoop101 spark]$ jps

1204 Jps

1157 Worker

1086 Master

检查所在节点上的Worker是否已经启动。

步5:查看WebUI界面

通过http://yourMasterIp:8080查看Spark:

步6:开启一个Driver

每开启一个Driver在集群的环境下,所有的worker节点上的Executor都会启动,同时会在Master节点 上,开启SparkSubmit进程,如:

$ spart-shell --master spark://192.168.56.101:7077

然后检查Master节点上的进程为:

[wangjian@hadoop101 ~]$ jps

1157 Worker #因为当前节点也是worker所以拥有一个Worker进程

1382 Jps

1243 SparkSubmit #主节点,即Master会拥有一个SparkSubmit进程

1086 Master

1311 CoarseGrainedExecutorBackend #当启动一个Driver时,每一个Worker节点启动的进程

检查其他Worker节点

[wangjian@hadoop102 spark]$ jps

1104 Worker #worker节点的进程

1216 Jps

1165 CoarseGrainedExecutorBackend #当启动一个Driver每一个Worker节点启动的进程

步7:提交任务

注意,由于目前已经是在集群的环境下,所以,如果要读取本地文件,应该保证在所有节点的相同目录下,都拥有此文件。在这种情况下,读取hdfs中的文件,就变得比较方便。

现在我们启动hadoop集群,来测试spark:

1:使用Spark-Shell进行测试

scala> val textFile = sc.textFile("hdfs://192.168.56.101:8020/wang/a.txt");

textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.56.101:8020/wang/a.txt MapPartitionsRDD[1] at textFile at :24

scala> textFile.count();

res0: Long = 3

scala> textFile.flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_).

collect().foreach(println);

(Hello,3)

(Mary,1)

(Rose,1)

(Jack,1)

设置一个输出的格式:

scala> textFile.flatMap(_.split("\\s+")).map((_,1)).

reduceByKey(_+_).

collect().

map(kv=>{ //这儿用于设置输出的格式

val key=kv._1;

val v = kv._2;

key+"\t"+v;

}).foreach(println);

Hello 3

Mary 1

Rose 1

Jack 1

如果在读取文件中,没有输入hdfs://前缀,则默认也是读取hdfs文件系统中的数据,但这一点取决于您已经配置了HADOOP_CONF_DIR在$SPARK_HOME/conf/spark-env.sh文件中,如下:

#配置指定hadoop的配置目录,以便于让Spark使用yarn

HADOOP_CONF_DIR=/hadoop/hadoop-2.7.3/etc/hadoop

以下就可以省去hdfs://前缀了:

scala> val tf = sc.textFile("/wang/a.txt");

tf: org.apache.spark.rdd.RDD[String] = /wang/a.txt MapPartitionsRDD[1] at textFile at :24

scala> tf.count();

res0: Long = 3

2:在spark-shell中操作hdfs

可以读取hdfs上的文件,也可以直接将数据保存到hdfs上:

scala> val tf = sc.textFile("/wang/a.txt"); //读取hdfs上的文件

scala> tf.count();

res0: Long = 3

scala> tf.flatMap(_.split("\\s+")).saveAsTextFile("/out002"); //保存到hdfs上

scala> tf.flatMap(_.split("\\s+")). //保存到hdfs上

map((_,1)).reduceByKey(_+_).saveAsTextFile("/out003");

scala> tf.flatMap(_.split("\\s+")). //格式化以后保存到hdfs上

map((_,1)).reduceByKey(_+_).

val str=kv._1+"\t"+kv._2;

str;

}).saveAsTextFile("/out004");

上面种方式保存到hdfs上的格式依次是:

$ hdfs dfs -cat /out002/* #没有进行统计,直接保存

Hello

Jack

Hello

Mary

Hello

Rose

$ hdfs dfs -cat /out003/* #进行统计以后保存,带有括号

(Hello,3)

(Mary,1)

(Rose,1)

(Jack,1)

$ hdfs dfs -cat /out004/* #格式化以后保存 去除的括号

Hello 3

Mary 1

Rose 1

Jack 1

3:通过spark-submit提交任务

开发的Scala代码,还和以前一样。不过,为了不让大家到处乱找,我还是给出完整的代码:

packagecn.wang

importorg.apache.spark.rdd.RDD

importorg.apache.spark.

/**

*操作Spark Standalone Cluster模式下的提交

*/

objectSpark06_Cluster {

defmain(args: Array[String]): Unit = {

if(args.length

println("请输入参数1:hdfs file,2:输出hdfs目录")

return;

}

valconf: SparkConf =newSparkConf();

conf.setAppName("Spark_Standalone_Cluster");

valsc: SparkContext =newSparkContext(conf);

//读取hdfs文件

valtf: RDD[String] = sc.textFile(args.apply());

valcount: Long = tf.count();

println("文件中的行数为:"+ count);

valdata = tf.flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_ + _);

//输出到控制台-将会输出到stdout文件日志文件中去

data.map(kv => {

valstr = kv._1 +"\t"+ kv._2;

str;

}).foreach(println);

//输出到hdfs

data.map(kv => {

valstr = kv._1 +""+ kv._2;

str;

}).saveAsTextFile(args.apply(1));

println("执行完毕")

sc.stop();

}

}

开发一个脚本(可选)并输入以下命令:

[wangjian@hadoop101 sparkjar]$ cat submit01.sh

#!/bin/bash

if [ $# -lt 2 ]; then

echo "参数1 hdfs地址,参数2输出的hdfs地址"

else

spark-submit \

--class cn.wang.Spark06_Cluster \

--master spark://192.168.56.101:7077 \

SparkDemo01.jar $1 $2

fi

启动这个脚本文件,检查hdfs文件系统上的输出:

$ hdfs dfs -cat /out001/*

Hello3

Mary1

Rose1

Jack1

1、小结

1:只要使用spark的start-all.sh启动的standalone集群,无论多少个worker操作方式都一样。

2:standalone模式下master的地址为:spark://ip:7077。

3:在开中,大量使用spart-submit方式提交,以便于真实环境的测试。

4:无论是何种模式,都可以操作hdfs上的文件,只要能访问到,都可以通过saveAsTextFile的方式保存数据到hdfs上。

5:在多个worker即cluster模式下,多个worker输出的数据不会在控制台出现,而是会出现在stdout的日志文件中。

本文来自企鹅号 - 全球大搜罗媒体

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏岑玉海

Spark源码系列(一)spark-submit提交作业过程

前言 折腾了很久,终于开始学习Spark的源码了,第一篇我打算讲一下Spark作业的提交过程。 ? 这个是Spark的App运行图,它通过一个Driver来和集...

52760
来自专栏AILearning

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Structured Streaming 编程指南 概述 快速示例 Programming Model (编程模型) 基本概念 处理 Even...

2.2K60
来自专栏岑玉海

Hadoop源码系列(一)FairScheduler申请和分配container的过程

1、如何申请资源 1.1 如何启动AM并申请资源 1.1.1 如何启动AM val yarnClient = YarnClient.createYarnClie...

46840
来自专栏个人分享

Spark Job的提交与task本地化分析(源码阅读八)

  我们又都知道,Spark中任务的处理也要考虑数据的本地性(locality),Spark目前支持PROCESS_LOCAL(本地进程)、NODE_LOCAL...

13020
来自专栏芋道源码1024

分布式作业 Elastic-Job-Lite 源码分析 —— 作业数据存储

JobNodePath,作业节点路径类。作业节点是在普通的节点前加上作业名称的前缀。

13420
来自专栏祝威廉

Spark Shuffle Write阶段磁盘文件分析

上篇写了 Spark Shuffle 内存分析 后,有不少人提出了疑问,大家也对如何落文件挺感兴趣的,所以这篇文章会详细介绍,Sort Based Shuff...

8410
来自专栏芋道源码1024

分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业数据存储

JobNodePath,作业节点路径类。作业节点是在普通的节点前加上作业名称的前缀。

9420
来自专栏公有云大数据平台弹性 MapReduce

腾讯云EMR使用说明: 配置工作流

本文将通过一个简单,并且具有典型代表的例子,描述如何使用EMR产品中的Hue组件创建工作流,并使该工作流每天定时执行。

10.2K130
来自专栏Python爬虫与算法进阶

Spark教程(二)Spark连接MongoDB

数据可能有各种格式,虽然常见的是HDFS,但是因为在Python爬虫中数据库用的比较多的是MongoDB,所以这里会重点说说如何用spark导入MongoDB中...

94620
来自专栏听雨堂

Log4Net使用心得

winform程序使用Log4net   1.引用dll   2.添加log4net.config,设置“始终复制”   4.assemblyinfo.cs中添...

245100

扫码关注云+社区

领取腾讯云代金券