前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式作业系统 Elastic-Job-Lite 源码分析 —— 自诊断修复

分布式作业系统 Elastic-Job-Lite 源码分析 —— 自诊断修复

作者头像
芋道源码
发布2019-02-27 17:38:49
6600
发布2019-02-27 17:38:49
举报
文章被收录于专栏:芋道源码1024芋道源码1024

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/reconcile/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述
  • 2. ReconcileService

1. 概述

本文主要分享 Elastic-Job-Lite 自诊断修复

在分布式的场景下由于网络、时钟等原因,可能导致 Zookeeper 的数据与真实运行的作业产生不一致,这种不一致通过正向的校验无法完全避免。需要另外启动一个线程定时校验注册中心数据与真实作业状态的一致性,即维持 Elastic-Job 的最终一致性

涉及到主要类的类图如下( 打开大图 ):

  • 在 Elastic-Job-lite 里,调解分布式作业不一致状态服务( ReconcileService ) 实现了自诊断修复功能。

你行好事会因为得到赞赏而愉悦 同理,开源项目贡献者会因为 Star 而更加有动力 为 Elastic-Job 点赞!传送门

2. ReconcileService

ReconcileService,调解分布式作业不一致状态服务。

ReconcileService 继承 Google Guava AbstractScheduledService 抽象类,实现 #scheduler()#runOneIteration() 方法,达到周期性校验注册中心数据与真实作业状态的一致性。

#scheduler() 方法实现如下

代码语言:javascript
复制
// ReconcileService.java
@Override
protected Scheduler scheduler() {
   return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}
  • 每 1 分钟会调用一次 #runOneIteration() 方法进行校验。
  • Google Guava AbstractScheduledService 相关的知识,有兴趣的同学可以自己 Google 学习哟。

#runOneIteration() 方法实现如下

代码语言:javascript
复制
// ReconcileService.java
@Override
protected void runOneIteration() throws Exception {
   LiteJobConfiguration config = configService.load(true);
   int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
   if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) { // 校验是否达到校验周期
       // 设置最后校验时间
       lastReconcileTime = System.currentTimeMillis();
       if (leaderService.isLeaderUntilBlock() // 主作业节点才可以执行
               && !shardingService.isNeedSharding() // 当前作业不需要重新分片
               && shardingService.hasShardingInfoInOfflineServers()) { // 查询是包含有分片节点的不在线服务器
           log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
           // 设置需要重新分片的标记
           shardingService.setReshardingFlag();
       }
   }
}
  • 通过作业配置,设置修复作业服务器不一致状态服务调度间隔时间属性( LiteJobConfiguration.reconcileIntervalMinutes )。
  • 调用 ShardingService#setReshardingFlag() 方法,设置需要重新分片的标记。这个也是 ReconcileService 最本质的行为,有了这个标记后,作业会重新进行分片,达到作业节点本地分片数据与 Zookeeper 数据一致。作业分片逻辑,在《Elastic-Job-Lite 源码分析 —— 作业分片》有详细解析。
  • 调解分布式作业不一致状态服务一共有三个条件:
    • 调用 LeaderService#isLeaderUntilBlock() 方法,判断当前作业节点是否为主节点。在《Elastic-Job-Lite 源码分析 —— 主节点选举》有详细解析。
    • 调用 ShardingService#isNeedSharding() 方法,判断当前作业是否需要重分片。如果需要重新分片,就不要重复设置当前作业需要重新分片的标记。
    • 调用 ShardingService#hasShardingInfoInOfflineServers() 方法,查询是否包含有分片节点的不在线服务器。永久数据节点 /${JOB_NAME}/sharding/${ITEM_INDEX}/instance存储分配的作业节点主键( ${JOB_INSTANCE_ID} ), 不会随着作业节点因为各种原因断开后会话超时移除,而临时数据节点/${JOB_NAME}/instances/${JOB_INSTANCE_ID} 随着作业节点因为各种原因断开后超时会话超时移除。当查询到包含有分片节点的不在线的作业节点,设置需要重新分片的标记后进行重新分片,将其持有的作业分片分配给其它在线的作业节点。
代码语言:javascript
复制
// ShardingService.java
 /**
 * 查询是包含有分片节点的不在线服务器.
 * 
 * @return 是包含有分片节点的不在线服务器
 */
public boolean hasShardingInfoInOfflineServers() {
    List<String> onlineInstances = jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT); // `/${JOB_NAME}/instances/${JOB_INSTANCE_ID}`
    int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
    for (int i = 0; i < shardingTotalCount; i++) {
        if (!onlineInstances.contains(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) { // `/${JOB_NAME}/sharding/${ITEM_INDEX}/instance`
            return true;
        }
    }
    return false;
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-12-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 芋道源码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. ReconcileService
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档