springboot scheduled并发配置

本文介绍如何使用springboot的sheduled实现任务的定时调度,并将调度的任务实现为并发的方式。

1、定时调度配置scheduled

1)注册定时任务

package com.xiaoju.dqa.sentinel.scheduler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class Scheduler {
    private static final Logger logger = LoggerFactory.getLogger(Scheduler.class);

    @Scheduled(cron = "0 0/2 * * * ?")
    public void cronTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("cron任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }

    }

    @Scheduled(fixedRate = 2 * 60 )
    public void rateTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("fixedRate任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
    }

}

2)启动定时任务

package com.xiaoju.dqa.sentinel;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
@EnableAutoConfiguration
@ComponentScan
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

这里就介绍两种配置调度时间的方式:

1)cron表达式

2)fixedRate,调度频率也就是调度间隔

如下代码中设置的都是每两分钟调度一次。你只需要将任务用@Scheduled装饰即可。

我这里只写了两个调度任务,而且只sleep1s,如果你sleep 10s的话你就能清晰的看到,两个任务是串行执行的。

springboot中定时任务的执行时串行的

开始把他改成并行的。

2、定时调度并行化

定时调度的并行化,线程池实现,非常简单,只需要添加一个configuration,实现SchedulingConfigurer接口就可以了。

package com.xiaoju.dqa.sentinel.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Configuration
@EnableScheduling
public class ScheduleConfiguration implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskExecutor());
    }

    @Bean(destroyMethod="shutdown")
    public Executor taskExecutor() {
        return Executors.newScheduledThreadPool(100);
    }
}

然后你重启服务,可以看到两个任务并行的执行起来。

3、将任务里的方法设置为异步

package com.xiaoju.dqa.sentinel.scheduler;

import com.xiaoju.dqa.sentinel.service.AuditCollect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class Scheduler {
    private static final Logger logger = LoggerFactory.getLogger(Scheduler.class);

    @Autowired
    private AuditCollect auditCollect;

    @Scheduled(cron = "0 0/2 * * * ?")
    public void cronTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("cron任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            auditCollect.doAuditCollect();
        } catch (InterruptedException e) {
        }

    }

    @Scheduled(fixedRate = 2 * 60 )
    public void rateTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("fixedRate任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            auditCollect.doAuditCollect();
        } catch (InterruptedException e) {
        }
    }

}

比如这里有个函数执行的是数据收集,可以把他实现为异步的,并同样扔到线程池里并发的执行。

看看是怎么实现的。

package com.xiaoju.dqa.sentinel.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xiaoju.dqa.sentinel.client.es.ESDao;
import com.xiaoju.dqa.sentinel.client.es.entity.ESDocument;
import com.xiaoju.dqa.sentinel.client.es.entity.SearchDocument;
import com.xiaoju.dqa.sentinel.client.redis.RedisClient;
import com.xiaoju.dqa.sentinel.mapper.sentinel.SentinelMapper;
import com.xiaoju.dqa.sentinel.model.SentinelClan;
import com.xiaoju.dqa.sentinel.utils.ESSQLUtil;
import com.xiaoju.dqa.sentinel.utils.GlobalStaticConf;
import com.xiaoju.dqa.sentinel.utils.TimeFunction;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.*;


@Component
public class AuditCollect {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Async("sentinelSimpleAsync")
    public void doAuditCollect(JSONObject clanObject, long currentTime) {
        JSONArray topicArray = clanObject.getJSONArray("families");
        // 遍历所有的topic
        for (int j = 0; j < topicArray.size(); j++) {
            JSONObject topicObject = topicArray.getJSONObject(j);
            audit(clanObject, topicObject, currentTime);
        }
    }
}

可以看到只是用@Async注释一下,并且加入了异步的executor=sentinelSimpleAsync。

SentinelSimpleAsync是我们自己实现来定制化线程池的。

package com.xiaoju.dqa.sentinel.configuration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class ExecutorConfiguration {

    @Value("${executor.pool.core.size}")
    private int corePoolSize;
    @Value("${executor.pool.max.size}")
    private int maxPoolSize;
    @Value("${executor.queue.capacity}")
    private int queueCapacity;

    @Bean
    public Executor sentinelSimpleAsync() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("SentinelSimpleExecutor-");
        executor.initialize();
        return executor;
    }

    @Bean
    public Executor sentinelAsync() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("SentinelSwapExecutor-");

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

配置文件如下:

#============== 线程池 ===================
executor.pool.core.size=100
executor.pool.max.size=150
executor.queue.capacity=2000

想让异步生效的话,只需要在application类上加上EnableAsync注释就好了。

package com.xiaoju.dqa.sentinel;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
@EnableAutoConfiguration
@EnableAsync
@ComponentScan
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏javathings

@SpringBootApplication 的作用是什么

@SpringBootApplication 标注的类为 Spring Boot 的主配置类,Spring Boot 会运行这个类的 main 方法来启动 Sp...

1.1K20
来自专栏菩提树下的杨过

JAVA CDI 学习(5) - 如何向RESTFul Service中注入EJB实例

RESTFul Service中如果要注入EJB实例,常规的@Inject将不起作用,在Jboss中,应用甚至都启动不起来(因为@Inject注入失败),解决方...

22970
来自专栏Hongten

悲观锁 HibernateTest.java

import org.hibernate.LockMode; import org.hibernate.Query; import org.hibernate....

11710
来自专栏F_Alex

(四)SpringBoot2.0基础篇- 多数据源,JdbcTemplate和JpaRepository

47740
来自专栏Hongten

apache的开源项目-模板引擎(Velocity)_学习了两天就上手啦_源码下载

首先,如果你对Velocity不是很了解,还是建议你去apache的官方网站上去走走....

15810
来自专栏Android源码框架分析

获取Android设备DeviceId与反Xposed Hook技术

APP开发中常需要获取设备的DeviceId,以应对刷单,目前常用的几个设备识别码主要有IMEI(国际移动设备身份码 International Mobile ...

46720
来自专栏lgp20151222

Spring的注解@Qualifier小结

很明显了,在autoware时,由于有两个类实现了EmployeeService接口,所以Spring不知道应该绑定哪个实现类,所以抛出了如上错误。

10810
来自专栏osc同步分享-java技术分享站

springmvc 拦截器、国际化、验证

springmvc 拦截器 继承了HandlerIntercepter的类可以作为拦截器类: package com.yawn.intercepter; im...

37070
来自专栏一个会写诗的程序员的博客

第16章 Spring Boot + Kotlin: 下一代 Java 服务端开发

2017-11-22 11:55:17.205 INFO 14721 --- [ main] org.hibernate.Version ...

18710
来自专栏Hongten

Hibernate 过滤器

通过调用Session对象的setFilter()和enableFilter()方法使用过滤器。

13720

扫码关注云+社区

领取腾讯云代金券