上一篇文章介绍了elastic-job-lite的入门,架构。使用和一些流程,里面提到elastic-job-lite是一个去中心化,轻量级的任务调度框架,那为什么elastic-jib-lite在启动时要选取主节点呢?难道我看错了,哈哈,不可能的,后文 elastic-job-lite简称ejl。
ejl定位为轻量级,去中心化,其任务调度由各自的机器驱动,各台机器之间通过zk去协调,ejl为每个任务都创建一个JobScheduler,而在JobScheduler的初始化中回为每个job选举一个主节点,记住不是全局一个主节点,而是每个任务一个主节。如下图,每个节点上都运行两个任务job1,job2,那么在启动时每个节点就会创建两个JobScheduler对象,为每一个任务在集群中选举一个leader。这个leader是怎么选举出来的呢?什么时候开始选举?一、在整个集群启动时为每个任务选举leader; 二、当有些任务的leader下线时,会重新选举。
在JobScheduler中
public void init(){
schedulerFacder.registerStartUpInfo();
}
public void registerStartUpInfo(){
leaderService.electLeader();
}
/** * 选举主节点. */
public void electLeader() {
log.debug("Elect a new leader now.");
jobNodeStorage.executeInLeader(LeaderNode.LATCH,
new LeaderElectionExecutionCallBack());
log.debug("Leader election completed.");
}
public void executeInLeader(final String latchNode,
final LeaderExecutionCallback callback) {
//通过LeaderLatch进行选举
//这是curator(zk的客户端)中类
LeaderLatch latch =
new LeaderLatch(getClient()jobNodePath.getFullPath(latchNode));
//开始选举
latch.start();
//阻塞,直到选举成功
latch.await();
//在回调方法中写入主节点标记
callback.execute();
}
在callback.execute()中执行如下,再次判断没有主节点,将当前机器示例id写入
if (!hasLeader()) {
jobNodeStorage.fillEphemeralJobNode(
LeaderNode.INSTANCE,instanceId));
}
LeaderElectionJobListener
这个监听器来实现leader重新选举,当一个job还在运行,但leader节点下线了,就要重新选举leaderclass LeaderElectionJobListener extends AbstractJobListener{
protected void dataChanged(){
//具体选举看上面的代码
leaderService.electLeader()
}
}
主节点的选举的本质就是大伙竞争一个zk的分布式锁。谁先得到锁,谁就是主节点。
分布式系统中,在一个任务执行过程中,有多个机器,多个分片,那么如何去分配呢?哪些机器执行哪些分片呢,如果大家都参与岂不是乱了,这个时候就需要一个领导者来拍板。在ejl中有两处需要leader节点来参与:
AbstractElasticJobExecute
类中 execute
方法开始看起。AbstractElasticJobExecute类中
public final void execute(){
//获取分片,这个方法中主节点leader会分片
ShardingContexts shardingContexts =
jobFacade.getShardingContexts();
}
在 getShardingContexts()中,有如下方法
shardingService.shardingIfNecessary();
/**
如果需要分片且当前节点为主节点, 则作业分片.
如果当前无可用节点则不分片.
*/
public void shardingIfNecessary() {
//不是主节点直接返回,不允许分片
if (!leaderService.isLeaderUntilBlock()) {
blockUntilShardingCompleted();
return;
}
....
}
leader节点删除的时机有三处,一,在leader节点所在机器进程CRASHED时,jvm通过钩子方法删除自己;二,作业被禁用时删除leader节点,三,主节点进程远程关闭
JobShutdownHookPlugin类中
public void shutdown() {
if (leaderService.isLeader()) {
leaderService.removeLeader();
}
}
LeaderAbdicationJobListener类中
protected void dataChanged(
//判断是leader,并且作业被禁用
if (leaderService.isLeader()
&& isLocalServerDisabled(path, data)) {
leaderService.removeLeader();
}
}
InstanceShutdownStatusJobListener类中
protected void dataChanged(
//当job未暂停,
//并且调度控制器未暂停,
//并且事件是移除这个实例,
//并且运行实例未被移除
if (!JobRegistry.getInstance().isShutdown(jobName)
&&
!JobRegistry.getInstance()
.getJobScheduleController(jobName).isPaused()
&&
isRemoveInstance(path, eventType)
&&
!isReconnectedRegistryCenter()) {
//在这个方法中removeLeader
schedulerFacade.shutdownInstance();
}
}
>
代码在
LeaderNode
类中
String ROOT = "leader";
String ELECTION_ROOT = ROOT + "/election";
String INSTANCE = ELECTION_ROOT + "/instance";
String LATCH = ELECTION_ROOT + "/latch";