首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >在Spring中使用cron调度周期性的反应性任务?

在Spring中使用cron调度周期性的反应性任务?
EN

Stack Overflow用户
提问于 2018-12-18 23:01:03
回答 3查看 4.8K关注 0票数 6

通常,我会这样安排一个作业在Spring中定期执行,并在给定的时区使用cron:

@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
    // Do stuff
}

这将阻塞调用scheduleStuff的线程,直到作业完成。然而,在这种情况下,我想要做的“东西”都是使用project reactor的非阻塞构建块(即MonoFlux等)实现的。

例如,假设我想周期性地触发这个函数:

Flux<Void> stuff() {
    return ..
}

当然,我可以简单地调用stuff().subscribe() (甚至stuff().block()),但这会阻塞线程。对于非阻塞代码,有没有更好的方法来实现与@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")相同的功能?

我使用的是Spring Boot 2.1。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-12-19 11:09:24

实际上,subscribe()不会阻塞你的线程。如果确实需要,可以调用stuff().subscribeOn(Schedulers.parallel()).subscribe()或其他调度器来确保执行将在单独的线程中完成。

票数 5
EN

Stack Overflow用户

发布于 2018-12-19 01:02:03

您可以将stuff方法包装在异步方法中

例如:

@Scheduled(cron = "0 0 10 * * *", zone = "Europe/Stockholm")
public void scheduleStuff() {
    stuffService.doStuffAsync();
}

使用异步方法的服务

public class StuffService() implements IStuffService {

    @Async
    public void doStuffAsync() {
       // Call and subscribe to your flux method here
    }

}

doStuffAsync()的调用将立即返回相应的scheduleStuff,而不会阻塞线程。

票数 0
EN

Stack Overflow用户

发布于 2022-01-28 23:28:53

这里还有一个选项:

public class PeriodicReactiveTasksInSpring implements SmartLifecycle {

    private final AtomicReference<Subscription> subscription;
    private final Long executionPeriod;

    public PeriodicReactiveTasksInSpring(Long executionPeriod) {
        this.subscription = new AtomicReference<>();
        this.executionPeriod = executionPeriod;
    }

    @Override
    public void start() {
        if (Objects.isNull(subscription.get())) {
            updateConfig()
                    .doOnSubscribe(sub -> {
                        subscription.set(sub);
                    }).subscribe();
        }
    }

    @Override
    public void stop() {
        Optional.ofNullable(subscription.get())
                .ifPresent(sub -> {
                    sub.cancel();
                    subscription.set(null);
                });
    }

    @Override
    public boolean isRunning() {
        return Objects.nonNull(subscription.get());
    }


    private Flux<Item> updateConfig() {
        return Flux.interval(Duration.ofMillis(executionPeriod))
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(cfg -> {
                    // Do your job here
                })
                .onErrorContinue((err, msg) -> LOGGER.error("Error: {} message: {}", err, msg));
    }

}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53835800

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档