摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/reg-center-zookeeper/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文基于 Elastic-Job V2.1.5 版本分享
本文主要分享 Elastic-Job-Lite 注册中心。
涉及到主要类的类图如下( 打开大图 ):
elastic-job-common-core
项目里,为 Elastic-Job-Lite、Elastic-Job-Cloud 公用注册中心类。你行好事会因为得到赞赏而愉悦 同理,开源项目贡献者会因为 Star 而更加有动力 为 Elastic-Job 点赞!传送门
ZookeeperRegistryCenter,基于 Zookeeper 注册中心。从上面的类图可以看到,ZookeeperRegistryCenter 实现 CoordinatorRegistryCenter 接口,CoordinatorRegistryCenter 继承 RegistryCenter 接口。
ZookeeperRegistryCenter 使用 Apache Curator 进行 Zookeeper 注册中心。
ZookeeperConfiguration,基于 Zookeeper 的注册中心配置,注释完整,点击链接直接查看。
@Override
public void init() {
log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getServerLists())
.retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
.namespace(zkConfig.getNamespace()); // 命名空间
if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); // 会话超时时间,默认 60 * 1000 毫秒
}
if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); // 连接超时时间,默认 15 * 1000 毫秒
}
// 认证
if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
client = builder.build();
client.start();
// 连接 Zookeeper
try {
if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
client.close();
throw new KeeperException.OperationTimeoutException();
}
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
}
}
baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
。如果对其它重连策略感兴趣,可以看 RetryPolicy 的实现类,本文就不展开了。ZookeeperConfiguration.namespace
)。通过 Curator TreeCache 实现监控整个树( Zookeeper目录 )的数据订阅和缓存,包括节点的状态,子节点的状态。
初始化作业缓存
作业初始化注册时,初始化缓存。
// JobRegistry.java
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
schedulerMap.put(jobName, jobScheduleController);
regCenterMap.put(jobName, regCenter);
// 添加注册中心缓存
regCenter.addCacheData("/" + jobName);
}
// ZookeeperRegistryCenter.java
/**
* 缓存
* key:/作业名/
*/
private final Map<String, TreeCache> caches = new HashMap<>();
作业服务订阅数据
每个不同的服务,都会订阅数据实现功能逻辑。在后续不同服务的文章,我们会详细解析。?
public void addDataListener(final TreeCacheListener listener) {
TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
cache.getListenable().addListener(listener);
}
关闭作业缓存
@Override
public void evictCacheData(final String cachePath) {
TreeCache cache = caches.remove(cachePath + "/");
if (null != cache) {
cache.close();
}
}
对 Curator TreeCache 感兴趣的同学,可以点击链接继续了解。
public void close() {
for (Entry<String, TreeCache> each : caches.entrySet()) {
each.getValue().close();
}
waitForCacheClose();
CloseableUtils.closeQuietly(client);
}
/*
* 因为异步处理, 可能会导致client先关闭而cache还未关闭结束.
* 等待Curator新版本解决这个bug.
* BUG地址:https://issues.apache.org/jira/browse/CURATOR-157
*/
private void waitForCacheClose() {
try {
Thread.sleep(500L); // 等待500ms, cache先关闭再关闭client, 否则会抛异常
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
@Override
public String get(final String key) {
TreeCache cache = findTreeCache(key); // 获取缓存
if (null == cache) {
return getDirectly(key);
}
ChildData resultInCache = cache.getCurrentData(key); // 缓存中获取 value
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
}
return getDirectly(key);
}
@Override
public String getDirectly(final String key) {
try {
return new String(client.getData().forPath(key), Charsets.UTF_8);
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
return null;
}
}
#get(…)
先从 TreeCache缓存 获取,后从 Zookeeper 获取。#getDirectly(…)
直接从 Zookeeper 获取。#findTreeCache(...)
代码如下:
private TreeCache findTreeCache(final String key) { for (Entry<String, TreeCache> entry : caches.entrySet()) { if (key.startsWith(entry.getKey())) { return entry.getValue(); } } return null; }获取子节点名称集合(降序)
@Override
public List<String> getChildrenKeys(final String key) {
try {
List<String> result = client.getChildren().forPath(key);
Collections.sort(result, new Comparator<String>() {
@Override
public int compare(final String o1, final String o2) {
return o2.compareTo(o1);
}
});
return result;
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
return Collections.emptyList();
}
}
获取子节点数量
@Override
public int getNumChildren(final String key) {
try {
Stat stat = client.checkExists().forPath(key);
if (null != stat) {
return stat.getNumChildren();
}
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
}
return 0;
}
@Override
public void persist(final String key, final String value) {
try {
if (!isExisted(key)) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
} else {
update(key, value);
}
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
}
}
@Override
public void persistEphemeral(final String key, final String value) {
try {
if (isExisted(key)) {
client.delete().deletingChildrenIfNeeded().forPath(key);
}
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8));
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
}
}
#persist(…)
存储持久节点数据。逻辑等价于 insertOrUpdate 操作。persistEphemeral(…)
存储临时节点数据。节点类型无法变更,因此如果数据已存在,需要先进行删除。#isExisted(...)
、#update(...)
代码如下:
@Override public boolean isExisted(final String key) { try { return null != client.checkExists().forPath(key); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); return false; } } @Override public void update(final String key, final String value) { try { client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit(); } catch (final Exception ex) { RegExceptionHandler.handleException(ex); } }#update(…)
使用事务校验键( key )存在才进行更新。实现逻辑和存储注册数据类似。Elastic-Job 未使用该方法,跳过。
@Override
public void remove(final String key) {
try {
client.delete().deletingChildrenIfNeeded().forPath(key);
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
}
}
@Override
public long getRegistryCenterTime(final String key) {
long result = 0L;
try {
persist(key, "");
result = client.checkExists().forPath(key).getMtime();
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
}
Preconditions.checkState(0L != result, "Cannot get registry center time.");
return result;
}
mtime
)获得 Zookeeper 的时间。six six six。RegExceptionHandler,注册中心异常处理器。在上面的操作 Zookeeper 发生异常时,都会调用 RegExceptionHandler.handleException(...)
处理异常:
public static void handleException(final Exception cause) {
if (null == cause) {
return;
}
if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
log.debug("Elastic job: ignored exception for: {}", cause.getMessage());
} else if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
throw new RegException(cause);
}
}
private static boolean isIgnoredException(final Throwable cause) {
return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException;
}
#getDirectly(…)
获得注册数据时,可能节点不存在,抛出 NodeExistsException,这种异常可以无视。JobNodeStorage,作业节点数据访问类。
// JobNodeStorage.java
/**
* 在主节点执行操作.
*
* @param latchNode 分布式锁使用的节点,例如:leader/election/latch
* @param callback 执行操作的回调
*/
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
} catch (final Exception ex) {
handleException(ex);
}
}
Apache Curator 使用 Zookeeper 实现了两种分布式锁,LeaderLatch 是其中的一种。使用一个 Zookeeper 节点路径创建一个 LeaderLatch,#start()
后,调用 #await()
等待拿到这把锁。如果有多个线程执行了相同节点路径的 LeaderLatch 的 #await()
后,同一时刻有且仅有一个线程可以继续执行,其他线程需要等待。当该线程释放( LeaderLatch#close()
)后,下一个线程可以拿到该锁继续执行。用 Java 并发包 Lock 举例子:
public void executeInLeader(Lock lock) {
try {
lock.lock();
// doSomething();
} finally {
lock.unlock();
}
}
《官方文档 —— LeaderLatch》,有兴趣的同学可以看看。在《Elastic-Job-Lite 源码解析 —— 主节点选举》中,我们会看到 #executeInLeader(...)
的使用。
另一种分布式锁实现,《官方文档 —— LeaderElection》,有兴趣也可以看看。在 Elastic-Job-Cloud 中使用到了,后续进行解析。
// JobNodeStorage.java
public void executeInTransaction(final TransactionExecutionCallback callback) {
try {
CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
callback.execute(curatorTransactionFinal);
curatorTransactionFinal.commit();
} catch (final Exception ex) {
RegExceptionHandler.handleException(ex);
}
}
旁白君:煞笔芋道君,又在水更 芋道君:人艰不拆,好不好。
道友,赶紧上车,分享一波朋友圈!