前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一步一步分析RejectedExecutionException异常

一步一步分析RejectedExecutionException异常

作者头像
九州暮云
发布2019-08-21 14:19:08
5.1K0
发布2019-08-21 14:19:08
举报
文章被收录于专栏:九州牧云九州牧云

问题的产生

最近在开发批量发送动态push功能的时候,用了线程池提高批量发送的效率,并在线程池任务全部执行完毕后,更新MySQL数据库里接收人的状态信息为已接收状态。主要的业务代码如下:

代码语言:javascript
复制
	   // 查询今天接收动态push用户的数据
		String createTime = DateUtils.formatDate(new Date(), DateUtils.DAY_FROMAT);
		List<MomentsPush> momentsPushs = this.momentsPushDao.getMomentsPushListByCreateTime(createTime);
		// 使用线程池发送push,MOMENTS_EXECUTORS是一个public static final的全局静态的固定线程池
		for (MomentsPush momentsPush : momentsPushs) {
			MOMENTS_EXECUTORS.execute(new Runnable() {
				//发送逻辑
			}
		}
		// 发出关闭线程池信号,等待任务完成后更新接收人状态
	   exec.shutdown();
	   while(true){ 
	       if(exec.isTerminated()){  
	            //更新接收人状态逻辑
	            break;  
	       }  
	       Thread.sleep(1000);    
	    }

代码写好后,我部署到了内网测试,第一次批量发送正常,但是在执行第二次批量发送的时候,遇到了以下异常信息:

代码语言:javascript
复制
java.util.concurrent.RejectedExecutionException: Task com.baihe.push.service.impl.MomentsPushServiceImpl$2@6f55c921 
rejected from java.util.concurrent.ThreadPoolExecutor@6307ca2e[Terminated, pool size = 0, active threads = 0, queued
 tasks = 0, completed tasks = 111401]

Google了之后,发现是我调用了exec.shutdown()方法之后又提交新任务给线程池导致的。最后我改变了代码逻辑,没有调用exec.shutdown()exec.isTerminated()方法来关闭和判断线程池的状态。接下来将这个问题的解决方式总结一下。

入门示例

下面的测试程序使用ThreadPoolExecutor类来创建线程池执行任务,代表任务Worker类代码如下:

代码语言:javascript
复制
package com.concurrent.rejectedexecutionexception;

public class Worker implements Runnable {

	private int ID;

	public Worker(int id) {
		// TODO Auto-generated constructor stub
		this.ID = id;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		try {
			Thread curThread = Thread.currentThread();
			System.out.println(curThread.getName() + " 执行任务 " + ID);
			Thread.sleep(500);
			System.out.println(curThread.getName() + " 完成任务 " + ID);
		} catch (Exception e) {
			System.out.println(e);
		}
	}

	public int getID() {
		return ID;
	}

	public void setID(int iD) {
		ID = iD;
	}
}

执行Worker任务的代码如下:

代码语言:javascript
复制
package com.concurrent.rejectedexecutionexception;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedExecutionExceptionExample {

	public static void main(String[] args) {

		ExecutorService executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(15));

		Worker tasks[] = new Worker[10];
		for (int i = 0; i < 10; i++) {
			tasks[i] = new Worker(i);
			executor.execute(tasks[i]);
		}
		executor.shutdown();
	}
}

运行一下,看到如下输出:

代码语言:javascript
复制
pool-1-thread-1 执行任务 0
pool-1-thread-2 执行任务 1
pool-1-thread-3 执行任务 2
pool-1-thread-1 完成任务 0
pool-1-thread-1 执行任务 3
pool-1-thread-3 完成任务 2
pool-1-thread-2 完成任务 1
pool-1-thread-3 执行任务 4
pool-1-thread-2 执行任务 5
pool-1-thread-1 完成任务 3
pool-1-thread-1 执行任务 6
pool-1-thread-2 完成任务 5
pool-1-thread-2 执行任务 7
pool-1-thread-3 完成任务 4
pool-1-thread-3 执行任务 8
pool-1-thread-1 完成任务 6
pool-1-thread-1 执行任务 9
pool-1-thread-3 完成任务 8
pool-1-thread-2 完成任务 7
pool-1-thread-1 完成任务 9

RejectedExecutionExceptionExample类里,我们使用ThreadPoolExecutor类创建了一个数量为3的线程池来执行任务,在这3个线程执行任务被占用期间,如果有新任务提交给线程池,那么这些新任务会被保存在BlockingQueue阻塞队列里,以等待被空闲线程取出并执行。在这里我们使用一个大小为15的ArrayBlockingQueue队列来保存待执行的任务(原因接下来会说明),然后我们创建了10个任务提交给ThreadPoolExecutor线程池。

异常场景1

产生RejectedExecutionException异常的第一个原因:

调用shutdown()方法关闭了ThreadPoolExecutor线程池,又提交新任务给ThreadPoolExecutor线程池执行。一般调用shutdown()方法之后,JVM会得到一个关闭线程池的信号,并不会立即关闭线程池,原来线程池里未执行完的任务仍然在执行,等到任务都执行完后才关闭线程池,但是JVM不允许再提交新任务给线程池。

让我们用以下例子来重现该异常:

代码语言:javascript
复制
package com.concurrent.rejectedexecutionexception;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedExecutionExceptionExample {

	public static void main(String[] args) {

		ExecutorService executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(15));

		Worker tasks[] = new Worker[10];
		for (int i = 0; i < 10; i++) {
			tasks[i] = new Worker(i);
			executor.execute(tasks[i]);
		}
		executor.shutdown();// 关闭线程池
		executor.execute(tasks[0]);// 关闭线程池之后提交新任务,运行之后抛异常
	}
}

运行一下,看到如下输出:

代码语言:javascript
复制
pool-1-thread-1 执行任务 0
pool-1-thread-3 执行任务 2
pool-1-thread-2 执行任务 1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.concurrent.rejectedexecutionexception.Worker@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Shutting down, pool size = 3, active threads = 3, queued tasks = 7, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.concurrent.rejectedexecutionexception.RejectedExecutionExceptionExample.main(RejectedExecutionExceptionExample.java:20)
pool-1-thread-1 完成任务 0
pool-1-thread-2 完成任务 1
pool-1-thread-3 完成任务 2
pool-1-thread-1 执行任务 4
pool-1-thread-2 执行任务 3
pool-1-thread-3 执行任务 5
pool-1-thread-3 完成任务 5
pool-1-thread-2 完成任务 3
pool-1-thread-1 完成任务 4
pool-1-thread-2 执行任务 7
pool-1-thread-3 执行任务 6
pool-1-thread-1 执行任务 8
pool-1-thread-2 完成任务 7
pool-1-thread-2 执行任务 9
pool-1-thread-1 完成任务 8
pool-1-thread-3 完成任务 6
pool-1-thread-2 完成任务 9

从以上例子可以看出,在调用shutdown()方法之后,由于JVM不允许再提交新任务给线程池,于是抛出了RejectedExecutionException异常。

异常场景2

产生RejectedExecutionException异常第二个原因:

要提交给阻塞队列的任务超出了该队列的最大容量。当线程池里的线程都繁忙的时候,新任务会被提交给阻塞队列保存,这个阻塞队列一旦饱和,线程池就会拒绝接收新任务,随即抛出异常。

示例代码如下:

代码语言:javascript
复制
package com.concurrent.rejectedexecutionexception;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedExecutionExceptionExample {

	public static void main(String[] args) {
		
		// 用一个大小为15的ArrayBlockingQueue保存待执行的任务
		ExecutorService executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(15));

		// 提交给线程池20个任务
		Worker tasks[] = new Worker[20];
		for (int i = 0; i < 20; i++) {
			tasks[i] = new Worker(i);
			executor.execute(tasks[i]);
		}
		executor.shutdown();// 关闭线程池
		executor.execute(tasks[0]);// 关闭线程池之后提交新任务,运行之后抛异常
	}
}

在上面的例子中,我们使用了一个大小为15的ArrayBlockingQueue阻塞队列来保存等待执行的任务。接着我们提交了20个任务给线程池,由于每个线程执行任务的时候会睡眠0.5秒,因此当3个线程繁忙的时候,其他任务不会立即得到执行,我们提交的新任务会被保存在队列里。当等待任务的数量超过线程池阻塞队列的最大容量时,抛出了RejectedExecutionException异常。

如何解决

要解决RejectedExecutionException异常,首先我们要注意两种情况:

  • 当调用了线程池的shutdown()方法以后,不要提交新任务给线程池
  • 不要提交大量超过线程池处理能力的任务,这时可能会导致队列饱和,抛出异常

对于第二种情况,我们很容易解决。我们可以选择一种不需要设置大小限制的数据结构,比如LinkedBlockingQueue阻塞队列。因此在使用LinkedBlockingQueue队列以后,如果还出现RejectedExecutionException异常,就要将问题的重点放在第一种情况上。如果第一种情况不是产生问题的原因,那么我们还需要寻找更复杂的原因。比如,由于线程死锁和LinkedBlockingQueue饱和,导致内存占用过大,这个时候我们就需要考虑JVM可用内存的问题了。

对于第二种情况,通常有一些隐藏的信息被我们忽略。其实我们可以给使用ArrayBlockingQueue作为阻塞队列的ThreadPoolExecutor线程池提交超过15个的任务,只要我们在提交新任务前设置一个完成原来任务的等待时间,这时3个线程就会逐渐消费ArrayBlockingQueue阻塞队列里的任务,而不会使它堵塞。示例如下:

代码语言:javascript
复制
package com.concurrent.rejectedexecutionexception;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedExecutionExceptionExample {

	public static void main(String[] args) throws InterruptedException {

		// 用一个大小为15的ArrayBlockingQueue保存待执行的任务
		ExecutorService executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(15));

		// 提交给线程池20个任务
		Worker tasks[] = new Worker[20];
		for (int i = 0; i < 10; i++) {
			tasks[i] = new Worker(i);
			executor.execute(tasks[i]);
		}
		Thread.sleep(3000);// 让主线程睡眠三秒
		for (int i = 10; i < 20; i++) {
			tasks[i] = new Worker(i);
			executor.execute(tasks[i]);
		}
		executor.shutdown();// 关闭线程池
	}
}

运行一下,看到如下输出:

代码语言:javascript
复制
pool-1-thread-1 执行任务 0
pool-1-thread-3 执行任务 2
pool-1-thread-2 执行任务 1
pool-1-thread-1 完成任务 0
pool-1-thread-3 完成任务 2
pool-1-thread-2 完成任务 1
pool-1-thread-1 执行任务 3
pool-1-thread-3 执行任务 4
pool-1-thread-2 执行任务 5
pool-1-thread-3 完成任务 4
pool-1-thread-3 执行任务 6
pool-1-thread-2 完成任务 5
pool-1-thread-2 执行任务 7
pool-1-thread-1 完成任务 3
pool-1-thread-1 执行任务 8
pool-1-thread-2 完成任务 7
pool-1-thread-2 执行任务 9
pool-1-thread-1 完成任务 8
pool-1-thread-3 完成任务 6
pool-1-thread-2 完成任务 9
pool-1-thread-1 执行任务 10
pool-1-thread-2 执行任务 12
pool-1-thread-3 执行任务 11
pool-1-thread-2 完成任务 12
pool-1-thread-2 执行任务 13
pool-1-thread-3 完成任务 11
pool-1-thread-3 执行任务 14
pool-1-thread-1 完成任务 10
pool-1-thread-1 执行任务 15
pool-1-thread-2 完成任务 13
pool-1-thread-1 完成任务 15
pool-1-thread-1 执行任务 17
pool-1-thread-3 完成任务 14
pool-1-thread-3 执行任务 18
pool-1-thread-2 执行任务 16
pool-1-thread-1 完成任务 17
pool-1-thread-2 完成任务 16
pool-1-thread-3 完成任务 18
pool-1-thread-1 执行任务 19
pool-1-thread-1 完成任务 19

当然上面这种设置等待时间来分隔旧任务和新任务的方式,在高并发情况下效率并不高,一方面由于我们无法准确预估等待时间,一方面由于ArrayBlockingQueue内部只使用了一个锁来隔离读和写的操作,因此效率没有使用了两个锁来隔离读写操作的LinkedBlockingQueue高,故而不推荐使用这种方式。

参考链接:java.util.concurrent.RejectedExecutionException – How to solve RejectedExecutionException

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题的产生
  • 入门示例
  • 异常场景1
  • 异常场景2
  • 如何解决
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档