elastic-job-lite在项目中使用也有两个多月的时间了,从一开始搜索网上教程,参考别人使用方法,到后面阅读源码,理解其架构,实现。也写了几篇关于ejl的架构,流程处理,数据结构的文章,中间也经历了很多坑,也有了一些最佳实践,这篇文章写一下总结,但不是ejl的结尾,后续还会有ejl的文章,下面附上前几篇的链接:
ejl中有三种job作业类型,simple, dataflow, script,这三种任务类型都支持cron表达式定时调用,也支持页面单次触发。
void execute(ShardingContext var1);
List<T> fetchData(ShardingContext var1);
void processData(ShardingContext var1, List<T> var2);
public final class ScriptJobExecutor extends AbstractElasticJobExecutor{
protected void process(final ShardingContext shardingContext) {
//scriptCommandLine从上下文中获取到,声明为final
final String scriptCommandLine = shardingContext.get...
this.executeScript(shardingContext, scriptCommandLine);
}
private void executeScript(final ShardingContext shardingContext, final String scriptCommandLine){
CommandLine commandLine = CommandLine.parse(scriptCommandLine);
commandLine.addArgument("上下文json串" false);
try {
//org.apache.commons.exec apache的包
new DefaultExecutor().execute(commandLine);
} catch (final IOException ex) {
throw new JobConfigurationException("Execute script failure.", ex);
}}
}
上面介绍了三种作业类型的使用方法和场景,不管是以上那种作业类型,ejl都支持为其添加监听器,能够实现在作业开始执行前和执行完成后做一些准备和清理工作,并且是在分布式环境下进行,也就是保证所有节点还未开始执行时,就执行监听器中的befor方法,在所有节点都执行完成后,在执行after方法,这是非常有帮助的,通常我们都会用到,比如我们在任务开始前需要准备一些基础数据,在结束需要告知下游系统完成,其接口如下:
public interface ElasticJobListener {
void beforeJobExecuted(ShardingContexts var1);
void afterJobExecuted(ShardingContexts var1);
}
ejl是不支持作业连贯的,比如,我们的job依赖上游系统作业的完成,怎么做呢?
这里记录一下使用过程中踩过的坑
public class JobConfig1 {
@Resource
private ZookeeperRegistryCenter regCenter;
@Bean(name = "job1")
public DataflowJob jobConifg() {
return new Job1()();
}
}
public class JobConfig2 {
@Resource
private ZookeeperRegistryCenter regCenter;
@Bean(name = "job2")
public DataflowJob jobConifg() {
return new Job1();
}
}
if (guaranteeService.isAllStarted()) {
doBeforeJobExecutedAtLastStarted(shardingContexts);
guaranteeService.clearAllStartedInfo();
return;
}
long before = timeService.getCurrentMillis();
try {
synchronized (startedWait) {
startedWait.wait(startedTimeoutMilliseconds);
}
} catch (final InterruptedException ex) {
Thread.interrupted();
}
public boolean isAllStarted() {
return this.jobNodeStorage.isJobNodeExisted("guarantee/started")
&&
this.configService.load(false).getTypeConfig()
.getCoreConfig().getShardingTotalCount() ==
this.jobNodeStorage.getJobNodeChildrenKeys("guarantee/started").size();
}
第一种解决方案代码:
leaderLatch = new LeaderLatch(client, path, id);
LeaderLatchListener leaderLatchListener = new LeaderLatchListener() {
@Override
public void isLeader() {
doBefore....
clearAllStartedInfo
//释放锁
}
@Override
public void notLeader() {
//休眠,等待唤醒
}
};
leaderLatch.addListener(leaderLatchListener);
leaderLatch.start();
第二种解决方案代码:
while(shardingContexts.getShardingItemParameters().containsKey(0)) {
if (guaranteeService.isAllStarted()) {
doBeforeJobExecutedAtLastStarted(shardingContexts);
guaranteeService.clearAllStartedInfo();
return;
}
Thread.sleep(10)
}
//否则直接休眠
try {
synchronized (startedWait) {
startedWait.wait(startedTimeoutMilliseconds);
}
} catch (final InterruptedException ex) {
Thread.interrupted();
}