As any self-respecting computer person knows, a dogged willingness to tinker is a must. Today I wanted to use my long-dormant cluster of VMware virtual machines, only to find Spark broken in all sorts of ways, presumably pitfalls left behind by an earlier sloppy setup (I had used it a few days ago, but not in cluster mode, as I only just realized). Faced with all that, I chose the decisive option: reinstall. So here I am, happily setting up the environment yet again.
This time I paid close attention to every detail and tried to record the whole setup clearly. Besides installing Scala and Spark, the post also covers running and debugging (the part that really matters): packaging a jar in IDEA and uploading it for execution, and submitting jobs remotely from IDEA. Both are recorded below.
For the various ways of submitting Spark jobs from IDEA, see my other article.
Thanks to my 16 GB of RAM, all four virtual machines can run at once with room to spare. These four VMs already have Hadoop installed, and the Hadoop cluster works fine, so I kept it as-is.
The versions involved:

| Item | Version | Note |
|---|---|---|
| Hadoop | 2.7.3 | |
| Java | 1.8.0 | |
| Scala | 2.11.8 | to be installed |
| Spark | 2.2.0 | to be installed |
Download and unpack Scala:

```
$ wget http://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz
$ tar -zxvf scala-2.11.8.tgz
$ mv scala-2.11.8 scala   # rename the extracted directory, not the tarball
```
Then add Scala to `/etc/profile`:

```
$ sudo vi /etc/profile
# append at the end of the file:
export SCALA_HOME=/usr/local/scala
export PATH=$PATH:$SCALA_HOME/bin
$ source /etc/profile
```
Verify the installation:

```
$ scala -version
```
Download and unpack Spark:

```
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz
$ tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz
$ mv spark-2.2.0-bin-hadoop2.7 spark
```
Add Spark to `/etc/profile` as well:

```
$ sudo vi /etc/profile
# append at the end:
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
$ source /etc/profile
```
Next, the Spark configuration files:

```
$ cd /usr/local/spark/conf
# strip the .template suffix first
$ mv spark-env.sh.template spark-env.sh
$ mv slaves.template slaves
```

Edit `spark-env.sh` (`$ vi spark-env.sh`) and append the variables:

```
export JAVA_HOME=/usr/local/java/jdk1.8.0_112
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SCALA_HOME=/usr/local/scala
export SPARK_MASTER_IP=192.168.146.130   # hadoop01
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
```

Edit `slaves` (`$ vi slaves`) and append the slave nodes (hostnames or IPs):

```
hadoop02
hadoop03
hadoop04
```

There is also `spark-defaults.conf`; I did not touch it at first, which later caused an error. Its changes are covered further down.
Copy everything to the slave nodes (this could be scripted, but I was lazy):

```
# scala
$ scp -r scala hadoop02:/usr/local/
$ scp -r scala hadoop03:/usr/local/
$ scp -r scala hadoop04:/usr/local/
# spark
$ scp -r spark hadoop02:/usr/local/
$ scp -r spark hadoop03:/usr/local/
$ scp -r spark hadoop04:/usr/local/
# profile
$ sudo scp /etc/profile hadoop02:/etc/profile
$ sudo scp /etc/profile hadoop03:/etc/profile
$ sudo scp /etc/profile hadoop04:/etc/profile
```
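Since this is the same three copies per node, it can be collapsed into one small loop. A minimal sketch (hostnames taken from the slaves file above; `RUN=echo` makes it a dry run that only prints the commands, set `RUN=` to copy for real):

```shell
# Sync Scala, Spark, and /etc/profile to every worker node.
# RUN=echo keeps this a dry run; set RUN= (empty) to actually copy.
RUN=${RUN:-echo}
for node in hadoop02 hadoop03 hadoop04; do
  $RUN scp -r /usr/local/scala "$node:/usr/local/"
  $RUN scp -r /usr/local/spark "$node:/usr/local/"
  $RUN sudo scp /etc/profile "$node:/etc/profile"
done
```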
Since we only need Hadoop for its HDFS, there is no need to start all of Hadoop's services:

```
$ start-dfs.sh
```
Both `hadoop/sbin` and `spark/sbin` are on the PATH, and each directory contains a script named `start-all.sh`. To avoid starting the wrong one, run Spark's copy from inside its own directory:

```
$ cd /usr/local/spark/sbin
$ ./start-all.sh
```
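As a quick sanity check, you can ask the shell which `start-all.sh` a bare command would resolve to (output depends on your PATH):

```shell
# List every start-all.sh visible on the PATH, in resolution order;
# the first entry is the one a bare `start-all.sh` would run.
type -a start-all.sh 2>/dev/null || echo "start-all.sh not on PATH"
```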
```
[hadoop@hadoop01 ~]$ jps
18822 SecondaryNameNode
18521 NameNode
18634 DataNode
18990 Master
19055 Jps

[hadoop@hadoop02 ~]$ jps
33380 DataNode
33589 Jps
33519 Worker

[hadoop@hadoop03 ~]$ jps
25876 Jps
25656 DataNode
25806 Worker

[hadoop@hadoop04 ~]$ jps
32162 Worker
32025 DataNode
32234 Jps
```
Note that there are only three Workers, because the master node is not configured to start one. It could be (just as HDFS runs four DataNodes here), but since Spark will be executing compute tasks, it is better to keep a Worker off the master so that tasks do not compete with the master for resources.
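For completeness, if you did want a Worker on the master as well, it is just one more line in the slaves file (hostnames are the ones from this setup; this cluster deliberately does not do this):

```
# conf/slaves -- listing the master here makes it start a Worker too
hadoop01
hadoop02
hadoop03
hadoop04
```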
The example program reads the HDFS file `Vote-demo.txt`, builds a graph from it with GraphX, and prints the graph's edge count.
RemoteDemo.scala:

```scala
package Remote

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RemoteDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("SimpleDemo")
      .setMaster("spark://hadoop01:7077")
      .setJars(List("I:\\IDEA_PROJ\\VISNWK\\out\\artifacts\\visnwk_jar\\visnwk.jar"))
    val sc = new SparkContext(conf)

    def loadEdges(fn: String): Graph[Any, String] = {
      val edges: RDD[Edge[String]] =
        sc.textFile(fn).filter(l => !l.startsWith("#")).map { // skip '#' comment lines
          line =>
            val fields = line.split("\t")
            Edge(fields(0).toLong, fields(1).toLong, "1.0")
        }
      val graph: Graph[Any, String] = Graph.fromEdges(edges, "defaultProperty")
      graph
    }

    val graph = loadEdges("hdfs://hadoop01:9000/TVCG/SNAP/DATASET/Vote-demo.txt")
    println(s"graph.edges.count() = ${graph.edges.count()}")
    sc.stop() // forgot this at first and got an error (see below)
  }
}
```
In IDEA: set the Main Class, build the jar artifact, and check the Run Configuration (screenshots omitted). After modifying `spark-env.sh`, restart the Spark cluster:

```
$ ./sbin/stop-all.sh
$ ./sbin/start-all.sh
```

The web UI at Master_IP:8080 then displays normally. `spark-defaults.conf` is covered below.
```
[hadoop@hadoop01 bin]$ ./spark-submit --class "Remote.RemoteDemo" ~/visnwk-build.jar
17/07/03 19:48:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/03 19:49:03 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find AppClient.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:154)
	at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:134)
	at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:644)
	at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:178)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
```
Solution: add the following at the end of the example code:

```
sc.stop
```
The job UI lives at http://192.168.146.130:4040/jobs/ ; note that the 4040 UI is only available while a job is running, and becomes unreachable once the job finishes.
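If you want to inspect jobs after they finish, Spark can persist event logs and serve them through its history server. A minimal sketch of the relevant `spark-defaults.conf` entries (the HDFS log path is an assumption chosen to match this cluster; create it first with `hdfs dfs -mkdir`):

```
# conf/spark-defaults.conf -- event log path is an assumed example
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop01:9000/spark-logs
spark.history.fs.logDirectory    hdfs://hadoop01:9000/spark-logs
```

Then start the server with `sbin/start-history-server.sh` and browse Master_IP:18080 to see completed applications.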
Using a bogus master address (xxxxx) fails like this:

```
18/05/25 19:06:20 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://118.202.40.210:4042
18/05/25 19:06:20 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://xxxxx:7077...
18/05/25 19:06:23 WARN TransportClientFactory: DNS resolution for xxxxx:7077 took 2551 ms
18/05/25 19:06:23 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master xxxxx:7077
org.apache.spark.SparkException: Exception thrown in awaitResult
	at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
```

With the real master IP it fails like this:

```
18/05/25 19:07:26 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://118.202.40.210:4040
18/05/25 19:07:27 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.146.130:7077...
18/05/25 19:07:27 INFO TransportClientFactory: Successfully created connection to /192.168.146.130:7077 after 23 ms (0 ms spent in bootstraps)
18/05/25 19:07:27 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.146.130:7077
org.apache.spark.SparkException: Exception thrown in awaitResult
	at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
```

Comparing the two logs: the final exception is identical, but the intermediate lines differ, so this is not a simple connection failure.
```scala
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-graphx" % "2.2.0"
```
```scala
val conf = new SparkConf()
  .setAppName("SimpleDemo")
  .setMaster("spark://192.168.146.130:7077")
  //.setIfMissing("spark.driver.host", "127.0.0.1") // if unset, defaults to this machine's physical IP
  .setJars(List("I:\\IDEA_PROJ\\VISNWK\\out\\artifacts\\visnwk_jar\\visnwk.jar"))
val sc = new SparkContext(conf)
```
```
18/05/25 19:25:36 INFO DAGScheduler: Job 0 finished: reduce at EdgeRDDImpl.scala:90, took 60.614562 s
graph.edges.count() = 103689   // finally!!
18/05/25 19:25:36 INFO SparkUI: Stopped Spark web UI at http://118.202.40.210:4040
18/05/25 19:25:36 INFO StandaloneSchedulerBackend: Shutting down all executors
18/05/25 19:25:36 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
18/05/25 19:25:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/05/25 19:25:36 INFO MemoryStore: MemoryStore cleared
18/05/25 19:25:36 INFO BlockManager: BlockManager stopped
18/05/25 19:25:36 INFO BlockManagerMaster: BlockManagerMaster stopped
18/05/25 19:25:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/05/25 19:25:36 INFO SparkContext: Successfully stopped SparkContext
18/05/25 19:25:36 INFO ShutdownHookManager: Shutdown hook called
18/05/25 19:25:36 INFO ShutdownHookManager: Deleting directory C:\Users\msi\AppData\Local\Temp\spark-fae200dd-12cc-4b8a-b2ec-751d641d3689

Process finished with exit code 0
```