前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot scheduled并发配置

springboot scheduled并发配置

作者头像
用户1225216
发布2018-03-05 14:33:43
3.7K0
发布2018-03-05 14:33:43
举报
文章被收录于专栏:扎心了老铁扎心了老铁

本文介绍如何使用springboot的sheduled实现任务的定时调度,并将调度的任务实现为并发的方式。

1、定时调度配置scheduled

1)注册定时任务

代码语言:javascript
复制
package com.xiaoju.dqa.sentinel.scheduler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class Scheduler {
    private static final Logger logger = LoggerFactory.getLogger(Scheduler.class);

    @Scheduled(cron = "0 0/2 * * * ?")
    public void cronTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("cron任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }

    }

    @Scheduled(fixedRate = 2 * 60 )
    public void rateTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("fixedRate任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
    }

}

2)启动定时任务

代码语言:javascript
复制
package com.xiaoju.dqa.sentinel;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
@EnableAutoConfiguration
@ComponentScan
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

这里就介绍两种配置调度时间的方式:

1)cron表达式

2)fixedRate,调度频率也就是调度间隔

如下代码中设置的都是每两分钟调度一次。你只需要将任务用@Scheduled装饰即可。

我这里只写了两个调度任务,而且只sleep1s,如果你sleep 10s的话你就能清晰的看到,两个任务是串行执行的。

springboot中定时任务的执行时串行的

开始把他改成并行的。

2、定时调度并行化

定时调度的并行化,线程池实现,非常简单,只需要添加一个configuration,实现SchedulingConfigurer接口就可以了。

代码语言:javascript
复制
package com.xiaoju.dqa.sentinel.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Configuration
@EnableScheduling
public class ScheduleConfiguration implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskExecutor());
    }

    @Bean(destroyMethod="shutdown")
    public Executor taskExecutor() {
        return Executors.newScheduledThreadPool(100);
    }
}

然后你重启服务,可以看到两个任务并行的执行起来。

3、将任务里的方法设置为异步

代码语言:javascript
复制
package com.xiaoju.dqa.sentinel.scheduler;

import com.xiaoju.dqa.sentinel.service.AuditCollect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class Scheduler {
    private static final Logger logger = LoggerFactory.getLogger(Scheduler.class);

    @Autowired
    private AuditCollect auditCollect;

    @Scheduled(cron = "0 0/2 * * * ?")
    public void cronTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("cron任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            auditCollect.doAuditCollect();
        } catch (InterruptedException e) {
        }

    }

    @Scheduled(fixedRate = 2 * 60 )
    public void rateTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("fixedRate任务开始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            auditCollect.doAuditCollect();
        } catch (InterruptedException e) {
        }
    }

}

比如这里有个函数执行的是数据收集,可以把他实现为异步的,并同样扔到线程池里并发的执行。

看看是怎么实现的。

代码语言:javascript
复制
package com.xiaoju.dqa.sentinel.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xiaoju.dqa.sentinel.client.es.ESDao;
import com.xiaoju.dqa.sentinel.client.es.entity.ESDocument;
import com.xiaoju.dqa.sentinel.client.es.entity.SearchDocument;
import com.xiaoju.dqa.sentinel.client.redis.RedisClient;
import com.xiaoju.dqa.sentinel.mapper.sentinel.SentinelMapper;
import com.xiaoju.dqa.sentinel.model.SentinelClan;
import com.xiaoju.dqa.sentinel.utils.ESSQLUtil;
import com.xiaoju.dqa.sentinel.utils.GlobalStaticConf;
import com.xiaoju.dqa.sentinel.utils.TimeFunction;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.*;


@Component
public class AuditCollect {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Async("sentinelSimpleAsync")
    public void doAuditCollect(JSONObject clanObject, long currentTime) {
        JSONArray topicArray = clanObject.getJSONArray("families");
        // 遍历所有的topic
        for (int j = 0; j < topicArray.size(); j++) {
            JSONObject topicObject = topicArray.getJSONObject(j);
            audit(clanObject, topicObject, currentTime);
        }
    }
}

可以看到只是用@Async注释一下,并且加入了异步的executor=sentinelSimpleAsync。

SentinelSimpleAsync是我们自己实现来定制化线程池的。

代码语言:javascript
复制
package com.xiaoju.dqa.sentinel.configuration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class ExecutorConfiguration {

    @Value("${executor.pool.core.size}")
    private int corePoolSize;
    @Value("${executor.pool.max.size}")
    private int maxPoolSize;
    @Value("${executor.queue.capacity}")
    private int queueCapacity;

    @Bean
    public Executor sentinelSimpleAsync() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("SentinelSimpleExecutor-");
        executor.initialize();
        return executor;
    }

    @Bean
    public Executor sentinelAsync() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("SentinelSwapExecutor-");

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

配置文件如下:

代码语言:javascript
复制
#============== 线程池 ===================
executor.pool.core.size=100
executor.pool.max.size=150
executor.queue.capacity=2000

想让异步生效的话,只需要在application类上加上EnableAsync注释就好了。

代码语言:javascript
复制
package com.xiaoju.dqa.sentinel;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
@EnableAutoConfiguration
@EnableAsync
@ComponentScan
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-08-04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档