通常,我会这样安排一个作业在Spring中定期执行,并在给定的时区使用cron:
@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
// Do stuff
}
这将阻塞调用scheduleStuff
的线程,直到作业完成。然而,在这种情况下,我想要做的“东西”都是使用project reactor的非阻塞构建块(即Mono
、Flux
等)实现的。
例如,假设我想周期性地触发这个函数:
Flux<Void> stuff() {
return ..
}
当然,我可以简单地调用stuff().subscribe()
(甚至stuff().block()
),但这会阻塞线程。对于非阻塞代码,有没有更好的方法来实现与@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
相同的功能?
我使用的是Spring Boot 2.1。
发布于 2018-12-19 11:09:24
实际上,subscribe()
不会阻塞你的线程。如果确实需要,可以调用stuff().subscribeOn(Schedulers.parallel()).subscribe()
或其他调度器来确保执行将在单独的线程中完成。
发布于 2018-12-19 01:02:03
您可以将stuff
方法包装在异步方法中
例如:
@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
stuffService.doStuffAsync();
}
使用异步方法的服务
public class StuffService() implements IStuffService {
@Async
public void doStuffAsync() {
// Call and subscribe to your flux method here
}
}
对doStuffAsync()
的调用将立即返回相应的scheduleStuff
,而不会阻塞线程。
发布于 2022-01-28 23:28:53
这里还有一个选项:
public class PeriodicReactiveTasksInSpring implements SmartLifecycle {
private final AtomicReference<Subscription> subscription;
private final Long executionPeriod;
public PeriodicReactiveTasksInSpring(Long executionPeriod) {
this.subscription = new AtomicReference<>();
this.executionPeriod = executionPeriod;
}
@Override
public void start() {
if (Objects.isNull(subscription.get())) {
updateConfig()
.doOnSubscribe(sub -> {
subscription.set(sub);
}).subscribe();
}
}
@Override
public void stop() {
Optional.ofNullable(subscription.get())
.ifPresent(sub -> {
sub.cancel();
subscription.set(null);
});
}
@Override
public boolean isRunning() {
return Objects.nonNull(subscription.get());
}
private Flux<Item> updateConfig() {
return Flux.interval(Duration.ofMillis(executionPeriod))
.subscribeOn(Schedulers.boundedElastic())
.flatMap(cfg -> {
// Do your job here
})
.onErrorContinue((err, msg) -> LOGGER.error("Error: {} message: {}", err, msg));
}
}
https://stackoverflow.com/questions/53835800
复制相似问题