前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >elastic-job选主过程

elastic-job选主过程

作者头像
leobhao
发布2022-06-28 18:34:31
4050
发布2022-06-28 18:34:31
举报
文章被收录于专栏:涓流涓流

elastic job 选主过程分析

elastic-job 选主

elastic主从服务器共同构成任务调度的分片节点。

ElasticJob的主服务器的职责是根据当前存活的任务调度服务器生成分片信息,然后拉取属于该分片的任务数据执行任务。为了避免分片信息的不统一,ElasticJob必须从所有的调度服务器中选择一台为主服务器,由该台服务器统一计算分片信息,其他服务根据该分片信息进行任务调度。

作业服务器注册启动的入口是: SchedulerFacade#registerStartUpInfo:

代码语言:javascript
复制
public void registerStartUpInfo(final boolean enabled) {
    // 启动所有ZK事件监听器
    listenerManager.startAllListeners();
    // 选主
    leaderService.electLeader();
    // 注册并持久化服务器信息(server信息)
    serverService.persistOnline(enabled);
    // 注册并持久化作业运行实例(instance信息)
    instanceService.persistOnline();
    // 设置是否需要重新分片
    shardingService.setReshardingFlag();
    // 启动调节分布式作业状态不一致服务
    monitorService.listen();
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}
elastic-job 监听器
  • ElectionListenerManager:主节点选举监听管理器
  • ShardingListenerManager:分片监听管理器。
  • FailoverListenerManager:失效转移监听管理器。
  • MonitorExecutionListenerManager:幂等性监听管理器。
  • ShutdownListenerManager:运行实例关闭监听管理器。
  • TriggerListenerManager:作业触发监听管理器。
  • RescheduleListenerManager:重调度监听管理器。
  • GuaranteeListenerManager:保证分布式任务全部开始和结束状态监听管理器。
监听器工作机制

ElectionListenerManager为例,ElectionListenerManager#start:

代码语言:javascript
复制
public void start() {
    addDataListener(new LeaderElectionJobListener());
    addDataListener(new LeaderAbdicationJobListener());
}

JobNodeStorage#addDataListener(zk添加监听器):

代码语言:javascript
复制
public void addDataListener(final TreeCacheListener listener) {
    TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
    cache.getListenable().addListener(listener);
}

首先获取TreeCache,然后使用cahce.getListenable().addListener(TreeCacheListener) 加入zk监听器中

ElectionListenerManager选主

ElectionListenerManager监听器是用来选举主节点的,执行的方法是LeaderService.electLeader:

代码语言:javascript
复制
public void electLeader() {
    log.debug("Elect a new leader now.");
    jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
    log.debug("Leader election completed.");
}

LeaderNode.LATCH是选主所用分布式锁节点目录, 具体路径是:Namespace/ {JobName}/leader/election/latch

代码语言:javascript
复制
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        // 步骤1
        latch.start();
        // 步骤2
        latch.await();
        // 步骤3
        callback.execute();
    //CHECKSTYLE:OFF
    } catch (final Exception ex) {
    //CHECKSTYLE:ON
        handleException(ex);
    }
}

这里选主直接使用cautor开源框架提供的实现类org.apache.curator.framework.recipes.leader.LeaderLatch, LeaderLatch需要传入两个参数:

  1. CuratorFramework client:curator框架客户端
  2. latchPath:锁节点路径, 这里的路径为namespace/ {Jobname}/leader/election/latch

上述步骤1,2启动 LeaderLatch,这是zk客户端curator的方法,如果LeaderLatch是主节点,就返回,否则阻塞在这里等待下一次选举。

如果获得了分布式锁后,执行callback回调方法:LeaderService$LeaderElectionExecutionCallback:

代码语言:javascript
复制
@RequiredArgsConstructor
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
    
    @Override
    public void execute() {
        if (!hasLeader()) {
            jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        }
    }
}

这里判断,如果namespace/{jobname}/leader/election/instance节点不存在则创建该临时节点,节点里面的数据是IP地址@-@进程ID。代码是:jobInstanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]

整个选主的流程是:

参考资料

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-04-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • elastic job 选主过程分析
    • elastic-job 选主
      • elastic-job 监听器
      • 监听器工作机制
      • ElectionListenerManager选主
    • 参考资料
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档