摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/reg-center-zookeeper/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文基于 Elastic-Job V2.1.5 版本分享
本文主要分享 Elastic-Job-Lite 注册中心。
涉及到主要类的类图如下( 打开大图 ):
elastic-job-common-core
项目里,为 Elastic-Job-Lite、Elastic-Job-Cloud 公用注册中心类。你行好事会因为得到赞赏而愉悦 同理,开源项目贡献者会因为 Star 而更加有动力 为 Elastic-Job 点赞!传送门
ZookeeperRegistryCenter,基于 Zookeeper 注册中心。从上面的类图可以看到,ZookeeperRegistryCenter 实现 CoordinatorRegistryCenter 接口,CoordinatorRegistryCenter 继承 RegistryCenter 接口。
ZookeeperRegistryCenter 使用 Apache Curator 进行 Zookeeper 注册中心。
ZookeeperConfiguration,基于 Zookeeper 的注册中心配置,注释完整,点击链接直接查看。
@Override public void init() { log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists()); CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(zkConfig.getServerLists()) .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds())) .namespace(zkConfig.getNamespace()); // 命名空间 if (0 != zkConfig.getSessionTimeoutMilliseconds()) { builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); // 会话超时时间,默认 60 * 1000 毫秒 } if (0 != zkConfig.getConnectionTimeoutMilliseconds()) { builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); // 连接超时时间,默认 15 * 1000 毫秒 } // 认证 if (!Strings.isNullOrEmpty(zkConfig.getDigest())) { builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8)) .aclProvider(new ACLProvider() { @Override public List<ACL> getDefaultAcl() { return ZooDefs.Ids.CREATOR_ALL_ACL; } @Override public List<ACL> getAclForPath(final String path) { return ZooDefs.Ids.CREATOR_ALL_ACL; } }); } client = builder.build(); client.start(); // 连接 Zookeeper try { if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) { client.close(); throw new KeeperException.OperationTimeoutException(); } } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } }
baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
。如果对其它重连策略感兴趣,可以看 RetryPolicy 的实现类,本文就不展开了。ZookeeperConfiguration.namespace
)。通过 Curator TreeCache 实现监控整个树( Zookeeper目录 )的数据订阅和缓存,包括节点的状态,子节点的状态。
初始化作业缓存
作业初始化注册时,初始化缓存。
// JobRegistry.java public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) { schedulerMap.put(jobName, jobScheduleController); regCenterMap.put(jobName, regCenter); // 添加注册中心缓存 regCenter.addCacheData("/" + jobName); } // ZookeeperRegistryCenter.java /** * 缓存 * key:/作业名/ */ private final Map<String, TreeCache> caches = new HashMap<>();
作业服务订阅数据
每个不同的服务,都会订阅数据实现功能逻辑。在后续不同服务的文章,我们会详细解析。?
public void addDataListener(final TreeCacheListener listener) { TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName); cache.getListenable().addListener(listener); }
关闭作业缓存
@Override public void evictCacheData(final String cachePath) { TreeCache cache = caches.remove(cachePath + "/"); if (null != cache) { cache.close(); } }
对 Curator TreeCache 感兴趣的同学,可以点击链接继续了解。
public void close() { for (Entry<String, TreeCache> each : caches.entrySet()) { each.getValue().close(); } waitForCacheClose(); CloseableUtils.closeQuietly(client); } /* * 因为异步处理, 可能会导致client先关闭而cache还未关闭结束. * 等待Curator新版本解决这个bug. * BUG地址:https://issues.apache.org/jira/browse/CURATOR-157 */ private void waitForCacheClose() { try { Thread.sleep(500L); // 等待500ms, cache先关闭再关闭client, 否则会抛异常 } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); } }
@Override public String get(final String key) { TreeCache cache = findTreeCache(key); // 获取缓存 if (null == cache) { return getDirectly(key); } ChildData resultInCache = cache.getCurrentData(key); // 缓存中获取 value if (null != resultInCache) { return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8); } return getDirectly(key); } @Override public String getDirectly(final String key) { try { return new String(client.getData().forPath(key), Charsets.UTF_8); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); return null; } }
#get(…)
先从 TreeCache缓存 获取,后从 Zookeeper 获取。#getDirectly(…)
直接从 Zookeeper 获取。#findTreeCache(...)
代码如下:
private TreeCache findTreeCache(final String key) { for (Entry<String, TreeCache> entry : caches.entrySet()) { if (key.startsWith(entry.getKey())) { return entry.getValue(); } } return null; }获取子节点名称集合(降序)
@Override public List<String> getChildrenKeys(final String key) { try { List<String> result = client.getChildren().forPath(key); Collections.sort(result, new Comparator<String>() { @Override public int compare(final String o1, final String o2) { return o2.compareTo(o1); } }); return result; } catch (final Exception ex) { RegExceptionHandler.handleException(ex); return Collections.emptyList(); } }
获取子节点数量
@Override public int getNumChildren(final String key) { try { Stat stat = client.checkExists().forPath(key); if (null != stat) { return stat.getNumChildren(); } } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } return 0; }
@Override public void persist(final String key, final String value) { try { if (!isExisted(key)) { client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8)); } else { update(key, value); } } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } } @Override public void persistEphemeral(final String key, final String value) { try { if (isExisted(key)) { client.delete().deletingChildrenIfNeeded().forPath(key); } client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8)); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } }
#persist(…)
存储持久节点数据。逻辑等价于 insertOrUpdate 操作。persistEphemeral(…)
存储临时节点数据。节点类型无法变更,因此如果数据已存在,需要先进行删除。#isExisted(...)
、#update(...)
代码如下:
@Override public boolean isExisted(final String key) { try { return null != client.checkExists().forPath(key); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); return false; } } @Override public void update(final String key, final String value) { try { client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit(); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } }#update(…)
使用事务校验键( key )存在才进行更新。实现逻辑和存储注册数据类似。Elastic-Job 未使用该方法,跳过。
@Override public void remove(final String key) { try { client.delete().deletingChildrenIfNeeded().forPath(key); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } }
@Override public long getRegistryCenterTime(final String key) { long result = 0L; try { persist(key, ""); result = client.checkExists().forPath(key).getMtime(); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } Preconditions.checkState(0L != result, "Cannot get registry center time."); return result; }
mtime
)获得 Zookeeper 的时间。six six six。RegExceptionHandler,注册中心异常处理器。在上面的操作 Zookeeper 发生异常时,都会调用 RegExceptionHandler.handleException(...)
处理异常:
public static void handleException(final Exception cause) { if (null == cause) { return; } if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) { log.debug("Elastic job: ignored exception for: {}", cause.getMessage()); } else if (cause instanceof InterruptedException) { Thread.currentThread().interrupt(); } else { throw new RegException(cause); } } private static boolean isIgnoredException(final Throwable cause) { return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException; }
#getDirectly(…)
获得注册数据时,可能节点不存在,抛出 NodeExistsException,这种异常可以无视。JobNodeStorage,作业节点数据访问类。
// JobNodeStorage.java /** * 在主节点执行操作. * * @param latchNode 分布式锁使用的节点,例如:leader/election/latch * @param callback 执行操作的回调 */ public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) { try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) { latch.start(); latch.await(); callback.execute(); } catch (final Exception ex) { handleException(ex); } }
Apache Curator 使用 Zookeeper 实现了两种分布式锁,LeaderLatch 是其中的一种。使用一个 Zookeeper 节点路径创建一个 LeaderLatch,#start()
后,调用 #await()
等待拿到这把锁。如果有多个线程执行了相同节点路径的 LeaderLatch 的 #await()
后,同一时刻有且仅有一个线程可以继续执行,其他线程需要等待。当该线程释放( LeaderLatch#close()
)后,下一个线程可以拿到该锁继续执行。用 Java 并发包 Lock 举例子:
public void executeInLeader(Lock lock) { try { lock.lock(); // doSomething(); } finally { lock.unlock(); } }
《官方文档 —— LeaderLatch》,有兴趣的同学可以看看。在《Elastic-Job-Lite 源码解析 —— 主节点选举》中,我们会看到 #executeInLeader(...)
的使用。
另一种分布式锁实现,《官方文档 —— LeaderElection》,有兴趣也可以看看。在 Elastic-Job-Cloud 中使用到了,后续进行解析。
// JobNodeStorage.java public void executeInTransaction(final TransactionExecutionCallback callback) { try { CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and(); callback.execute(curatorTransactionFinal); curatorTransactionFinal.commit(); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } }
旁白君:煞笔芋道君,又在水更 芋道君:人艰不拆,好不好。
道友,赶紧上车,分享一波朋友圈!
本文分享自微信公众号 - 芋道源码(YunaiV)
原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。
原始发表时间:2018-09-17
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句