前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >解惑| spark实现业务前一定要掌握的点~

解惑| spark实现业务前一定要掌握的点~

作者头像
Spark学习技巧
发布2019-11-28 23:34:33
1.1K0
发布2019-11-28 23:34:33
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

一直都有粉丝留言,问各种奇怪的问题,今天就列举一个浪尖反复解答过的问题:编写的spark 代码到底是执行在driver端还是executor端?

1.driver & executor

浪尖这里只提本文关注的两个角色,driver和executor。

首先,driver是用户提交应用程序的入口main函数执行的地方,driver主要作用就是进行job的调度,DAG构建及调度,然后调度task。

然后,executor是执行task地方,然后将结果、状态等汇集到driver,当然executor上执行的task的结果也可以是shuffle中间结果,也可以落地到外部存储。

executor之间目前不能通信,只能借助第三方来共享数据或者通信。

2.单机 vs 多线程 vs 多进程

用户编写的代码都在main函数里,按照单机版的理解,代码肯定是执行于同一台机器,同一个JVM中的。当然,调用类似processbuilder启动的进程除外。这样就可以共享一些变量,一些链接,如数据库连接。

拿段代码举例:

代码语言:javascript
复制
public static void main(String[] args){
 Map<String,int> hashmap = new HashMap();
 rdd.map(each=>{
   if(hashmap.containsKey(each)){
      hashmap.put(each,hashmap.get(each)+1);
   }else{
      hashmap.put(each,1);
   }
 })
 System.out.println(hashmap.size())
 }

假如map是就是一个算子,传入的是一个函数,单线程的,主要是想做一个wordcount功能,那么最后输出hashmap的大小肯定不为0.

假如map算子,是多线程线程执行,几个CPU启动几个线程执行,那么hashmap也是不为0,因为是在同一个jvm中,hashmap属于共享堆对象,当然暂不考虑并发问题。这就类似 jdk新特性,并行流处理。

假如rdd就是spark里的rdd,那么map算子传入的函数会封装成一个闭包,然后在driver构建完DAG,划分好stage和task,后driver会调度task到executor端去执行。这个时候,map函数外部的hashmap和内部的hashmap就没有关系了,输出的size是0 。

hashmap这种堆对象,数据库连接,kafka生产消费者等都是这样,不能在rdd的map算子外部声明,内部使用,因为代码都在不同的进程甚至机器中执行,这些对象都不支持跨进程共享,更别提跨机器了。

所有rdd的算子都是如此,所有Dataframe/dataset算子也是如此。

有人该抬杠可,我在idea执行的分明不是0,浪尖,你这解释是错的哦。

那是因为你local模式,进程在同一个jvm中,所以就类似模式二的多线程,当然local多核的话也会出现并发问题。

那要driver和executor 通信怎么办?

第三方存储,广播变量,累加器,executor返回值。

重要|Spark driver端得到executor返回值的方法

3. foreach vs foreachpartition vs foeachrdd

其实,在这里浪尖可以先稍微总结一下:

所有对RDD具体数据的操作都是在executor上执行的,所有对rdd自身的操作都是在driver上执行的。

因为driver端是job,stage,task等生成调度的地方,executor是task执行的地方。job,stage,task生成都离不开rdd自身,而task执行离不开具体的数据。rdd的相关的操作不能缺少driver端的sparkcontext/sparksession。

foreach/foreachPartition都是针对rdd内部数据进行处理的,所以我们传递给这些算子的函数都是执行于executor端的。

Spark源码系列之foreach和foreachPartition的区别

foreachrdd很明显是对rdd进行操作的,所以他的参数函数是在driver端执行的,而foreachrdd的参数函数内部的rdd数据处理,会进一步调度执行于executor端。所以,foreachrdd内部可以使用外部的变量,链接等。当然,foreachrdd的内部rdd的具体算子是不能的。类似的还有transform等。

代码语言:javascript
复制
 public static void main(String[] args){
 Map<String,int> hashmap = new HashMap();
 dstream.foreachrdd(rdd=>{
   String today = getToday();
   if(hashmap.containsKey(today )){
      hashmap.put(today ,hashmap.get(today )+1);
   }else{
      hashmap.put(today,1);
   }
 })
 System.out.println(hashmap.size())
 }

输出的hashmap的size就不为0,因为这段代码是执行于driver的。

代码语言:javascript
复制
 public static void main(String[] args){
 Map<String,int> hashmap = new HashMap();
 dstream.foreachrdd(rdd=>{
   rdd.map(each=>{
     if(hashmap.containsKey(each)){
        hashmap.put(each,hashmap.get(each)+1);
     }else{
        hashmap.put(each,1);
     }
   })
 })
 System.out.println(hashmap.size())
 

这个时候输出的hashmap的size就为0 ,因为虽然是在foreachrdd内部,但是却是对rdd具体数据的操作,所以是执行于executor端的。

4. 总结

切记:所有对RDD内部具体数据的操作执行都是在executor上进行的,所有对rdd自身的操作都是在driver上执行的。

掌握这些,才能更好理解spark,才能写出好的spark代码,才能做对业务。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档