在任务执行的前后,ElasticJob可以提供扩展,其主要类图如下:
ElastciJobListener任务执行事件监听器提供如下两个方法:
上述回调函数是分片级的,也就是说默认情况下,同一个任务的多个分片都会执行beforeJobExecuted、afterJobExe-cuted方法,如果某些情况同一个任务只需在最后一个分片执行之前执行,最后一个分片执行完成后才执行,又该如何实现呢。AbstractDistributeOnceElastic-JobListener粉墨登场。
AbstractDistributeOnceElasticJobListener分布式作业中只执行一次的监听器。
接下来重点分析AbstractDistributeOnce-ElasticJobListener实现原理(分布式环境中,监听器只在一个节点上执行的实现逻辑)。
重点分析beforeJobExecuted方法实现原理,afterJobExecuted方法类似。
1AbstractDistributeOnceElasticJobListener#beforeJobExecuted
2public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
3 guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet()); // @1
4 if (guaranteeService.isAllStarted()) { // @2
5 doBeforeJobExecutedAtLastStarted(shardingContexts);
6 guaranteeService.clearAllStartedInfo();
7 return;
8 }
9 long before = timeService.getCurrentMillis(); // @3
10 try {
11 synchronized (startedWait) {
12 startedWait.wait(startedTimeoutMilliseconds);
13 }
14 } catch (final InterruptedException ex) {
15 Thread.interrupted();
16 }
17 if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) { // @4
18 guaranteeService.clearAllStartedInfo();
19 handleTimeout(startedTimeoutMilliseconds);
20 }
21 }
代码@1:使用GuaranteeService注册分片开始。 代码@2:判断该任务所有的分片是否都已经注册启动,如果都注册启动,则调用doBeforeJobExecutedAtLastStarted()方法。 代码@3:获取服务器当前时间。 代码@4:利用startWait.wait(startedTi-meoutMilliseconds)带超时时间的等待,这里如何唤醒呢? 代码@5:判断唤醒是超时唤醒还是正常唤醒,如果是超时唤醒,清除所有的分片注册启动信息,处理超时异常。
上述流程简单明了,下面有两个问题需要进一步探究,如何注册分片启动信息与如何被唤醒。 1、任务节点注册分配给当前节点的任务分片
1/**
2 * 根据分片项注册任务开始运行.
3 *
4 * @param shardingItems 待注册的分片项
5 */
6 public void registerStart(final Collection<Integer> shardingItems) {
7 for (int each : shardingItems) {
8 jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getStartedNode(each));
9 }
10 }
创建持久节点:{namespace}/jobname/guarantee/started/{item}。
2、当最后一个节点注册启动执行doBe-foreJobExecutedAtLastStarted方法后,如何唤醒其他节点以便进入到任务执行阶段?
请注意:AbstractDistributeOnceElasti-cJobListener#beforeJobExecuted在执行完doBeforeJobExecuted方法后,会执行guaranteeService.clearAllStarted-Info()方法。
1if (guaranteeService.isAllStarted()) {
2 doBeforeJobExecutedAtLastStarted(shardingContexts);
3 guaranteeService.clearAllStartedInfo();
4 return;
5 }
也就是回调函数执行完毕后,会删除任务所有的分片。温馨提示:业务实现子类实现doBeforeJobExecutedAtLastSt-arted方法时最好不要抛出异常,不然各节点的唤醒操作只能是等待超时后被唤醒。
1GuaranteeService#clearAllStartedInfo
2/**
3 * 清理所有任务启动信息.
4 */
5 public void clearAllStartedInfo() {
6 jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.STARTED_ROOT);
7 }
直接删除{namespace}/jobname/guara-ntee/started根节点。基于ZK的开发模式触发一次删除操作,肯定会有事件监听器来监听该节点的删除事件,从而触发其他节点的唤醒操作,果不奇然Elastci-Job提供GuaranteeListenerManager事件监听来监听{namespace}/jobname/g-uarantee/started节点的删除事件。
1GuaranteeListenerManager$StartedNodeRemovedJobListener
2class StartedNodeRemovedJobListener extends AbstractJobListener {
3 @Override
4 protected void dataChanged(final String path, final Type eventType, final String data) {
5 if (Type.NODE_REMOVED == eventType && guaranteeNode.isStartedRootNode(path)) {
6 for (ElasticJobListener each : elasticJobListeners) {
7 if (each instanceof AbstractDistributeOnceElasticJobListener) {
8 ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
9 }
10 }
11 }
12 }
13 }
每个Job实例在监听到{namespace}/job-name/guarantee/started节点被删除后,会执行AbstractDistributeOnceElasticJo-bListener的notifyWaitingTaskStart方法唤醒被阻塞的线程,使线程进入到任务执行阶段。
同理,任务执行后监听方法afterJobEx-ecuted的执行流程实现原理一样,在这里就不在重复讲解了。