spark源码系列之内部通讯的三种机制

本文是以spark1.6.0的源码为例讲解。

Spark为协调各个组件完成任务及内部任务处理采用了多种方式进行了各个组件之间的通讯。总共三个部分牵涉的功能是:

1,DAG相关的DAGSchedulerEventProcessLoop。

2,sparkUI相关的SparkListener

3,RPC相关netty RPC流程。本文只讲流程,后面会详细介绍。

一,单个部件自己消息处理方式

DAGSchedulerEventProcessLoop该类继承自EventLoop。是一个典型的生产消费模型。

A),生产者

通过调用

DAGSchedulerEventProcessLoop.post(event: E)

来将消息进行发布。

B),消费者

Eventloop内部维护了一个线程,循环的消费消息eventQueue.take(),调用onReceive(event)进行处理。DAGSchedulerEventProcessLoop内部实现了doOnReceive,对事件进行模式匹配然后交给具体的消息处理函数。

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
 case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

 case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

 case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

 case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

 case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

 case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

 case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

 case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)

 case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

 case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

 case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)

 case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

 case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}

C),消息缓存

消息最终是存储于EventLoop的new LinkedBlockingDeque[E]()里。

二,SparkListeners和ListenerBus

SparkUI的各个监控指标都是,由ListenerBus最为生产者将消息,推送到消息缓存出默认支持1万,然后推送给各个Listener进行处理,然后我们的Spark的webUIPage去获取各个Listener的数据,进行展示。

A),生产者

LiveListenerBus/StreamingListenerBus调用其父类AsynchronousListenerBus的post方法将消息加入 new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY),容量1万。

val eventAdded = eventQueue.offer(event)

B),消费者

AsynchronousListenerBus内部维护了一个消费者线程,线程内部有while(true)进行消息处理。

val event = eventQueue.poll

postToAll(event)

C),消息的具体处理

ListenerBus的postToAll方法,会遍历所有注册了的Listener。

final def postToAll(event: E): Unit = {

 val iter = listeners.iterator
 while (iter.hasNext) {
 val listener = iter.next()
 try {
 onPostEvent(listener, event)
 } catch {
 case NonFatal(e) =>
        logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    }
  }
}

最终在onPostEvent方法中将消息进行了处理。onPostEvent在源码中的两个重要现:

SparkListenerBus和StreamingListenerBus内部的onPostEvent。

private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {

 override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
    event match {
 case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
 case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
 case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
 case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
 case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
 case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
 case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
 case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
 case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
 case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
 case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
 case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
 case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
 case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
 case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
 case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
 case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
 case logStart: SparkListenerLogStart => // ignore event log metadata
 }
  }

}
private[spark] class StreamingListenerBus
 extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
 with Logging {

 private val logDroppedEvent = new AtomicBoolean(false)

 override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
    event match {
 case receiverStarted: StreamingListenerReceiverStarted =>
        listener.onReceiverStarted(receiverStarted)
 case receiverError: StreamingListenerReceiverError =>
        listener.onReceiverError(receiverError)
 case receiverStopped: StreamingListenerReceiverStopped =>
        listener.onReceiverStopped(receiverStopped)
 case batchSubmitted: StreamingListenerBatchSubmitted =>
        listener.onBatchSubmitted(batchSubmitted)
 case batchStarted: StreamingListenerBatchStarted =>
        listener.onBatchStarted(batchStarted)
 case batchCompleted: StreamingListenerBatchCompleted =>
        listener.onBatchCompleted(batchCompleted)
 case outputOperationStarted: StreamingListenerOutputOperationStarted =>
        listener.onOutputOperationStarted(outputOperationStarted)
 case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
        listener.onOutputOperationCompleted(outputOperationCompleted)
 case _ =>
    }
  }

 override def onDropEvent(event: StreamingListenerEvent): Unit = {
 if (logDroppedEvent.compareAndSet(false, true)) {
 // Only log the following message once to avoid duplicated annoying logs.
 logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
 "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
 "rate at which events are being started by the scheduler.")
    }
  }
}

D),消息的缓存

消息是缓存在AsynchronousListenerBus

val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)

EVENT_QUEUE_CAPACITY=10000

三,Spark多进程之间的通讯RPC

Spark的内部rpc老版本是用akka实现的,spark1.6以后虽然保留akka,但是默认实现已经是netty。

其实,rpc采用netty之前,rpc是通过akka,而文件传输是通过netty。现在相当于全部采用了netty的实现的。

四,总结

本篇文章主要是将内部spark的内部事件通知的机制。希望通过这篇文章,大家对spark内部事件通知流程有所了解。

这三种模型是我们现在编程最常见的三种模型,希望能对大家编写自己的代码提供一些有益的思路。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2017-07-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java 源码分析

Netty 入门

1. 粘包问题 一 .长连接与短连接: 1.长连接:Client方与Server方先建立通讯连接,连接建立后不断开, 然后再进行报文发送和接收。长连接在 net...

3005
来自专栏小曾

.Net Web开发技术栈

有很多朋友有的因为兴趣,有的因为生计而走向了.Net中,有很多朋友想学,但是又不知道怎么学,学什么,怎么系统的学,为此我以我微薄之力总结归纳写了一篇.Net w...

3413
来自专栏FreeBuf

用搜索神器Everything定位Webshell木马后门

Everything是速度最快的文件名搜索软件。其速度之快令人震惊,百G硬盘几十万个文件,可以在几秒钟之内完成索引;文件名搜索瞬间呈现结果。它小巧免费,支持中文...

2808
来自专栏along的开发之旅

Android逆向分析概述

学习逆向的初衷是想系统学习Android下的hook技术和工具, 想系统学习Android的hook技术和工具是因为Android移动性能实战这本书. 这本书里...

7584
来自专栏xingoo, 一个梦想做发明家的程序员

Spark源码分析 之 Driver和Excutor是怎么跑起来的?(2.2.0版本)

今天抽空回顾了一下Spark相关的源码,本来想要了解一下Block的管理机制,但是看着看着就回到了SparkContext的创建与使用。正好之前没有正式的整理...

2129
来自专栏葡萄城控件技术团队

Winform文件下载之WebClient

最近升级了公司内部使用的一个下载小工具,主要提升了下面几点: 1. 在一些分公司的局域网中,连接不上外网 2. 服务器上的文件更新后,下载到的还是更新前的文件 ...

2065
来自专栏Crossin的编程教室

Python爬虫:一些常用的爬虫技巧总结

转自:开源中国 http://my.oschina.net/jhao104/blog/647308 用python也差不多一年多了,python应用最多的场景...

3034
来自专栏xingoo, 一个梦想做发明家的程序员

Spark源码分析 之 Driver和Excutor是怎么跑起来的?(2.2.0版本)

今天抽空回顾了一下Spark相关的源码,本来想要了解一下Block的管理机制,但是看着看着就回到了SparkContext的创建与使用。正好之前没有正式的整理...

2117
来自专栏逸鹏说道

bootstrap + requireJS+ director+ knockout + web API = 一个时髦的单页程序

bootstrap + requireJS+ director+ knockout + web API = 一个时髦的单页程序 也许单页程序(Single Pa...

3505
来自专栏鸿的学习笔记

DBDB: 一个简单的key/value数据库(一)

导论 DBDB(Dog Bed Database)是基于Python实现的key/value数据库。 它将key值与value值关联,并将该关联存储在磁盘上方便...

1123

扫码关注云+社区

领取腾讯云代金券