分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业监听器

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-listener/ 「芋道源码」欢迎转载,保留摘要,谢谢!

1. 概述

本文主要分享 Elastic-Job-Lite 作业监听器

涉及到主要类的类图如下( 打开大图 ):

  • 绿色监听器接口 ElasticJobListener,每台作业节点均执行。
  • 粉色监听器接口 AbstractDistributeOnceElasticJobListener,分布式场景中仅单一节点执行。
  • 蓝色类在 com.dangdang.ddframe.job.lite.internal.guarantee 里,保证分布式任务全部开始和结束状态。 AbstractDistributeOnceElasticJobListener 通过 guarantee 功能,实现分布式场景中仅单一节点执行。

你行好事会因为得到赞赏而愉悦 同理,开源项目贡献者会因为 Star 而更加有动力 为 Elastic-Job 点赞!传送门

2. ElasticJobListener

ElasticJobListener,作业监听器接口,每台作业节点均执行

若作业处理作业服务器的文件,处理完成后删除文件,可考虑使用每个节点均执行清理任务。此类型任务实现简单,且无需考虑全局分布式任务是否完成,请尽量使用此类型监听器。

接口代码如下:

public interface ElasticJobListener {

    /**
     * 作业执行前的执行的方法.
     * 
     * @param shardingContexts 分片上下文
     */
    void beforeJobExecuted(final ShardingContexts shardingContexts);

    /**
     * 作业执行后的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */
    void afterJobExecuted(final ShardingContexts shardingContexts);
}

调用执行如下:

// AbstractElasticJobExecutor.java
public final void execute() {
   // ...省略无关代码

   // 执行 作业执行前的方法
   try {
       jobFacade.beforeJobExecuted(shardingContexts);
   } catch (final Throwable cause) {
       jobExceptionHandler.handleException(jobName, cause);
   }
   // ...省略无关代码(执行 普通触发的作业)
   // ...省略无关代码(执行 被跳过触发的作业)
   // ...省略无关代码(执行 作业失效转移)

   // ...执行 作业执行后的方法
   try {
       jobFacade.afterJobExecuted(shardingContexts);
   } catch (final Throwable cause) {
       jobExceptionHandler.handleException(jobName, cause);
   }
}
  • JobFacade 对作业监听器简单封装进行调用。 // LiteJobFacade.java @Override public void beforeJobExecuted(final ShardingContexts shardingContexts) { for (ElasticJobListener each : elasticJobListeners) { each.beforeJobExecuted(shardingContexts); } } @Override public void afterJobExecuted(final ShardingContexts shardingContexts) { for (ElasticJobListener each : elasticJobListeners) { each.afterJobExecuted(shardingContexts); } }
  • 下文提到的 AbstractDistributeOnceElasticJobListener,也是这么调用。

3. AbstractDistributeOnceElasticJobListener

AbstractDistributeOnceElasticJobListener,在分布式作业中只执行一次的监听器。

若作业处理数据库数据,处理完成后只需一个节点完成数据清理任务即可。此类型任务处理复杂,需同步分布式环境下作业的状态同步,提供了超时设置来避免作业不同步导致的死锁,请谨慎使用。

创建 AbstractDistributeOnceElasticJobListener 代码如下:

public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {

    /**
     * 开始超时时间
     */
    private final long startedTimeoutMilliseconds;
    /**
     * 开始等待对象
     */
    private final Object startedWait = new Object();
    /**
     * 完成超时时间
     */
    private final long completedTimeoutMilliseconds;
    /**
     * 完成等待对象
     */
    private final Object completedWait = new Object();
    /**
     * 保证分布式任务全部开始和结束状态的服务
     */
    @Setter
    private GuaranteeService guaranteeService;

    private TimeService timeService = new TimeService();

    public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
        if (startedTimeoutMilliseconds <= 0L) {
            this.startedTimeoutMilliseconds = Long.MAX_VALUE;
        } else {
            this.startedTimeoutMilliseconds = startedTimeoutMilliseconds;
        }
        if (completedTimeoutMilliseconds <= 0L) {
            this.completedTimeoutMilliseconds = Long.MAX_VALUE; 
        } else {
            this.completedTimeoutMilliseconds = completedTimeoutMilliseconds;
        }
    }
}
  • 超时参数 startedTimeoutMillisecondscompletedTimeoutMilliseconds 务必传递,避免作业不同步导致的死锁。

?下面,我们来看本文的重点:AbstractDistributeOnceElasticJobListener,在分布式作业中只执行一次:

@Override
public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
   // 注册作业分片项开始运行
   guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());
   // 判断是否所有的分片项开始运行
   if (guaranteeService.isAllStarted()) {
       // 执行
       doBeforeJobExecutedAtLastStarted(shardingContexts);
       // 清理启动信息
       guaranteeService.clearAllStartedInfo();
       return;
   }
   // 等待
   long before = timeService.getCurrentMillis();
   try {
       synchronized (startedWait) {
           startedWait.wait(startedTimeoutMilliseconds);
       }
   } catch (final InterruptedException ex) {
       Thread.interrupted();
   }
   // 等待超时
   if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {
       // 清理启动信息
       guaranteeService.clearAllStartedInfo();
       handleTimeout(startedTimeoutMilliseconds);
   }
}
  • 调用 GuaranteeService#registerStart(...) 方法,注册作业分片项开始运行。 // GuaranteeService.java public void registerStart(final Collection<Integer> shardingItems) { for (int each : shardingItems) { jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getStartedNode(each)); } } // GuaranteeNode.java public final class GuaranteeNode { static final String ROOT = "guarantee"; static final String STARTED_ROOT = ROOT + "/started"; } static String getStartedNode(final int shardingItem) { return Joiner.on("/").join(STARTED_ROOT, shardingItem); }
    • Zookeeper 数据节点 /${JOB_NAME}/guarantee/started/${ITEM_INDEX}永久节点,存储空串( "" )。为什么是永久节点呢?在 GuaranteeService#isAllStarted() 见分晓。
  • 调用 GuaranteeService#isAllStarted() 方法,判断是否所有的分片项开始运行。 /** * 判断是否所有的任务均启动完毕. * * @return 是否所有的任务均启动完毕 */ public boolean isAllStarted() { return jobNodeStorage.isJobNodeExisted(GuaranteeNode.STARTED_ROOT) && configService.load(false).getTypeConfig().getCoreConfig().getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT).size(); }
    • /${JOB_NAME}/guarantee/started/ 目录下,所有作业分片项都开始运行,即运行总数等于作业分片总数( JobCoreConfiguration.ShardingTotalCount ),代表所有的任务均启动完毕
    • 等待所有任务启动过程中,不排除有作业节点会挂掉,如果 /${JOB_NAME}/guarantee/started/${ITEM_INDEX} 存储临时节点,会导致不能满足所有的分片项开始运行的条件。
    • 等待过程中,如果调整作业分片总数( JobCoreConfiguration.ShardingTotalCount ),会导致异常。
  • 当不满足所有的分片项开始运行时,作业节点调用 Object#wait(...) 方法进行等待。该等待怎么结束等待?当满足所有的分片项开始运行的作业节点调用 GuaranteeService#clearAllStartedInfo() 时,StartedNodeRemovedJobListener 会监听到 /${JOB_NAME}/guarantee/started/ 被删除,调用 Object#notifyAll(...) 方法进行唤醒全部。 // GuaranteeService.java /** * 清理所有任务启动信息. */ public void clearAllStartedInfo() { jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.STARTED_ROOT); } // StartedNodeRemovedJobListener.java class StartedNodeRemovedJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { if (Type.NODE_REMOVED == eventType && guaranteeNode.isStartedRootNode(path)) { for (ElasticJobListener each : elasticJobListeners) { if (each instanceof AbstractDistributeOnceElasticJobListener) { ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart(); } } } } }
  • 调用 #doBeforeJobExecutedAtLastStarted(...) 方法,执行最后一个作业执行前的执行的方法,实现该抽象方法,完成自定义逻辑。#doAfterJobExecutedAtLastCompleted(...) 实现的方式一样,就不重复解析了。 // AbstractDistributeOnceElasticJobListener.java /** * 分布式环境中最后一个作业执行前的执行的方法. * * @param shardingContexts 分片上下文 */ public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts); /** * 分布式环境中最后一个作业执行后的执行的方法. * * @param shardingContexts 分片上下文 */ public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts); 整体流程如下图:

原文发布于微信公众号 - 芋道源码(YunaiV)

原文发表时间:2018-11-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏IMWeb前端团队

上手 yeoman generator

最近折腾脚手架相关的一些事情。说到脚手架,不得不谈的就是yeoman了。 是什么 yeoman是一个脚手架生成工具。 yeoman generator则是yeo...

2325
来自专栏PPV课数据科学社区

PyCharm 2016.3 公开预览版发布

PyCharm 2016.3 公开预览版发布了,PyCharm是一种Python IDE,带有一整套可以帮助用户在使用Python语言开发时提高其效率的工具,比...

2644
来自专栏hbbliyong

在C#使用文件监控对象FileSystemWatcher 实现数据同步

       最近在项目中有这么个需求,就是得去实时获取某个在无规律改变的文本文件中的内容。首先想到的是用程序定期去访问这个文件,因为对实时性要求很高,间隔不能...

3946
来自专栏c#开发者

Msmq设计文档(赋源代码)

Msmq设计文档 文件状态: [√] 草稿 [ ] 正式发布 [ ] 正在修改 文件标识: ECI-MSMQ v01 当前版本: 0.5...

3488
来自专栏炉边夜话

JNI使用技巧点滴(二)

作者:normalnotebook 背景<?xml:namespace prefix = o ns = "urn:schemas-microsoft-com...

1742
来自专栏运维

Unix & Linux 大学教程 学习总结

两年前我看这本书时,是一本812页的厚书,现在我总结成了40句话,什么时候成了1句话就好了。

1371
来自专栏JavaEdge

操作系统之文件管理

将文件属性从外存拷到内存中打开文件表的一表目中 将其编号返回给用户。 系统可利用该编号到打开文件表中去查找。

35610
来自专栏源哥的专栏

给你的系统增加对物理地址的验证

我们开发出一个系统之后,经常有很多方法来保护我们的系统不受别人非法使用,比如说采用注册码,根据IP地址进行限制等。这些都存在一个问题就是容易给人通过拷贝注册码等...

912
来自专栏青玉伏案

iOS开发之再探多线程编程:Grand Central Dispatch详解

Swift3.0相关代码已在github上更新。之前关于iOS开发多线程的内容发布过一篇博客,其中介绍了NSThread、操作队列以及GCD,介绍的不够深入。今...

1967
来自专栏蘑菇先生的技术笔记

WebApiThrottle限流框架使用手册

3675

扫码关注云+社区

领取腾讯云代金券