Spark 运行模式之一,用于在本地机器上单机模拟分布式计算的环境。在 local 模式下,Spark 会使用单个 JVM 进程来模拟分布式集群行为,所有 Spark 组件(如 SparkContext、Executor 等)都运行在同一个 JVM 进程中,不涉及集群间通信,适用本地开发、测试和调试。
设置 SparkConf 中的 spark.master
属性为 "local"
来指定运行模式。如Scala中这样设置:
import org.apache.spark.{SparkConf, SparkContext}
object SparkLocalExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkLocalExample").setMaster("local")
val sc = new SparkContext(conf)
// 在这里编写你的 Spark 应用程序逻辑
sc.stop() // 停止 SparkContext
}
}
local 模式仅适用于小规模数据处理和本地开发测试场景,并不适用于生产环境的大规模数据处理任务。在生产环境中,需要使用集群模式(如 standalone、YARN、Mesos 等)来运行 Spark 应用程序,以便充分利用集群资源和提高作业的并行度。
通常一个 Spark 程序对应一个 SparkContext 实例。SparkContext 是 Spark 应用程序的主入口点,负责与集群进行通信,管理作业的调度和执行,以及维护应用程序的状态。因此,一个 SparkContext 实例通常对应一个独立的 Spark 应用程序。
在正常情况下,创建多个 SparkContext 实例是不推荐的,因为这可能会导致资源冲突、内存泄漏和性能下降等问题。Spark 本身设计为单个应用程序对应一个 SparkContext,以便于有效地管理资源和执行作业。
然而,在某些特殊情况下,可能会存在多个 SparkContext 实例的情况:
创建多个 SparkContext 实例时需要谨慎处理,并且需要确保它们能够正确地管理资源、避免冲突,并且不会影响其他应用程序或作业的正常运行。在生产环境中,建议仅使用一个 SparkContext 实例来管理整个应用程序。
SparkContext是Spark应用的入口点,负责初始化Spark应用所需要的环境和数据结构。
所以,一个标准的Spark应用对应一个SparkContext实例。通过创建SparkContext来开始我们的程序,在其上执行各种操作,并在结束时关闭该实例。
input.txt
JavaEdge,JavaEdge,JavaEdge
go,go
scalascala
package com.javaedge.bigdata.chapter02
import org.apache.spark.{SparkConf, SparkContext}
/**
* 词频统计案例
* 输入:文件
* 需求:统计出文件中每个单词出现的次数
* 1)读每一行数据
* 2)按照分隔符把每一行的数据拆成单词
* 3)每个单词赋上次数为1
* 4)按照单词进行分发,然后统计单词出现的次数
* 5)把结果输出到文件中
* 输出:文件
*/
object SparkWordCountApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val rdd = sc.textFile("/Users/javaedge/Downloads/sparksql-train/data/input.txt")
rdd.collect().foreach(println)
sc.stop()
}
发现启动后,报错啦:
ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:368)
at com.javaedge.bigdata.chapter02.SparkWordCountApp$.main(SparkWordCountApp.scala:25)
at com.javaedge.bigdata.chapter02.SparkWordCountApp.main(SparkWordCountApp.scala)
ERROR Utils: Uncaught exception in thread main
必须设置集群?我才刚入门大数据诶,这么麻烦?劝退,不学了!还好 spark 也支持简单部署:
val sparkConf = new SparkConf().setMaster("local")
重启,又报错:
ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: An application name must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:371)
at com.javaedge.bigdata.chapter02.SparkWordCountApp$.main(SparkWordCountApp.scala:25)
at com.javaedge.bigdata.chapter02.SparkWordCountApp.main(SparkWordCountApp.scala)
ERROR Utils: Uncaught exception in thread main
val sparkConf = new SparkConf().setMaster("local").setAppName("SparkWordCountApp")
成功了!
val rdd = sc.textFile("/Users/javaedge/Downloads/sparksql-train/data/input.txt")
rdd.flatMap(_.split(","))
.map(word => (word, 1)).collect().foreach(println)
sc.stop()
output:
(pk,1)
(pk,1)
(pk,1)
(jepson,1)
(jepson,1)
(xingxing,1)
rdd.flatMap(_.split(","))
// 3)每个单词赋上次数为1
.map(word => (word, 1))
.reduceByKey(_ + _)
.saveAsTextFile("/Users/javaedge/Downloads/sparksql-train/data/output.txt")
// 2)按照分隔符把每一行的数据拆成单词
rdd.flatMap(_.split(","))
// 3)每个单词赋上次数为1
.map(word => (word, 1))
// 4)按照单词进行分发,然后统计单词出现的次数
.reduceByKey(_ + _)
// 结果按单词频率降序排列,既然之前是 <单词,频率> 且 sortKey 只能按 key 排序,那就在这里反转 kv 顺序
.map(x => (x._2, x._1))
.collect().foreach(println)
output:
(2,go)
(1,scalascala)
(3,JavaEdge)
显然结果不符合期望。如何调整呢?再翻转一次!
rdd.flatMap(_.split(","))
.map(word => (word, 1))
.reduceByKey(_ + _)
// 结果按单词频率降序排列,既然之前是 <单词,频率> 且 sortKey 只能按 key 排序,那就在这里反转 kv 顺序
.map(x => (x._2, x._1))
.sortByKey(false)
.map(x => (x._2, x._1))
.collect().foreach(println)
output:
(JavaEdge,3)
(go,2)
(scalascala,1)
javaedge@JavaEdgedeMac-mini bin % ./spark-shell --master local
23/03/23 16:28:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://172.16.1.55:4040
Spark context available as 'sc' (master = local, app id = local-1679560146321).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.3
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_362)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
$ ./spark-submit --master yarn \
--deploy-mode client \
--class <main_class> \
--num-executors <num_executors> \
--executor-memory <executor_memory> \
--executor-cores <executor_cores> \
<path_to_jar_or_py_file> \
<app_arguments>
各参数含义:
--master yarn
: 指定使用YARN作为Spark的资源管理器。--deploy-mode client
: 指定部署模式为client模式,即Driver程序运行在提交Spark任务的客户端机器上。--class <main_class>
: 指定Spark应用程序的主类。--num-executors <num_executors>
: 指定执行器的数量。--executor-memory <executor_memory>
: 指定每个执行器的内存大小。--executor-cores <executor_cores>
: 指定每个执行器的核心数。<path_to_jar_or_py_file>
: 指定要提交的Spark应用程序的JAR文件或Python文件的路径。<app_arguments>
: 指定Spark应用程序的参数。如提交一个Scala版本的Spark应用程序的命令:
$ ./spark-submit --master yarn \
--deploy-mode client \
--class com.example.MySparkApp \
--num-executors 4 \
--executor-memory 2G \
--executor-cores 2 \
/path/to/my-spark-app.jar \
arg1 arg2 arg3
如果你要提交一个Python版本的Spark应用程序,可以使用以下命令:
$ ./spark-submit --master yarn \
--deploy-mode client \
/path/to/my-spark-app.py \
arg1 arg2 arg3
这样就可以通过YARN提交Spark任务,Spark会向YARN请求资源并在集群上执行任务。