前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >tomcat的线程池机制

tomcat的线程池机制

作者头像
简单的程序员
修改2020-05-21 21:25:24
3K2
修改2020-05-21 21:25:24
举报
文章被收录于专栏:奕仁专栏奕仁专栏

剖析tomcat线程池的源码,本文以源码来解析tomcat的线程池使用策略

查找来源

首先先在tomcat官网找到对应的tomcat线程池配置,具体定位在:Tomcat线程池

然后对其配置的默认参数进行解释:

threadPriority :优先级,默认是Normal

daemon :是否守护线程,默认是true

namePrefix:线程名字:tomcat-exc-1+

maxThreads:最大线程:默认 200

minSpareThreads :最小在线线程:默认25

maxIdleTime :最大在线时间:默认60s(线程执行完成之后60s就会被shutdown)

maxQueueSize:队列的最大值:Integer的最大值> Integer.MAX_VALUE

prestartminSpareThreads :是否在启动的时候占用最小在线线程:默认 false(如果为true,即在启动tomcat的时候就会启动minSpareThreads个线程)

threadRenewalDelay:重建线程池内的线程:默认值为1000(为了避免线程同时重建,每隔threadRenewalDelay(单位: ms )重建一个线程)

好了,看完这些参数之后,来进行分析tomcat启动线程的源码,这里以springboot内置的tomcat源码分析为主~

首先找到这个类,这个类即为启动的核心

代码语言:java
复制
org.apache.catalina.core.StandardThreadExecutor

分析源码

代码语言:java
复制
protected void startInternal() throws LifecycleException {
  //创建一个任务队列  容量为Int的最大值
	taskqueue = new TaskQueue(maxQueueSize);
	//创建线程factory  
	TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
	//创建一个线程池    核心线程25个  最大200个  存活60s
	executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
   //设置重建线程时间  1s
   executor.setThreadRenewalDelay(threadRenewalDelay);
   //是否开启占用核心线程,默认关闭
	if (prestartminSpareThreads) {
	//如果开启  见下面的代码@1.1
		executor.prestartAllCoreThreads();
	}
	//@1.0 设置parent,关联线程池对象
	taskqueue.setParent(executor); 
	setState(LifecycleState.STARTING);
}
代码语言:java
复制
 @1.1
public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

继续跟源码:

当有一个请求执行自然也是会执行execute方法 找到StandardThreadExecutor#execute

代码语言:java
复制
public void execute(Runnable command, long timeout, TimeUnit unit) {
        if (executor != null) {
			//执行线程操作
            executor.execute(command,timeout,unit);
        } else {
            throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
        }
    }

它这个是将jdk的execute二次封装了一下,我们不用管,继续跟源码……

代码语言:java
复制
public void execute(Runnable command, long timeout, TimeUnit unit) {
	//这个字段是记录的线程提交的数量,如果线程执行完毕,这个字段会减1 等下会用到 
	submittedCount.incrementAndGet();
	try {
	//执行jdk的线程池代码
		super.execute(command);
	} catch (RejectedExecutionException rx) {
	//拒绝策略,发生的情况是线程超过最大值(maxThreads),并且队列也已经满了,也就是(Integer.MAX+maxThreads)
		if (super.getQueue() instanceof TaskQueue) {
			final TaskQueue queue = (TaskQueue)super.getQueue();
			try {
			//这里其实就是讲这个线程(60s后)丢到队列里面,如果60s后队列还是满的,那就没办法了,抛异常……
				if (!queue.force(command, timeout, unit)) {
					submittedCount.decrementAndGet();
					throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
				}
			} catch (InterruptedException x) {
				submittedCount.decrementAndGet();
				throw new RejectedExecutionException(x);
			}
		} else {
			submittedCount.decrementAndGet();
			throw rx;
		} 
	}
}
代码语言:java
复制
@1.2
/**
* The number of tasks submitted but not yet finished. This includes tasks
* in the queue and tasks that have been handed to a worker thread but the
* latter did not start executing the task yet. 
*/
private final AtomicInteger submittedCount = new AtomicInteger(0);
大概意思是任务提交了但是没有执行结束,统计的是队列里的任务和已经在执行但是还没有执行完的任务

分析线程池源码:

(其实前面已经分析过了:线程池原理分析

代码语言:java
复制
// ctl 中保存的线程池当前的一些状态信息
	int c = ctl.get();

	//  下面会涉及到 3 步 操作
	// 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize
	// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
	if (workerCountOf(c) < corePoolSize) {
		if (addWorker(command, true))
			return;
		c = ctl.get();
	}
	// 2.注意这里是重点!!!
	// 如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里
	// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
	//断点达到workQueue,发现workQueue对象是我们tomcat启动类的TaskQueue对象,那么我们源码打到taskQueue的offer方法
	//@1.3 workQueue.offer(command)如果返回false 则会执行@1.4操作

	if (isRunning(c) && workQueue.offer(command)) {
		int recheck = ctl.get();
		// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
		if (!isRunning(recheck) && remove(command))
			reject(command);
	   // 如果当前线程池为空就新创建一个线程并执行。 
		else if (workerCountOf(recheck) == 0)
			addWorker(null, false);
	}
	//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
	//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
	@1.4 
	else if (!addWorker(command, false))
		reject(command);
}

代码语言:java
复制
public boolean offer(Runnable o) {
//we can't do any checks >>> 如果parent为null直接丢到队列里面 当然,parent肯定不会为null,本页面搜@1.0 
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object>>>如果工作线程数量等于最大线程数量>>>丢到队列里面
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue>>>如果队列里的任务加正在执行的任务(见@1.2) 小于等于正在执行的任务,丢到队列里面
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread >>> 如果正在执行的任务小于最大任务数量,返回false???
//这里看看,返回false是什么意思呢?见@1.3
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}

因此,这里我们就分析完了,原来他是这么搞的,再用一张流程图来解释一下线程池中的队列执行流程

声明:本图来源 https://mp.weixin.qq.com/s/n28nH8xL6dTlw_vCi0wI6w
声明:本图来源 https://mp.weixin.qq.com/s/n28nH8xL6dTlw_vCi0wI6w

拒绝策略: 具体自己写个demo(工作线程大于将队列的容量+最大线程数量,则会触发拒绝策略)

dubbo线程池的拒绝策略

最后:看一下dubbo的拒绝策略: 类:

代码语言:java
复制
org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor

跟源码…… 类名一样,还是TaskQueue,不过它是org.apache.dubbo.common.threadpool.support.eager.TaskQueue

代码语言:java
复制
public boolean offer(Runnable runnable) {
	if (executor == null) {
		throw new RejectedExecutionException("The task queue does not have executor!");
	}
//当前工作线程大小
	int currentPoolThreadSize = executor.getPoolSize();
	// have free worker. put task into queue to let the worker deal with task.>>已经提交的线程如果小于工作线程 丢到队列里面
	if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
		return super.offer(runnable);
	}

	// return false to let executor create new worker.>> 当前线程小于最大线程容量 可以进行创建线程并执行
	if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
		return false;
	}

	// currentPoolThreadSize >= max
	return super.offer(runnable);
}

然后再看一下它的拒绝策略,具体在下面那行代码

代码语言:java
复制
queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)

/**
* retry offer task
*
* @param o task
* @return offer success or not
* @throws RejectedExecutionException if executor is terminated.
*/
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
	throw new RejectedExecutionException("Executor is shutdown!");
}
//不就是立马丢到队列里面???感觉又包装了一层……可能它想的是公用性吧
return super.offer(o, timeout, unit);
}

代码语言:java
复制
//dubbo的线程池拒绝策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	String msg = String.format("Thread pool is EXHAUSTED!" +
					" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
					" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
			threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
			e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
			url.getProtocol(), url.getIp(), url.getPort());
	logger.warn(msg);
//dump栈信息
  dumpJStack();

	throw new RejectedExecutionException(msg);
}

private void dumpJStack() {
	long now = System.currentTimeMillis();

	//dump every 10 minutes
	if (now - lastPrintTime < 10 * 60 * 1000) {
		return;
	} 
	if (!guard.tryAcquire()) {
		return;
	}
	//开启一个新的线程将jstack信息dump下来,并保存到服务器上
	ExecutorService pool = Executors.newSingleThreadExecutor();
	pool.execute(() -> {
		String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home")); 
		SimpleDateFormat sdf; 
		String os = System.getProperty("os.name").toLowerCase(); 
		// window system don't support ":" in file name
		if (os.contains("win")) {
			sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
		} else {
			sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
		} 
		String dateStr = sdf.format(new Date());
		//try-with-resources
		//栈文件日志名为……
		try (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
			JVMUtil.jstack(jStackStream);
		} catch (Throwable t) {
			logger.error("dump jStack error", t);
		} finally {
			guard.release();
		}
		lastPrintTime = System.currentTimeMillis();
	});
	//must shutdown thread pool ,if not will lead to OOM
	//这里使用完线程池之后一定要shutdown,不然会oom
	pool.shutdown(); 
}

这里的话tomcat线程池的源码和拒绝策略就解释完了

结尾

面试题:

tomcat线程池是怎样执行的?拒绝策略是如何?

看完这个源码后,balabala……其实我们也可以先把最大线程数用完,然后再让任务进入队列。通过自定义队列,重写其 offer 方法就可以实现。目前我知道的 Tomcat 和 Dubbo 都提供了这样策略的线程池,扯一堆。 面试官:好了,我们聊聊其他的吧!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-05-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 查找来源
  • 分析源码
  • 分析线程池源码:
  • dubbo线程池的拒绝策略
  • 结尾
    • 面试题:
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档