好久没有写 Spark 相关的代码了,结果写了一个如下类似的代码,在 drive 端 new 了一个 arrayList,然后再 Executor 端进行 add 操作,最后再在 drive 端打印 这个 list .size。结果发现这个 size 为0,顿时好奇心就上来了。
List<SuggestFeedback> slows = new ArrayList<>();
rowJavaRDD.foreachPartition(rowIterator -> {
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
SuggestFeedback suggestFeedback = new SuggestFeedback();
suggestFeedback.setDeviceId( Objects.isNull(row.getAs("device_id")) ? "" : row.getAs("device_id").toString());
String suggest = suggestFeedback.getLevel1_mass_category();
if (suggest.equals("卡顿问题")) {
slows.add(suggestFeedback);
}
}
System.out.print(slow.size())
在 Apache Spark 中,Driver 端和 Executor 端(TaskManager)运行在不同的 JVM 进程中,因此它们之间的内存是不共享的。如果你在 Driver 端创建了一个 List
,并尝试在 Executor 端(TaskManager)通过 list.add
添加数据,这是不会生效的,因为 Executor 端操作的是它自己的内存空间,而不是 Driver 端的内存空间。
如果你需要在 Driver 端收集 Executor 端的数据,可以使用以下几种方式:
collect
方法collect
方法会将 Executor 端的数据收集到 Driver 端。你可以在 Executor 端处理数据,然后使用 collect
将结果返回到 Driver 端。
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val result = rdd.map(x => x * 2).collect() // 将数据收集到 Driver 端
val list = result.toList // 转换为 List
aggregate
或 reduce
方法如果你需要在 Executor 端进行聚合操作,并将结果返回到 Driver 端,可以使用 aggregate
或 reduce
方法。
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val sum = rdd.reduce(_ + _) // 在 Executor 端进行聚合操作,结果返回到 Driver 端
累加器是一种特殊的变量,可以在 Executor 端进行累加操作,最终结果会在 Driver 端可见。
val accum = sc.longAccumulator("My Accumulator")
rdd.foreach(x => accum.add(x))
println(accum.value) // 在 Driver 端获取累加器的值
如果你需要在 Executor 端访问 Driver 端的数据,可以使用广播变量。广播变量会将 Driver 端的数据发送到每个 Executor 端,但 Executor 端不能修改广播变量的值。
val list = List(1, 2, 3, 4, 5)
val broadcastVar = sc.broadcast(list)
val rdd = sc.parallelize(Seq(1, 2, 3))
val result = rdd.map(x => x + broadcastVar.value.sum).collect()
在 Spark 中,Driver 端和 Executor 端的内存是隔离的,因此无法直接在 Executor 端修改 Driver 端的数据结构(如 List
)。你需要通过 collect
、aggregate
、累加器或广播变量等方式在 Driver 端和 Executor 端之间传递数据。
在 Apache Spark 或 Apache Flink 中,如果你在 Executor 端(TaskManager) 尝试修改 Driver 端(JobManager) 创建的 List
或其他数据结构,程序不会直接报错,但修改不会生效。这是因为 Executor 端和 Driver 端运行在不同的 JVM 进程中,内存是隔离的。以下详细解释为什么不会报错,以及背后的原理。
List
或其他对象传递给 Executor 端 时,Spark 或 Flink 会将该对象序列化并发送到 Executor 端。list.add
)只会影响 Executor 端 的副本,而不会影响 Driver 端 的原始对象。List
,这些修改只会影响 Executor 端 的副本,而不会同步回 Driver 端。List
)传递给 Executor 端 时,Spark 或 Flink 会将该对象序列化并发送到 Executor 端。list.add
)只会影响 Executor 端 的副本,而不会影响 Driver 端 的原始对象。为了避免在 Executor 端 修改 Driver 端 的数据结构而不生效的问题,可以采用以下方法:
使用 collect
方法:
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val result = rdd.map(x => x * 2).collect() // 将数据收集到 Driver 端
val list = result.toList // 转换为 List
使用累加器(Accumulator):
val accum = sc.longAccumulator("My Accumulator")
rdd.foreach(x => accum.add(x))
println(accum.value) // 在 Driver 端获取累加器的值
使用广播变量(Broadcast Variable):
val list = List(1, 2, 3, 4, 5)
val broadcastVar = sc.broadcast(list)
val rdd = sc.parallelize(Seq(1, 2, 3))
val result = rdd.map(x => x + broadcastVar.value.sum).collect()
使用 DataSet
或 DataStream
API:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5);
DataSet<Integer> result = data.map(x -> x * 2);
List<Integer> resultList = result.collect(); // 将数据收集到 JobManager 端
使用累加器(Accumulator):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
LongCounter counter = new LongCounter();
env.fromElements(1, 2, 3, 4, 5)
.map(x -> {
counter.add(1); // 在 TaskManager 端累加
return x;
})
.print(); // 触发任务执行
env.execute();
System.out.println("Counter value: " + counter.getLocalValue()); // 在 JobManager 端获取累加器的值
使用广播变量(Broadcast Variable):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> data = env.fromElements(1, 2, 3, 4, 5);
DataStream<Integer> broadcastData = env.fromElements(10, 20, 30);
data.connect(broadcastData.broadcast())
.process(new BroadcastProcessFunction<Integer, Integer, Integer>() {
@Override
public void processElement(Integer value, ReadOnlyContext ctx, Collector<Integer> out) {
// 访问广播变量
for (Integer broadcastValue : ctx.getBroadcastState().get("broadcastState")) {
out.collect(value + broadcastValue);
}
}
@Override
public void processBroadcastElement(Integer value, Context ctx, Collector<Integer> out) {
// 更新广播状态
ctx.getBroadcastState().put("broadcastState", value);
}
})
.print();
env.execute();
collect
、累加器、广播变量等)来实现 Driver 端 和 Executor 端 之间的数据传递和共享。Driver 端 和 Executor 端 运行在不同的 JVM 进程中,内存隔离导致 Executor 端 对 Driver 端 数据结构的修改不会生效。
数据从 Driver 端 传递到 Executor 端 是通过序列化和反序列化实现的,这是一个单向过程,修改不会同步回 Driver 端。
解决方案包括使用 collect 方法将数据收集到 Driver 端、使用累加器进行分布式累加、使用广播变量共享数据等。
这种行为不会报错,因为 Executor 端 操作的是自己的副本,而不是 Driver 端 的原始对象。