专栏首页芋道源码1024分布式作业 Elastic-Job-Lite 源码分析 —— 注册中心

分布式作业 Elastic-Job-Lite 源码分析 —— 注册中心


摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/reg-center-zookeeper/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述
  • 2. 基于 Zookeeper 注册中心
  • 3. 作业节点数据访问类
  • 666. 彩蛋

1. 概述

本文主要分享 Elastic-Job-Lite 注册中心

涉及到主要类的类图如下( 打开大图 ):

  • 黄色的类在 elastic-job-common-core 项目里,为 Elastic-Job-Lite、Elastic-Job-Cloud 公用注册中心类。
  • 作业节点数据访问类( JobNodeStorage )的在主节点执行操作在事务中执行操作两个方法和注册中心协调分布式服务有关系,从《Elastic-Job-Lite 源码解析 —— 作业数据存储》摘出来,放本文解析。

你行好事会因为得到赞赏而愉悦 同理,开源项目贡献者会因为 Star 而更加有动力 为 Elastic-Job 点赞!传送门

2. 基于 Zookeeper 注册中心

ZookeeperRegistryCenter,基于 Zookeeper 注册中心。从上面的类图可以看到,ZookeeperRegistryCenter 实现 CoordinatorRegistryCenter 接口,CoordinatorRegistryCenter 继承 RegistryCenter 接口。

  • RegistryCenter,注册中心,定义了简单的增删改查注册数据和查询时间的接口方法。
  • CoordinatorRegistryCenter,用于协调分布式服务的注册中心,定义了持久节点、临时节点、持久顺序节点、临时顺序节点等目录服务接口方法,隐性的要求提供事务分布式锁数据订阅等特性。

ZookeeperRegistryCenter 使用 Apache Curator 进行 Zookeeper 注册中心。

2.1 初始化

ZookeeperConfiguration,基于 Zookeeper 的注册中心配置,注释完整,点击链接直接查看。

@Override
public void init() {
   log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
   CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
           .connectString(zkConfig.getServerLists())
           .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
           .namespace(zkConfig.getNamespace()); // 命名空间
   if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
       builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); // 会话超时时间,默认 60 * 1000 毫秒
   }
   if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
       builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); // 连接超时时间,默认 15 * 1000 毫秒
   }
   // 认证
   if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
       builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
               .aclProvider(new ACLProvider() {

                   @Override
                   public List<ACL> getDefaultAcl() {
                       return ZooDefs.Ids.CREATOR_ALL_ACL;
                   }

                   @Override
                   public List<ACL> getAclForPath(final String path) {
                       return ZooDefs.Ids.CREATOR_ALL_ACL;
                   }
               });
   }
   client = builder.build();
   client.start();
   // 连接 Zookeeper
   try {
       if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
           client.close();
           throw new KeeperException.OperationTimeoutException();
       }
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
  • ExponentialBackoffRetry,当 Zookeeper 失去链接后重新连接的一种策略:动态计算每次计算重连的间隔,时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 &lt;&lt; (retryCount + 1)))。如果对其它重连策略感兴趣,可以看 RetryPolicy 的实现类,本文就不展开了。
  • 相同的作业集群使用相同的 Zookeeper 命名空间( ZookeeperConfiguration.namespace )。

2.2 缓存

通过 Curator TreeCache 实现监控整个树( Zookeeper目录 )的数据订阅和缓存,包括节点的状态,子节点的状态。

初始化作业缓存

作业初始化注册时,初始化缓存。

// JobRegistry.java
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
   schedulerMap.put(jobName, jobScheduleController);
   regCenterMap.put(jobName, regCenter);
   // 添加注册中心缓存
   regCenter.addCacheData("/" + jobName);
}

// ZookeeperRegistryCenter.java
/**
* 缓存
* key:/作业名/
*/
private final Map<String, TreeCache> caches = new HashMap<>();

作业服务订阅数据

每个不同的服务,都会订阅数据实现功能逻辑。在后续不同服务的文章,我们会详细解析。?

public void addDataListener(final TreeCacheListener listener) {
   TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
   cache.getListenable().addListener(listener);
}

关闭作业缓存

@Override
public void evictCacheData(final String cachePath) {
   TreeCache cache = caches.remove(cachePath + "/");
   if (null != cache) {
       cache.close();
   }
}

对 Curator TreeCache 感兴趣的同学,可以点击链接继续了解。

2.3 关闭

public void close() {
   for (Entry<String, TreeCache> each : caches.entrySet()) {
       each.getValue().close();
   }
   waitForCacheClose();
   CloseableUtils.closeQuietly(client);
}

/* 
* 因为异步处理, 可能会导致client先关闭而cache还未关闭结束.
* 等待Curator新版本解决这个bug.
* BUG地址:https://issues.apache.org/jira/browse/CURATOR-157
*/
private void waitForCacheClose() {
   try {
       Thread.sleep(500L); // 等待500ms, cache先关闭再关闭client, 否则会抛异常
   } catch (final InterruptedException ex) {
       Thread.currentThread().interrupt();
   }
}

2.4 获得数据

@Override
public String get(final String key) {
   TreeCache cache = findTreeCache(key); // 获取缓存
   if (null == cache) {
       return getDirectly(key);
   }
   ChildData resultInCache = cache.getCurrentData(key); // 缓存中获取 value
   if (null != resultInCache) {
       return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
   }
   return getDirectly(key);
}

@Override
public String getDirectly(final String key) {
   try {
       return new String(client.getData().forPath(key), Charsets.UTF_8);
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
       return null;
   }
}
  • #get(…) 先从 TreeCache缓存 获取,后从 Zookeeper 获取。
  • #getDirectly(…) 直接从 Zookeeper 获取。
  • #findTreeCache(...) 代码如下: private TreeCache findTreeCache(final String key) { for (Entry<String, TreeCache> entry : caches.entrySet()) { if (key.startsWith(entry.getKey())) { return entry.getValue(); } } return null; }

2.5 获得注册子节点

获取子节点名称集合(降序)

@Override
public List<String> getChildrenKeys(final String key) {
   try {
       List<String> result = client.getChildren().forPath(key);
       Collections.sort(result, new Comparator<String>() {

           @Override
           public int compare(final String o1, final String o2) {
               return o2.compareTo(o1);
           }
       });
       return result;
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
       return Collections.emptyList();
   }
}

获取子节点数量

@Override
public int getNumChildren(final String key) {
   try {
       Stat stat = client.checkExists().forPath(key);
       if (null != stat) {
           return stat.getNumChildren();
       }
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
   return 0;
}

2.6 存储注册数据

@Override
public void persist(final String key, final String value) {
   try {
       if (!isExisted(key)) {
           client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
       } else {
           update(key, value);
       }
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}

@Override
public void persistEphemeral(final String key, final String value) {
   try {
       if (isExisted(key)) {
           client.delete().deletingChildrenIfNeeded().forPath(key);
       }
       client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8));
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
  • #persist(…) 存储持久节点数据。逻辑等价于 insertOrUpdate 操作。
  • persistEphemeral(…) 存储临时节点数据。节点类型无法变更,因此如果数据已存在,需要先进行删除。
  • #isExisted(...)#update(...) 代码如下: @Override public boolean isExisted(final String key) { try { return null != client.checkExists().forPath(key); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); return false; } } @Override public void update(final String key, final String value) { try { client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit(); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } }
    • #update(…) 使用事务校验键( key )存在才进行更新。

2.7 存储顺序注册数据

实现逻辑和存储注册数据类似。Elastic-Job 未使用该方法,跳过。

2.8 移除注册数据

@Override
public void remove(final String key) {
   try {
       client.delete().deletingChildrenIfNeeded().forPath(key);
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}

2.9 获取注册中心当前时间

@Override
public long getRegistryCenterTime(final String key) {
   long result = 0L;
   try {
       persist(key, "");
       result = client.checkExists().forPath(key).getMtime();
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
   Preconditions.checkState(0L != result, "Cannot get registry center time.");
   return result;
}
  • 通过更新节点,获得该节点的最后更新时间( mtime )获得 Zookeeper 的时间。six six six。

2.10 注册中心异常处理器

RegExceptionHandler,注册中心异常处理器。在上面的操作 Zookeeper 发生异常时,都会调用 RegExceptionHandler.handleException(...) 处理异常:

public static void handleException(final Exception cause) {
   if (null == cause) {
       return;
   }
   if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
       log.debug("Elastic job: ignored exception for: {}", cause.getMessage());
   } else if (cause instanceof InterruptedException) {
       Thread.currentThread().interrupt();
   } else {
       throw new RegException(cause);
   }
}

private static boolean isIgnoredException(final Throwable cause) {
   return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException;
}
  • 部分异常会被无视,仅打印异常。例如调用 #getDirectly(…) 获得注册数据时,可能节点不存在,抛出 NodeExistsException,这种异常可以无视。

3. 作业节点数据访问类

JobNodeStorage,作业节点数据访问类。

3.1 在主节点执行操作

// JobNodeStorage.java
/**
* 在主节点执行操作.
* 
* @param latchNode 分布式锁使用的节点,例如:leader/election/latch
* @param callback 执行操作的回调
*/
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
   try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
       latch.start();
       latch.await();
       callback.execute();
   } catch (final Exception ex) {
       handleException(ex);
   }
}

Apache Curator 使用 Zookeeper 实现了两种分布式锁,LeaderLatch 是其中的一种。使用一个 Zookeeper 节点路径创建一个 LeaderLatch,#start() 后,调用 #await() 等待拿到这把。如果有多个线程执行了相同节点路径的 LeaderLatch 的 #await() 后,同一时刻有且仅有一个线程可以继续执行,其他线程需要等待。当该线程释放( LeaderLatch#close() )后,下一个线程可以拿到该继续执行。用 Java 并发包 Lock 举例子:

public void executeInLeader(Lock lock) {
    try {
        lock.lock();
        // doSomething();
    } finally {
        lock.unlock();
    }
}

《官方文档 —— LeaderLatch》,有兴趣的同学可以看看。在《Elastic-Job-Lite 源码解析 —— 主节点选举》中,我们会看到 #executeInLeader(...) 的使用。

另一种分布式锁实现,《官方文档 —— LeaderElection》,有兴趣也可以看看。在 Elastic-Job-Cloud 中使用到了,后续进行解析。

3.2 在事务中执行操作

// JobNodeStorage.java
public void executeInTransaction(final TransactionExecutionCallback callback) {
   try {
       CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
       callback.execute(curatorTransactionFinal);
       curatorTransactionFinal.commit();
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
}
  • 开启事务,执行 TransactionExecutionCallback 回调逻辑,提交事务。

666. 彩蛋

旁白君:煞笔芋道君,又在水更 芋道君:人艰不拆,好不好。

道友,赶紧上车,分享一波朋友圈!



本文分享自微信公众号 - 芋道源码(YunaiV),作者:老艿艿

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-09-17

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Fastson 等等,四种 Java 常用 JSON 库性能比较

    本篇通过JMH来测试一下Java中几种常见的JSON解析库的性能。每次都在网上看到别人说什么某某库性能是如何如何的好,碾压其他的库。但是百闻不如一见,只有自己亲...

    芋道源码
  • Java 线程的 wait 和 notify 的神坑

    也许我们只知道wait和notify是实现线程通信的,同时要使用synchronized包住,其实在开发中知道这个是远远不够的。接下来看看两个常见的问题。

    芋道源码
  • SpringBoot 整合 Shiro 实现动态权限加载更新+ Session 共享 + 单点登录

    来源:Sans_ juejin.im/post/5d087d605188256de9779e64

    芋道源码
  • SpringBoot集成Redis集群就这么Easy

    这次写一下springboot与redis的结合,这里使用的是redis集群模式(主从),主从环境的搭建,请参考redis集群搭建

    JAVA葵花宝典
  • Android数据库开源框架GreenDao分析

    前段时间写Demo的时候遇到了数据库的并发问题 Android数据库多线程并发操作异常 ,然后研究了一下 Android中的数据库连接池 。在看相关代码的时候阅...

    静默加载
  • 提取jedis源码的一致性hash代码作为通用工具类

    一致性Hash算法是来解决热点问题,如果虚拟节点设置过小热点问题仍旧存在。 关于一致性Hash算法的原理我就不说了,网上有很多人提供自己编写的一致性Hash算...

    intsmaze-刘洋
  • 商城项目-网关的登录拦截器

    接下来,我们在Zuul编写拦截器,对用户的token进行校验,如果发现未登录,则进行拦截。

    cwl_java
  • 快速学习-JPA中的一对多

    在一对多关系中,我们习惯把一的一方称之为主表,把多的一方称之为从表。在数据库中建立一对多的关系,需要使用数据库的外键约束。

    cwl_java
  • 详解RxJava2 Retrofit2 网络框架简洁轻便封装

    RxJava2、Retrofit2火了有一段时间了,前段时间给公司的项目引入了这方面相关的技术,在此记录一下相关封装的思路。

    砸漏
  • [享学Netflix] 十、Archaius对Commons Configuration核心API Configuration的扩展实现

    上篇文章体验了一把Netflix Archaius的使用,感受到了它对配置管理上的便捷性。或许有小伙伴会说,配置管理上它和Apache Commons Conf...

    YourBatman

扫码关注云+社区

领取腾讯云代金券