Elastic-Job-Lite 源码分析——作业事件追踪

1. 概述

本文主要分享 Elastic-Job-Lite 作业事件追踪。

另外,Elastic-Job-Cloud 作业事件追踪 和 Elastic-Job-Lite 基本类似,不单独开一篇文章,记录在该文章里。如果你对 Elastic-Job-Cloud 暂时不感兴趣,可以跳过相应部分。

Elastic-Job 提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。Elastic-Job 目前订阅两种事件,基于关系型数据库记录事件。

涉及到主要类的类图如下( 打开大图 ):

作业事件:粉色的类。

作业事件总线:黄色的类。

作业事件监听器:蓝色的类。

2. 作业事件总线

JobEventBus,作业事件总线,提供了注册监听器、发布事件两个方法。

创建 JobEventBus 代码如下:

JobEventBus 基于Google Guava EventBus,在《Sharding-JDBC 源码分析 —— SQL 执行》「4.1 EventBus」有详细分享。这里要注意的是 AsyncEventBus( 异步事件总线 ),注册在其上面的监听器是异步监听执行,事件发布无需阻塞等待监听器执行完逻辑,所以对性能不存在影响。

使用 JobEventConfiguration( 作业事件配置 ) 创建事件监听器,调用 #register() 方法进行注册监听。

该方法是私有(private )方法,只能使用 JobEventConfiguration 创建事件监听器注册。当不传递该配置时,意味着不开启事件追踪功能。

发布作业事件

发布作业事件( JobEvent ) 代码如下:

在 Elaistc-Job-Lite 里,LiteJobFacade 对JobEventBus#post(...) 进行封装,提供给作业执行器( AbstractElasticJobExecutor )调用( Elastic-Job-Cloud 实际也进行了封装 ):

TaskContext 通过#from(...) 方法,对作业任务ID( taskId ) 解析,获取任务上下文。TaskContext 代码注释很完整,点击链接直接查看。

3. 作业事件

目前有两种作业事件( JobEvent ):

JobStatusTraceEvent,作业状态追踪事件。

JobExecutionEvent,作业执行追踪事件。

本小节分享两方面:

作业事件发布时机。

Elastic-Job 基于关系型数据库记录事件的表结构。

3.1 作业状态追踪事件

JobStatusTraceEvent,作业状态追踪事件。

代码如下:

ExecutionType,执行类型。

Source,任务来源。

State,任务执行状态。

Elastic-Job-Lite 使用 TASK_STAGING、TASK_RUNNING、TASK_FINISHED、TASK_ERROR 四种执行状态。

Elastic-Job-Cloud 使用所有执行状态。

关系数据库表 JOB_STATUS_TRACE_LOG 结构如下:

Elastic-Job-Lite 一次作业执行记录如下( 打开大图 ):

JobStatusTraceEvent 在 Elastic-Job-Lite 发布时机:

State.TASK_STAGING:

State.TASK_RUNNING:

State.TASK_FINISHED、State.TASK_ERROR【第一种】:

State.TASK_FINISHED、State.TASK_ERROR【第二种】:

JobStatusTraceEvent 在 Elastic-Job-Cloud 发布时机:

Elastic-Job-Cloud 除了上文 Elastic-Job-Lite 会多一个场景下记录作业状态追踪事件( State.TASK_STAGING ),实现代码如下:

任务提交调度服务( TaskLaunchScheduledService )提交任务时,记录发布作业状态追踪事件(State.TASK_STAGING)。

Elastic-Job-Cloud 根据 Mesos Master 通知任务状态变更,记录多种作业状态追踪事件,实现代码如下:

3.2 作业执行追踪事件

JobExecutionEvent,作业执行追踪事件。

代码如下:

ExecutionSource,执行来源

关系数据库表JOB_EXECUTION_LOG 结构如下:

Elastic-Job-Lite 一次作业多作业分片项执行记录如下( 打开大图 ):

JobExecutionEvent 在 Elastic-Job-Lite 发布时机:

JobExecutionEvent 在 Elastic-Job-Cloud 发布时机:

和 Elastic-Job-Cloud 一致。

3.3 作业事件数据库存储

JobEventRdbStorage,作业事件数据库存储。

创建 JobEventRdbStorage 代码如下:

调用#createJobExecutionTableAndIndexIfNeeded(...) 创建 JOB_EXECUTION_LOG 表和索引。

调用 #createJobStatusTraceTableAndIndexIfNeeded(...) 创建 JOB_STATUS_TRACE_LOG 表和索引。

存储 JobStatusTraceEvent 代码如下:

originalTaskId,原任务作业ID。

Elastic-Job-Lite 暂未使用到该字段,存储空串( "" )。

Elastic-Job-Cloud 在作业失效转移场景下使用该字段,存储失效转移的任务作业ID。

存储 JobExecutionEvent 代码如下

作业分片项执行完成进行的是更新操作。

3.4 作业事件数据库查询

JobEventRdbSearch,作业事件数据库查询,提供给运维平台调用查询数据。感兴趣的同学点击链接直接查看。

4. 作业监听器

在上文我们看到,作业监听器通过传递作业事件配置( JobEventConfiguration )给作业事件总线( JobEventBus ) 进行创建监听器,并注册监听器到事件总线。

我们来看下 Elastic-Job 提供的基于关系数据库的事件配置实现。

JobEventRdbConfiguration,作业数据库事件配置。调用 #createJobEventListener()

创建作业事件数据库监听器( JobEventRdbListener )。

JobEventRdbListener,作业事件数据库监听器。实现代码如下:

通过 JobEventRdbStorage 存储作业事件到关系型数据库。

如何自定义作业监听器?

有些同学可能希望使用 ES 或者其他数据库存储作业事件,这个时候可以通过实现 JobEventConfiguration、JobEventListener 进行拓展。

Elastic-Job-Cloud JobEventConfiguration 怎么配置?

Elastic-Job-Cloud-Scheduler:从conf/elastic-job-cloud-scheduler.properties 配置文件读取如下属性,生成 JobEventConfiguration 配置对象。

event_trace_rdb_driver

event_trace_rdb_url

event_trace_rdb_username

event_trace_rdb_password

Elastic-Job-Cloud-Executor:通过接收到任务执行信息里读取JobEventConfiguration,实现代码如下:

666. 彩蛋

旁白君:瞎比比了这么长,能不能简单粗暴一点。

芋道君:是是是。

文章来源:掘金

作者:芋道源码

data:text/html;charset=UTF-8;base64,

5p625p6E5biI5a2m5Lmg5Lqk5rWB576k5Y+35pivNTc1NzUxODU0Cg==

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180126A0ATV500?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码关注腾讯云开发者

领取腾讯云代金券