[OFFICE-API](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.html)
- 并发操作
- 异步操作
配置
maven引入spring-context支持<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>线程池对象配置<bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" destroy-method="destroy">
<!--核心线程数(线程池维护线程的最少数量) -->
<property name="corePoolSize" value="4"/>
<!-- 最大线程数(线程池维护线程的最大数量) -->
<property name="maxPoolSize" value="4"/>
<!-- 线程最大空闲时间 -->
<property name="keepAliveSeconds" value="300"/>
<!-- 队列大小 >= mainExecutor.maxSize (缓存队列) -->
<property name="queueCapacity" value="200"/>
<!-- 线程名称前缀 -->
<property name="threadNamePrefix" value="acs_ThreadPool-"/>
<!-- 配置拒绝策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy"/>
</property>
</bean>
配置拒绝策略
- AbortPolicy
用于被拒绝任务的处理程序,它将抛出RejectedExecutionException。- CallerRunsPolicy
用于被拒绝任务的处理程序,它直接在execute方法的调用线程中运行被拒绝的任务。- DiscardOldestPolicy
用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试execute。- DiscardPolicy
用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。
线程池对象注入
@Resource
private ThreadPoolTaskExecutor threadPool;
利用线程池管理线程(线程资源空闲时自动执行run方法)
threadPool.execute(new Thread("threadName"){ @Override
public void run(){
//todo
}}
Ex.1
MultiThreadDemo:
/**
* 多线程并发处理demo
* @author daniel.zhao
*
*/public class MultiThreadDemo implements Runnable { private MultiThreadProcessService multiThreadProcessService; public MultiThreadDemo() {
} public MultiThreadDemo(MultiThreadProcessService multiThreadProcessService) { this.multiThreadProcessService = multiThreadProcessService;
} @Override
public void run() {
multiThreadProcessService.processSomething();
}}
测试类
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(classes = { MultiThreadConfig.class })public class MultiThreadTest { @Autowired
private ThreadPoolTaskExecutor taskExecutor; @Autowired
private MultiThreadProcessService multiThreadProcessService; @Test
public void test() { int n = 20; for (int i = 0; i < n; i++) {
taskExecutor.execute(new MultiThreadDemo(multiThreadProcessService));
System.out.println("int i is " + i + ", now threadpool active threads totalnum is " + taskExecutor.getActiveCount());
} try {
System.in.read();
} catch (IOException e) { throw new RuntimeException(e);
}
}
}
提交任务
无返回值的任务使用execute(Runnable)
有返回值的任务使用submit(Runnable)
处理流程
当一个任务被提交到线程池时,首先查看线程池的核心线程是否都在执行任务,否就选择一条线程执行任务,是就执行第二步。 查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第三步。 查看任务队列是否已满,不满就将任务存储在任务队列中,否则执行第四步。 查看线程池是否已满,不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务。
在ThreadPoolExecutor中表现为
如果当前运行的线程数小于corePoolSize,那么就创建线程来执行任务(执行时需要获取全局锁)。 如果运行的线程大于或等于corePoolSize,那么就把task加入BlockQueue。 如果创建的线程数量大于BlockQueue的最大容量,那么创建新线程来执行该任务。 如果创建线程导致当前运行的线程数超过maximumPoolSize,就根据饱和策略来拒绝该任务。
关闭线程池
调用shutdown或者shutdownNow,两者都不会接受新的任务,而且通过调用要停止线程的interrupt方法来中断线程,有可能线程永远不会被中断,不同之处在于:
shutdownNow会首先将线程池的状态设置为STOP,然后尝试停止所有线程(有可能导致部分任务没有执行完)然后返回未执行任务的列表。
shutdown只是将线程池的状态设置为shutdown,然后中断所有没有执行任务的线程,并将剩余的任务执行完。
线程个数配置
线程池状态检控
taskCount:线程需要执行的任务个数。completedTaskCount:线程池在运行过程中已完成的任务数。largestPoolSize:线程池曾经创建过的最大线程数量。getPoolSize获取当前线程池的线程数量。getActiveCount:获取活动的线程的数量
通过继承线程池,重写beforeExecute
,afterExecute
和terminated
方法来在线程执行任务前,线程执行任务结束,和线程终结前获取线程的运行情况,根据具体情况调整线程池的线程数量。
ThreadPoolTaskExecutor的参数
int corePoolSize: 线程池维护线程的最小数量.int maximumPoolSize: 线程池维护线程的最大数量.long keepAliveTime: 空闲线程的存活时间.TimeUnit unit: 时间单位,现有纳秒,微秒,毫秒,秒枚举值.BlockingQueue<Runnable> workQueue: 持有等待执行的任务队列.RejectedExecutionHandler handler: 用来拒绝一个任务的执行,有两种情况会发生这种情况。
一是在execute方法中若addIfUnderMaximumPoolSize(command)为false,即线程池已经饱和;
二是在execute方法中, 发现runState!=RUNNING || poolSize == 0,即已经shutdown,就调用ensureQueuedTaskHandled(Runnable command),在该方法中有可能调用reject。
ThreadPoolExecutor
@Overridepublic void execute(Runnable task) {
Executor executor = getThreadPoolExecutor(); try {
executor.execute(task);
} catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
依赖于ThreadPoolExecutor
的处理流程:
1)当池子大小小于corePoolSize就新建线程,并处理请求 2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁其会优先创建 CorePoolSize 线程, 当继续增加线程时,先放入Queue中,当 CorePoolSize 和 Queue 都满的时候,就增加创建新线程,当线程达到MaxPoolSize的时候,就会抛出错 误 org.springframework.core.task.TaskRejectedException另外MaxPoolSize的设定如果比系统支持的线程数还要大时,会抛出java.lang.OutOfMemoryError: unable to create new native thread 异常。