前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark的运行环境及远程开发环境的搭建

Spark的运行环境及远程开发环境的搭建

作者头像
ZONGLYN
发布2019-08-08 10:15:43
2.2K0
发布2019-08-08 10:15:43
举报
文章被收录于专栏:程序萌部落

基本概括

概述

spark快速

  • 扩充了mapreduce
  • 基于内存计算(中间结果的存储位置)

spark通用

  • 批处理hadoop
  • 迭代计算 机器学习系统
  • 交互式查询 hive
  • 流处理 storm

spark开放

  • Python API
  • Java/Scala API
  • SQL API
  • 整合好hadoop/kafka

主要内容

  • 环境搭建
  • 核心概念RDD
  • 架构
  • 重要组件SparkStreaming

发展历史

2009 RAD实验室,引入内存存储 2010 开源 2011 AMP实验室,Spark Streaming 2013 Apache顶级项目

主要组件

Spark Core:

  • 包括spark的基本功能,任务调度、内存管理、容错机制
  • 内部定义RDDs(弹性分布式数据集)
  • 提供APIs来创建和操作RDDs
  • 为其他组件提供底层服务

Spark SQL:

  • 处理结构化数据的库,类似于HiveSQL、Mysql
  • 用于报表统计等

Spark Streaming:

  • 实时数据流处理组件,类似Storm
  • 提供API来操作实时数据流
  • 使用场景是从Kafka等消息队列中接收数据实时统计

Spark Mlib:

  • 包含通用机器学习功能的包,Machine Learning Lib
  • 包含分类、聚类、回归、模型评估、数据导入等
  • Mlib所有算法均支持集群的横向扩展(区别于python的单机)

GraphX:

  • 处理图数据的库,并行的进行图的计算
  • 类似其他组件,都继承了RDD API
  • 提供各种图操作和常用的图算法,PageRank等

Spark Cluster Managers:

  • 集群管理,Spark自带一个集群管理调度器
  • 其他类似的有Hadoop YARN,Apache Mesos

紧密集成的优点

  • Spark底层优化后,基于底层的组件也会相应优化
  • 减少组件集成的部署测试
  • 增加新组建时其他组件可以方便使用其功能

hadoop应用场景

  • 离线处理、对时效性要求不高、要落到硬盘上

spark应用场景

  • 时效性要求高、机器学习、迭代计算

Doug Cutting的观点

生态系统、各司其职 Spark需要借助HDFS进行持久化存储

运行环境搭建

基础环境

  • Spark - scala - JVM - Java7+
  • Python - Python2.6+/3.4+
  • Spark1.6.2 - Scala2.10/Spark2.0.0 - Scala2.11
  • 搭建Spark不需要Hadoop,如果存在则需要下载相关版本(不是上述对应关系)

具体步骤

详见http://dblab.xmu.edu.cn/blog/spark-quick-start-guide/ 主要是两个步骤:

  1. 安装Hadoop(不做介绍)
  2. 解压Spark到对应位置,然后在spark-env.sh中添加SPARK_DIST_CLASSPATH
  3. run-example SparkPi已可以正常运行示例 注意几点:
  • Spark版本要严格对照Hadoop版本
  • Spark运行不依赖Hadoop启动
  • Spark运行目录bin的内容,要确保有执行权限[+x]

Spark目录

  • bin 包含和Spark交互的可执行文件,如Spark shell
  • core,Streaming,python等 包含主要组件的源代码
  • examples 包含一些单机的Spark job

Spark shell

  • Spark的shell能够处理分布在集群上的数据
  • Spark把数据加载到节点的内存中,故分布式处理可以秒级完成
  • 快速迭代计算,实时查询,分析等都可以在shell中完成
  • 有Scala shell和Python shell

Scala shell:/bin/scala-shell

注意:

  • 启动日志级别可以修改为WARN,在目录/conf/log4j.properties
  • 开启Spark-shell要先启动hadoop,否则会出现以下错误12345678910111213141516171819202122232425262728[hadoop@hadoop01 bin]$ ./spark-shell ... ...Caused by: java.net.ConnectException: Call From hadoop01/192.168.146.130 to hadoop01:9000 failed on connection exception: java.net.ConnectException: 拒绝连接;For more details see: http://wiki.apache.org/hadoop/ConnectionRefused; ... 104 more<console>:14: error: not found: value spark import spark.implicits._ ^<console>:14: error: not found: value spark import spark.sql ^Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.2.0 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)Type in expressions to have them evaluated.Type :help for more information.scala> scala> val lines = sc.textFile("/home/hadoop/look.sh")<console>:17: error: not found: value sc val lines = sc.textFile("/home/hadoop/look.sh") ^

其他可能出现的错误:

代码语言:javascript
复制
[hadoop@hadoop01 bin]$ ./spark-shell
  ... ...
Caused by: java.net.ConnectException: Call From hadoop01/192.168.146.130 to hadoop01:9000 failed on connection exception: java.net.ConnectException: 拒绝连接;For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused;
  ... 104 more
<console>:14: error: not found: value spark
	   import spark.implicits._
			  ^
<console>:14: error: not found: value spark
	   import spark.sql
			  ^
Welcome to
	  ____              __
	 / __/__  ___ _____/ /__
	_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
	  /_/
		 
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala> 
scala> val lines = sc.textFile("/home/hadoop/look.sh")
<console>:17: error: not found: value sc
	   val lines = sc.textFile("/home/hadoop/look.sh")
				   ^
  • 上述错误出现的原因是/tmp/hive这里,本质上是hdfs中此目录的读写权限出了问题(Spark的运行并不需要Hive的开启,甚至没有Hive也可以),此处只是/tmp/hive这个目录出了问题,使用hadoop dfs -chmod 777 /tmp/hive来修改其权限,如果出现 Name node is in safe mode,那么则需要使用hadoop dfsadmin -safemode leave来退出安全模式,之后便可以正常修改权限,改完之后再执行spark-shell变会出现正常的初始化结果:
代码语言:javascript
复制
17/07/02 13:27:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/02 13:27:54 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.146.130:4040
Spark context available as 'sc' (master = local[*], app id = local-1498973265138).
Spark session available as 'spark'.
注意上述的三行初始化信息!
  • 注意Spark-shell中的textFile(path),参数path默认为hdfs://,要使用file://显式声明
代码语言:javascript
复制

scala> val lines = sc.textFile("/home/hadoop/look.sh")
lines: org.apache.spark.rdd.RDD[String] = /home/hadoop/look.sh MapPartitionsRDD[1] at textFile at <console>:24
scala> lines.count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://hadoop01:9000/home/hadoop/look.sh
  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided
scala> val lines = sc.textFile("file:///home/hadoop/look.sh")
lines: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/look.sh MapPartitionsRDD[3] at textFile at <console>:24
scala> lines.count()
res1: Long = 26
scala> lines.first()
res2: String = #!/bin/bash

开发环境搭建

安装Scala环境

注意:

  • Scala环境本身的安装跟Spark无关,Scala本身就是一门类似Java的语言
  • 可以在非集群内的主机安装该开发环境,然后通过ssh提交集群运行即可 (Spark版本2.x.x - Scala版本2.11.x以上,在IDEA中新建项目时会在首选项中进行选择)

第一个Scala程序:WordCount

注意: 类似于Hadoop,如果开发环境不在集群内,例如在自己PC中的IDEA进行开发(使用虚拟机同理),那么就会产生两种运行方式,一是本地运行,二是提交集群运行。 本质上两种方式都是先打包,再上传(本地或集群)。即流程是一致的,但是在PC中引入的spark-core的作用是不同的,提交集群运行时,PC中的spark-core内容只是作为语法检查,类方法调用等辅助作用;但是本地运行时,除了上述功能外,其还充当了计算部分,即可以使PC成为一个类似节点的且有计算能力的存在。

全部步骤: PC上安装Scala环境,IDEA,IDEA安装Scala插件

1.本地运行

  • 新建Scala的Project,注意要选对应的scala版本
  • 然后在build.sbt中添加spark-core的依赖,可以去MavenRepositories网站去查,找到sbt(ivy)的依赖格式就行了
  • 然后新建一个scala class,选择object,书写代码,要使用本地模式
  • 最后直接点击运行即可。
代码语言:javascript
复制
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object WordCount extends App {
  // 读取hdfs文件
  val path = "hdfs://192.168.146.130:9000/spark/look.sh"
  //远程调试
  val conf = new SparkConf()
    .setAppName("scalasparktest")
    .setMaster("spark://192.168.146.130:7077")
    .setJars(List("I:\\IDEA_PROJ\\ScalaSparkTest\\out\\scalasparktest_jar\\scalasparktest.jar"))
  val sc = new SparkContext(conf)
  val lines = sc.textFile(path)
  val words = lines.flatMap(_.split(" ")).filter(word => word != " ")
  val pairs = words.map(word => (word, 1))
  val wordscount: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
  wordscount.collect.foreach(println)
}

打印结果: 注意下述的IP地址和file路径,确实是在本地运行的,而且就是引入的sparl-core起的作用

代码语言:javascript
复制
D:\Java\jdk1.8.0_77\bin\java "-javaagent:D:\JetBrains\IntelliJ IDEA 
...
17/11/28 00:40:21 INFO Executor: Starting executor ID driver on host localhost
17/11/28 00:40:21 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58570.
17/11/28 00:40:21 INFO NettyBlockTransferService: Server created on 192.168.230.1:58570
17/11/28 00:40:21 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/11/28 00:40:21 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.230.1, 58570, None)
17/11/28 00:40:21 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.230.1:58570 with 1992.9 MB RAM, BlockManagerId(driver, 192.168.230.1, 58570, None)
...
17/11/28 00:40:22 INFO HadoopRDD: Input split: file:/C:/Users/msi/Desktop/xiaomi2.txt:0+903
17/11/28 00:40:22 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1111 bytes result sent to driver
17/11/28 00:40:22 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 183 ms on localhost (executor driver) (1/1)
...
(小米客服那些事,1)
(贤艾森秋t4krP0,1)
(北京IHG向,1)
17/11/28 00:40:22 INFO SparkContext: Invoking stop() from shutdown hook
17/11/28 00:40:22 INFO SparkUI: Stopped Spark web UI at http://192.168.230.1:4040
...
Process finished with exit code 0

2.提交集群运行

  • 第一步同本地模式
  • 第二步同本地模式
  • 然后新建一个scala class,选择object,书写代码,要使集群模式
  • 最后直接点击运行即可。
代码语言:javascript
复制
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object WordCount extends App {
  // 读取hdfs文件
  val path = "hdfs://192.168.146.130:9000/spark/look.sh"
  //远程调试
  val conf = new SparkConf()
    .setAppName("scalasparktest")
    .setMaster("spark://192.168.146.130:7077")
    .setJars(List("I:\\IDEA_PROJ\\ScalaSparkTest\\out\\scalasparktest_jar\\scalasparktest.jar"))
  val sc = new SparkContext(conf)
  val lines = sc.textFile(path)
  val words = lines.flatMap(_.split(" ")).filter(word => word != " ")
  val pairs = words.map(word => (word, 1))
  val wordscount: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
  wordscount.collect.foreach(println)
}
image_1bvv7g94014f91qml150nsnb1ito9.png-60.3kB
image_1bvv7g94014f91qml150nsnb1ito9.png-60.3kB

此处一定要选择对Module(不是默认)和要运行的MainClass

image_1bvv7i5bj14a0l4c2kc1pra1d70m.png-43kB
image_1bvv7i5bj14a0l4c2kc1pra1d70m.png-43kB

点击OK后,选择Jar打包后的路径

image_1bvv8p92j104td4dhfn1ro59b51g.png-154.8kB
image_1bvv8p92j104td4dhfn1ro59b51g.png-154.8kB

使用命令: 启动master: ./sbin/start-master.sh 启动worker: ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://192.168.146.130:7077 需要配置spark-env.sh中:(下面设为localhost就远程不了了) export SPARK_MASTER_HOST=192.168.146.130 export SPARK_LOCAL_IP=192.168.146.130 注意更新配置文件后需要把master和worker都重启才可以生效(单机两者都在一个机器上的情况)

出现的错误: 错误:java.io.FileNotFoundException: Jar I:\IDEA_PROJ\ScalaSparkTest\out\scalasparktest.jar not found 解决:修改setJar方法参数中的jar路径

错误:Could not connect to spark://192.168.146.130:7077 解决:重启worker和master,前提是spark-env.sh中的MASTER_IP和WORKER_IP要设置正确

错误:Exception: Call From msi-PC/192.168.230.1 to 192.168.146.130:8020 failed on connection exception: java.net.ConnectException: Connection refused: no further information; 解决:hdfs端口错误,很多教程写的是8020端口,但我hdfs是9000端口,所以要更正

错误:Invalid signature file digest for Manifest main attributes 解决:打包的文件很大,把全部依赖都打包了,90多M,但正常应该10多M,删掉无用的依赖,并且把sbt中spark-core的依赖设为provided模式

image_1bvvammdghvqppd1dp3nrc1ema1t.png-110.7kB
image_1bvvammdghvqppd1dp3nrc1ema1t.png-110.7kB

错误:重复出现如下错误

代码语言:javascript
复制
17/11/28 20:20:52 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
17/11/28 20:21:07 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
17/11/28 20:21:22 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

解决:Worker失效后被kill了[此时jps应该是没有Worker的],重启Worker即可,还不行就将hadoop和spark都重启

提交集群运行的结果:(注意IP和端口,确实是提交到集群/虚拟机 上运行后返回的结果) 整个过程全部在IDEA中,完全达到了本地调试,自动上传集群,并返回结果的流程

代码语言:javascript
复制
D:\Java\jdk1.8.0_77\bin\java "-javaagent:D:\JetBrains\IntelliJ IDEA 
...
17/11/28 02:09:39 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20170630223625-0006/0 on worker-20170630215502-192.168.146.130-50762 (192.168.146.130:50762) with 1 cores
17/11/28 02:09:39 INFO StandaloneSchedulerBackend: Granted executor ID app-20170630223625-0006/0 on hostPort 192.168.146.130:50762 with 1 cores, 1024.0 MB RAM
17/11/28 02:09:39 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20170630223625-0006/0 is now RUNNING
...
17/11/28 02:09:43 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.146.130:47071 with 413.9 MB RAM, BlockManagerId(0, 192.168.146.130, 47071, None)
...
17/11/28 02:09:50 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
17/11/28 02:09:50 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.146.130, executor 0, partition 0, ANY, 4853 bytes)
...
17/11/28 02:09:55 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 192.168.146.130, executor 0, partition 1, ANY, 4853 bytes)
...
17/11/28 02:09:55 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
17/11/28 02:09:55 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 192.168.146.130, executor 0, partition 0, NODE_LOCAL, 4625 bytes)
...
17/11/28 02:09:56 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 192.168.146.130, executor 0, partition 1, NODE_LOCAL, 4625 bytes)
...
(-ef|grep,1)
($Jarstr,1)
([[,1)
(do,1)
(YES,1)
(while,1)
("$Jarinfo",1)
(echo,1)
(#!/bin/bash,1)
17/11/28 02:09:56 INFO SparkContext: Invoking stop() from shutdown hook
...
Process finished with exit code 0
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017-11-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基本概括
    • 概述
      • 主要内容
        • 发展历史
          • 主要组件
            • 紧密集成的优点
              • hadoop应用场景
                • spark应用场景
                  • Doug Cutting的观点
                  • 运行环境搭建
                    • 基础环境
                      • 具体步骤
                        • Spark目录
                          • Spark shell
                            • Scala shell:/bin/scala-shell
                            • 开发环境搭建
                              • 安装Scala环境
                                • 第一个Scala程序:WordCount
                                相关产品与服务
                                远程调试
                                远程调试(Remote Debugging,RD)在云端为用户提供上千台真实手机/定制机/模拟器设备,快速实现随时随地测试。运用云测技术对测试方式、操作体验进行了优化,具备多样性的测试能力,包括随时截图和记录调试日志,稳定的支持自动化测试, 设备灵活调度,用例高效执行, 快速定位产品功能和兼容性问题。云手机帮助应用、移动游戏快速发现和解决问题,节省百万硬件费用,加速敏捷研发流程。
                                领券
                                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档