前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkListener监听机制使用及自定义事件处理

SparkListener监听机制使用及自定义事件处理

作者头像
大数据技术架构
发布2021-07-05 19:01:20
1.6K0
发布2021-07-05 19:01:20
举报

概述

Spark 提供了一系列整个任务生命周期中各个阶段变化的事件监听机制,通过这一机制可以在任务的各个阶段做一些自定义的各种动作。SparkListener便是这些阶段的事件监听接口类 通过实现这个类中的各种方法便可实现自定义的事件处理动作。

自定义示例代码

代码语言:javascript
复制
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListenerApplicationStart, SparkListenerApplicationEnd, SparkListener}

/**
 * Created by silent on 2019/1/11.
 */
class MySparkAppListener extends SparkListener with Logging {

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    val appId = applicationStart.appId
    logInfo("***************************************************" + appId.get)
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logInfo("************************ app end time ************************ " + applicationEnd.time)
  }
}

主函数运行示例

代码语言:javascript
复制
object Main extends App {
     val spark = SparkSession.builder()
                 .appName("main")
                 .master("local[2]")
                 .config("spark.extraListeners","com.moxiu.silent.SparkListenerDemo.MySparkAppListener") 
                 .getOrCreate()

     //spark.sparkContext.addSparkListener(new MySparkAppListener) 
     spark.stop()
}

说明:自定义监听sparListener后的注册方式有两种:

方法1:conf配置中指定

代码语言:javascript
复制
//spark2.0以下
val sparkConf=new SparkConf()
sparkConf.set("spark.extraListeners","org.apache.spark.MySparkAppListener")

// spark2.0+
val spark = SparkSession.builder()
                 .appName("main")
                 .master("local[2]")
                 .config("spark.extraListeners","com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
                 .getOrCreate()

方法2:sparkContext 类中指定

代码语言:javascript
复制
//spark2.0前
val sc = new SparkContext(sparkConf)
sc.addSparkListener(new MySparkAppListener)

//spark2.0+
spark.sparkContext.addSparkListener(new MySparkAppListener)

SparkListerner 代码记录

代码语言:javascript
复制
//SparkListener 下各个事件对应的函数名非常直白,即如字面所表达意思。
//想对哪个阶段的事件做一些自定义的动作,变继承SparkListener实现对应的函数即可

abstract class SparkListener extends SparkListenerInterface {
  //阶段完成时触发的事件
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }

  //阶段提交时触发的事件
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }

  //任务启动时触发的事件
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }

  //下载任务结果的事件
  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }

  //任务结束的事件
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }

  //job启动的事件
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }

  //job结束的事件
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }

  //环境变量被更新的事件
  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }

  //块管理被添加的事件
  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }

  override def onBlockManagerRemoved(
      blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }

  //取消rdd缓存的事件
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }

  //app启动的事件
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }

  //app结束的事件 [以下各事件也如同函数名所表达各个阶段被触发的事件不在一一标注]
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { } 

  override def onExecutorMetricsUpdate(
      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }

  override def onExecutorBlacklisted(
      executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }

  override def onExecutorUnblacklisted(
      executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }

  override def onNodeBlacklisted(
      nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }

  override def onNodeUnblacklisted(
      nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

  override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}

本文转载自:https://www.cnblogs.com/yyy-blog/p/10253830.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 自定义示例代码
  • 主函数运行示例
    • SparkListerner 代码记录
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档