前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark Drive 端的 List 无法获取 Executor 中的数据吗?

Spark Drive 端的 List 无法获取 Executor 中的数据吗?

作者头像
shengjk1
发布2025-05-16 15:05:31
发布2025-05-16 15:05:31
7300
代码可运行
举报
文章被收录于专栏:码字搬砖码字搬砖
运行总次数:0
代码可运行

一、背景

好久没有写 Spark 相关的代码了,结果写了一个如下类似的代码,在 drive 端 new 了一个 arrayList,然后再 Executor 端进行 add 操作,最后再在 drive 端打印 这个 list .size。结果发现这个 size 为0,顿时好奇心就上来了。

代码语言:javascript
代码运行次数:0
运行
复制
 	    List<SuggestFeedback> slows = new ArrayList<>();

        rowJavaRDD.foreachPartition(rowIterator -> {
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                SuggestFeedback suggestFeedback = new SuggestFeedback();
                suggestFeedback.setDeviceId( Objects.isNull(row.getAs("device_id")) ? "" : row.getAs("device_id").toString());
                String suggest = suggestFeedback.getLevel1_mass_category();
                if (suggest.equals("卡顿问题")) {
                    slows.add(suggestFeedback);
                } 
       }
       System.out.print(slow.size())

二、spark 中解决方案( 其他分布式计算引擎也类似 )

在 Apache Spark 中,Driver 端和 Executor 端(TaskManager)运行在不同的 JVM 进程中,因此它们之间的内存是不共享的。如果你在 Driver 端创建了一个 List,并尝试在 Executor 端(TaskManager)通过 list.add 添加数据,这是不会生效的,因为 Executor 端操作的是它自己的内存空间,而不是 Driver 端的内存空间。

解决方案

如果你需要在 Driver 端收集 Executor 端的数据,可以使用以下几种方式:

1. 使用 collect 方法

collect 方法会将 Executor 端的数据收集到 Driver 端。你可以在 Executor 端处理数据,然后使用 collect 将结果返回到 Driver 端。

代码语言:javascript
代码运行次数:0
运行
复制
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val result = rdd.map(x => x * 2).collect()  // 将数据收集到 Driver 端
val list = result.toList  // 转换为 List
2. 使用 aggregatereduce 方法

如果你需要在 Executor 端进行聚合操作,并将结果返回到 Driver 端,可以使用 aggregatereduce 方法。

代码语言:javascript
代码运行次数:0
运行
复制
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val sum = rdd.reduce(_ + _)  // 在 Executor 端进行聚合操作,结果返回到 Driver 端
3. 使用累加器(Accumulator)

累加器是一种特殊的变量,可以在 Executor 端进行累加操作,最终结果会在 Driver 端可见。

代码语言:javascript
代码运行次数:0
运行
复制
val accum = sc.longAccumulator("My Accumulator")
rdd.foreach(x => accum.add(x))
println(accum.value)  // 在 Driver 端获取累加器的值
4. 使用广播变量(Broadcast Variable)

如果你需要在 Executor 端访问 Driver 端的数据,可以使用广播变量。广播变量会将 Driver 端的数据发送到每个 Executor 端,但 Executor 端不能修改广播变量的值。

代码语言:javascript
代码运行次数:0
运行
复制
val list = List(1, 2, 3, 4, 5)
val broadcastVar = sc.broadcast(list)
val rdd = sc.parallelize(Seq(1, 2, 3))
val result = rdd.map(x => x + broadcastVar.value.sum).collect()
总结

在 Spark 中,Driver 端和 Executor 端的内存是隔离的,因此无法直接在 Executor 端修改 Driver 端的数据结构(如 List)。你需要通过 collectaggregate、累加器或广播变量等方式在 Driver 端和 Executor 端之间传递数据。

三、为什么不会生效和报错

在 Apache Spark 或 Apache Flink 中,如果你在 Executor 端(TaskManager) 尝试修改 Driver 端(JobManager) 创建的 List 或其他数据结构,程序不会直接报错,但修改不会生效。这是因为 Executor 端和 Driver 端运行在不同的 JVM 进程中,内存是隔离的。以下详细解释为什么不会报错,以及背后的原理。


1. 为什么不会报错?
Spark 和 Flink 的执行模型
  • SparkFlink 都是分布式计算框架,它们的任务执行模型是基于 Driver-ExecutorJobManager-TaskManager 的架构。
  • Driver 端(JobManager) 负责协调任务,而 Executor 端(TaskManager) 负责实际执行任务。
  • Executor 端Driver 端 运行在不同的 JVM 进程中,它们之间的内存是隔离的。
Java 对象的序列化和传递
  • 当你将一个 List 或其他对象传递给 Executor 端 时,Spark 或 Flink 会将该对象序列化并发送到 Executor 端
  • Executor 端,这个对象会被反序列化,生成一个 新的对象,这个对象与 Driver 端 的对象是完全独立的。
  • 因此,在 Executor 端 对这个对象的任何修改(如 list.add)只会影响 Executor 端 的副本,而不会影响 Driver 端 的原始对象。
为什么不会报错?
  • Executor 端 的操作是在自己的 JVM 进程中执行的,修改的是自己的副本,因此不会触发任何异常。
  • Driver 端Executor 端 之间的内存隔离是设计上的特性,而不是错误。框架本身不会对这种行为进行检查或报错。

2. 为什么修改不会生效?
内存隔离
  • Driver 端Executor 端 运行在不同的 JVM 进程中,它们的内存是完全隔离的。
  • 即使你在 Executor 端 修改了 List,这些修改只会影响 Executor 端 的副本,而不会同步回 Driver 端
对象的序列化和反序列化
  • 当你将一个对象(如 List)传递给 Executor 端 时,Spark 或 Flink 会将该对象序列化并发送到 Executor 端
  • Executor 端,这个对象会被反序列化,生成一个 新的对象。这个新对象与 Driver 端 的对象是完全独立的。
  • 因此,在 Executor 端 对这个对象的任何修改(如 list.add)只会影响 Executor 端 的副本,而不会影响 Driver 端 的原始对象。

3. 如何避免这种问题?

为了避免在 Executor 端 修改 Driver 端 的数据结构而不生效的问题,可以采用以下方法:

在 Spark 中

使用 collect 方法

  • Executor 端 的数据收集到 Driver 端,然后在 Driver 端 进行修改。
代码语言:javascript
代码运行次数:0
运行
复制
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val result = rdd.map(x => x * 2).collect()  // 将数据收集到 Driver 端
val list = result.toList  // 转换为 List

使用累加器(Accumulator)

  • 累加器可以在 Executor 端 进行累加操作,最终结果会在 Driver 端 可见。
代码语言:javascript
代码运行次数:0
运行
复制
val accum = sc.longAccumulator("My Accumulator")
rdd.foreach(x => accum.add(x))
println(accum.value)  // 在 Driver 端获取累加器的值

使用广播变量(Broadcast Variable)

  • 广播变量可以将 Driver 端 的数据分发到所有 Executor 端,但 Executor 端 只能读取广播变量,不能修改。
代码语言:javascript
代码运行次数:0
运行
复制
val list = List(1, 2, 3, 4, 5)
val broadcastVar = sc.broadcast(list)
val rdd = sc.parallelize(Seq(1, 2, 3))
val result = rdd.map(x => x + broadcastVar.value.sum).collect()
在 Flink 中

使用 DataSetDataStream API

  • TaskManager 端 的数据返回到 JobManager 端
代码语言:javascript
代码运行次数:0
运行
复制
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5);
DataSet<Integer> result = data.map(x -> x * 2);
List<Integer> resultList = result.collect(); // 将数据收集到 JobManager 端

使用累加器(Accumulator)

  • 累加器可以在 TaskManager 端 进行累加操作,最终结果会在 JobManager 端 可见。
代码语言:javascript
代码运行次数:0
运行
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
LongCounter counter = new LongCounter();
env.fromElements(1, 2, 3, 4, 5)
   .map(x -> {
       counter.add(1); // 在 TaskManager 端累加
       return x;
   })
   .print(); // 触发任务执行
env.execute();
System.out.println("Counter value: " + counter.getLocalValue()); // 在 JobManager 端获取累加器的值

使用广播变量(Broadcast Variable)

  • 广播变量可以将 JobManager 端 的数据分发到所有 TaskManager 端,但 TaskManager 端 只能读取广播变量,不能修改。
代码语言:javascript
代码运行次数:0
运行
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> data = env.fromElements(1, 2, 3, 4, 5);
DataStream<Integer> broadcastData = env.fromElements(10, 20, 30);

data.connect(broadcastData.broadcast())
    .process(new BroadcastProcessFunction<Integer, Integer, Integer>() {
        @Override
        public void processElement(Integer value, ReadOnlyContext ctx, Collector<Integer> out) {
            // 访问广播变量
            for (Integer broadcastValue : ctx.getBroadcastState().get("broadcastState")) {
                out.collect(value + broadcastValue);
            }
        }

        @Override
        public void processBroadcastElement(Integer value, Context ctx, Collector<Integer> out) {
            // 更新广播状态
            ctx.getBroadcastState().put("broadcastState", value);
        }
    })
    .print();
env.execute();

4. 总结
  • Executor 端Driver 端 的内存是隔离的,因此在 Executor 端 修改 Driver 端 的数据结构不会生效。
  • 这种行为不会报错,因为 Executor 端 操作的是自己的副本,而不是 Driver 端 的原始对象。
  • 为了避免这种问题,可以使用框架提供的机制(如 collect、累加器、广播变量等)来实现 Driver 端Executor 端 之间的数据传递和共享。

四、总结

Driver 端 和 Executor 端 运行在不同的 JVM 进程中,内存隔离导致 Executor 端 对 Driver 端 数据结构的修改不会生效。

数据从 Driver 端 传递到 Executor 端 是通过序列化和反序列化实现的,这是一个单向过程,修改不会同步回 Driver 端。

解决方案包括使用 collect 方法将数据收集到 Driver 端、使用累加器进行分布式累加、使用广播变量共享数据等。

这种行为不会报错,因为 Executor 端 操作的是自己的副本,而不是 Driver 端 的原始对象。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-02-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、背景
  • 二、spark 中解决方案( 其他分布式计算引擎也类似 )
    • 解决方案
    • 1. 使用 collect 方法
      • 2. 使用 aggregate 或 reduce 方法
      • 3. 使用累加器(Accumulator)
      • 4. 使用广播变量(Broadcast Variable)
    • 总结
  • 三、为什么不会生效和报错
    • 1. 为什么不会报错?
      • Spark 和 Flink 的执行模型
      • Java 对象的序列化和传递
      • 为什么不会报错?
    • 2. 为什么修改不会生效?
      • 内存隔离
      • 对象的序列化和反序列化
    • 3. 如何避免这种问题?
      • 在 Spark 中
      • 在 Flink 中
    • 4. 总结
  • 四、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档