前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程池

线程池

作者头像
Joseph_青椒
修改2023-08-08 19:01:48
6090
修改2023-08-08 19:01:48
举报
文章被收录于专栏:java_joseph

来,随我吃透线程池!!!

线程池的作用

线程的创建和销毁的开销是非常大的,线程创建,直接依靠操作系统。大量的线程的创建,会给操作系统和jvm虚拟机带来压力,同时,大量的销毁也会给垃圾回收器带来压力

所以,线程池的目的就是为了解决两个问题

1反复创建线程开销大

2过多的线程太多占用内存

线程池:通过少量线程的复用,成功的解决了这个问题

线程池的好处:

加快响应速度,合理利用cpu与内存,统一管理线程这些线程。

适用线程池的场合

1服务器,服务器要收到大量请求,比如tomcat服务器,也是用线程池实现的

2开发中,5个以上的线程,就可用用线程池了

线程池的创建

  • 核心参数配置说明

参数

说明

corePoolSize

线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活

maximumPoolSize

最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务

keepAliveTime

当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize

TimeUnit

时间单位

workQueue

缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行

threadFactory

线程创建的工厂,一般用默认的 Executors.defaultThreadFactory()

handler

当pool已经达到max size的时候,如何处理新任务

这里我们先说一下线程创建的参数,

corePoolsize maxNumPoolSize worekQueue

线程创建,会先创建corePoolsize的线程数量,比如5,当阻塞队列满的时候,比如100,此时线程数量会变大,还处理不过来,就会到maxNumPoolSize最大线程数量,

注意点:弱corePoolSize与maxNumPoolSize保持一致,线程池的大小就是固定的

线程池期望保留线程更少一点,通过corePoolSize在阻塞队列满的时候,才会增加,可以看出来

两个特殊的场景,当阻塞队列大小设置成Integer.MaxValue,那么此时线程数量将不会大于corePoolSize

若maxNumPoolSize设置成Integer.MaxValue,那么此时策略是:允许线程池的容量无限扩大!

keepAliveTime,这里就是多余corePoolSize,大于核心线程数的数量的线程,空闲时间到达这个值 就会被回收

image-20230726145156983
image-20230726145156983

线程工厂,ThreadFactory这个参数直到是创建线程的就可以了,源码这里也无非是new Thread

主要再将一下

工作队列,worKQueue

1)直接交接,SynchronousQueue

2)无界队列:LinkedBlockingQueue

当任务太多,处理不过来,就会导致OOM,内存溢出,而且会让任务丢失

这个队列maxPoolSize是无意义的,因为这个队列满不了

3)有界队列:ArrayBlockingQueue

这个队列是可以满的,maxPoolsize有意义

手动创建or自动创建

有人可能看到过alibaba编码规范

image-20230726145930194
image-20230726145930194

自动创建的话,juc提供了

    • JUC包下的Executors工具类提供多种线程池

    线程池名称 说明 newFixedThreadPool 一个定长线程池,可控制线程最大并发数 newCachedThreadPool 一个可缓存线程池, newSingleThreadExecutor 一个单线程化的线程池,用唯一的工作线程来执行任务 newScheduledThreadPool 一个定长线程池,支持定时/周期性任务执行

newFixedThreadPool

看名字就能想到,线程池的数量是固定的,那么通过上面的分析,能猜到,核心线程数corePoolSize的数量和maxNumPoolSize最大线程数应该是一致的,

代码语言:javascript
复制
/**
 * @Author:Joseph
 * @bolg:https://li-huancheng.gitee.io/
 * @Package:threadPool
 * @Project:bing-fa-demo
 * @name:ExecutorDemo
 * @Date:2023-07-26 15:03
 * @Filename:ExecutorDemo
 */
public class ExecutorDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
class Task implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}
image-20230726152736496
image-20230726152736496

通过控制台,可以看出,就是编码时候 的4个线程数

代码语言:javascript
复制
*/
   public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>());
   }

构造函数中,核心线程数,最大线程数,都是传入的参数nThreads

线程存货时间keepAliveTime是0L,后面的单位就不说了,这个0L,就是,根本没有非核心线程的线程,所以这个参数没什么意义

LinkedBlockQueue,这个工作队列是无限长的,所以当任务量过大的时候,都堆积再linkedBlokQueue阻塞队列中,太多了,内存扛不住,就会出现OOM

代码语言:javascript
复制
package threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Author:Joseph
 * @bolg:https://li-huancheng.gitee.io/
 * @Package:threadPool
 * @Project:bing-fa-demo
 * @name:FixedThreadPoolExecutorOOM
 * @Date:2023-07-26 15:34
 * @Filename:FixedThreadPoolExecutorOOM
 */
public class FixedThreadPoolExecutorOOM {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executorService.execute(new SubThread());
        }
    }

}
class SubThread implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(1000000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这是造成OOM的一个小demo,配置下jvm内存小一点就可以看到

newSingleThreadExecutor

这个就不掩饰了,就是暴漏的问题和上面一样,线程存货时间keepAliveLife是没意义的

然后核心线程数和最大线程数是1,阻塞队列采取的是无界队列LinkedBlokingQueue

这两个例子就是阿里巴巴编码规范中说的,大量请求堆积导致的OOM

再看下面两种,

先说下结果

下面两种是创建线程数量太多,大量的线程而导致的OOM

newCachedThreadPool

他的功能是缓存线程池,可回收多余的线程

这个通过上面参数的讲解,可以猜测到,工作队列一定不是无界队列LinkedBlockingQueue,不然线程数量是不会大于corePoolSize核心线程数的

他采用的是SynchronousQueue,直接交换队列,也就是说,这个队列存不了东西,任务直接走到线程去,然后线程不够的话,就直接创建

所以他会导致大量的线程导致oom,线程不用可回收,那么它的keepAliveTime一定配置了个值

现在看一下源码

代码语言:javascript
复制
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

核心线程为0,最大线程无限,keepAliveTime最大存活时间60s,队列采用直接交换队列,队列不缓冲任务

和上面的分析一样

newScheduledThreadPool

支持定时,周期型的执行任务

代码语言:javascript
复制
**
 * @Author:Joseph
 * @bolg:https://li-huancheng.gitee.io/
 * @Package:threadPool
 * @Project:bing-fa-demo
 * @name:ScheduledThreadPool
 * @Date:2023-07-26 15:55
 * @Filename:ScheduledThreadPool
 */
public class ScheduledThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        //4s后再执行
        scheduledExecutorService.schedule(new Task(),4, TimeUnit.SECONDS);
        //以一定频率重复运行,最开始等1s钟,后面每3s的执行
        scheduledExecutorService.scheduleAtFixedRate(new Task1(),1,3,TimeUnit.SECONDS);
    }
}
class Task1 implements Runnable{
    @Override
    public void run() {
        System.out.println("test");
    }
}

这里写了下常见 的使用方法

这个既然有线程过多的情况,那么它的maxNumPoolSize最大线程数一定是Integer.MaxValue

代码语言:javascript
复制
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

这里的话,有核心线程数,但是最大线程数是MaxValue,有配置活时间,主要是采用的是延迟工作队列。

这里又多看到一个工作队列

一共有

SynchronousQueue直接交接队列,特点是长度为0

LinkedBlockingQueue无界队列,特点是无界

ArrayBlockingQueue有界队列,特点是有范围

DelayedWorkQueue()延时工作队列,特点是能延迟处理任务

手动创建

看见,jdk提供的,都有bug,都会导致OOM,所以业务使用要自己配置

那么线程池的数量设置成多少何时?

cpu密集型与IO密集型

业务时加密,计算hash等,cpu密集型的话,就可以设置大小为cpu的1~2倍,8核cpu的话,就可以将核心线程数设置为8-16

数据库的读写,文件,网络id这样的,io密集型的话,因为cpu的速度比io快,所以可以将核心线程数设置的多一些,10倍也可以的,

因为80个很多都是在等待io的,所以这样的话才能更好的利用cpu

Brain Goetz大佬就给除了一个公式

线程数= cpu核心数*(1+平均等待时间/平均工作时间),当然真实使用的话,还是压测真实环境。估测的话,这个公式就可以的

jdk常见的线程池

通过上面的选择手动创建or自动创建

我们来总结一下jdk提供的线程池

1fixedThreadPool,这种线程池coro和max线程数一致,固定下线程

2cacheThreadPool,这种就是可缓存线程,线程会自动回收

3ScheduleThreadPool.定期执行任务,定时的执行

4singleThreadThreadExcecutor,就是一个线程,

上面我已经讲的很详细了,只需要注意,

1,4会因任务在工作队列的堆积造成OOM

这两个队列的选择就是为了满足需求嘛,线程固定了,那么任务只能通过队列来堆积,所以采用LinedBlocking’queue

2\4会因为线程数量的过大,导致OOM

cachedThreadPool选择SynchronousQueue目的就是全给线程,不需要在队列中转,提高效率

secheduledThread这个的队列就不用说了吧,延时功能

jdk8新的线程池

workStealingPool

特点:子任务 窃取

这里这样理解,三个线程,各个有自己的队列,他们也有公共的队列,

第一个线程自己创建了3个子任务,执行,另外两个线程会帮线程1去执行

这样使用有两个注意,适用不加锁的场景,这样才能让别的线程帮忙执行 ,另外就是执行的顺序不保证,因为会窃取嘛

场景:比较少,

线程池的关闭

线程池的关闭也是有些讲究的!

shutDown

发出线程终止的命令,但是并非是马上的关闭,会等队列中的任务全搞完,就关闭,

同时不会去接受新的任务

isShutDown

判断是否shutDown

isTerminated

这个是判断是否真的关闭了,因为执行shutDown要等待全关掉

awaitTermination

等待一段时间后来判断是否真的关闭

shutDownNow

这个命令会马上关闭,正在执行的线程会中断,在队列中为被消费的,会返回

代码语言:javascript
复制
/**
 * @Author:Joseph
 * @bolg:https://li-huancheng.gitee.io/
 * @Package:threadPool
 * @Project:bing-fa-demo
 * @name:ShutDown
 * @Date:2023-07-28 15:59
 * @Filename:ShutDown
 */
public class ShutDown {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        executorService.shutdown();
    }
}
class ShutDownTask implements Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这里就比较简单了,大家自己测试,不是本文的重点

拒接策略

这里回顾一下重要的参数

最大线程数,核心线程数,最大存活时间,工作队列,没错,还有一个就是现在要讲的拒绝策略

拒绝时机

1,当Executior执行shutDown命令,就不会再接受新的任务了

2、工作队列长度,或者最大线程数饱和

四种拒绝策略

AbortPolicy

直接抛出异常

DiscardPolicy

丢弃新的任务,我们是不知道的!

DiscardPolicyOldestPolicy

这个是丢掉最老的任务。

CallerRunsPolicy

让提交任务的线程去执行,比如异步功能,想要让线程池去做,但是线程池饱和了,线程池说,我不做,你自己做,这个就是CallerRunsPolicy,这个就是不像被压榨的员工哈哈

钩子函数

我们想在每个任务执行前后做一些日志统计,等任务,比如暂停,实现一个暂停的线程池

代码语言:javascript
复制
package threadPool;

import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 延时每个任务执行的前后都可以放钩子函数
 * @Author:Joseph
 * @bolg:https://li-huancheng.gitee.io/
 * @Package:threadPool
 * @Project:bing-fa-demo
 * @name:PauseableThreadPool
 * @Date:2023-07-28 16:31
 * @Filename:PauseableThreadPool
 */
public class PauseableThreadPool extends ThreadPoolExecutor {

    private final ReentrantLock lock= new ReentrantLock();

    private Condition unpaused = lock.newCondition();

    private boolean isPaused;



    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try{
            while(isPaused){
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    private void pause(){
        lock.lock();
        try{
            isPaused = true;
        }finally {
            lock.unlock();
        }
    }

    public void resume(){
        lock.lock();
        try{
            isPaused=false;
            unpaused.signalAll();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被执行");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);


        }
        Thread.sleep(1500);
        pauseableThreadPool.pause();
        System.out.println("线程池被暂停了");
        Thread.sleep(1500);
        pauseableThreadPool.resume();
        System.out.println("线程池恢复了");

    }
}

线程池实现源码

上面了解了下线程池的使用注意点,现在看下线程池怎么实现的

线程池的组成部分

线程池管理器:创建、管理线程池

工作线程:就是线程池中存在的线程

任务队列:这个就是参数里重要之一的工作队列,因为并发嘛,多个线程去取线程,所以采用的阻塞队列blokingQueue

任务接口(Task):这个就是线程池要执行的一个一个的任务

Executor家族的区分

ThreadPoolExecutor、ExecutorService、Executor、Executors

image-20230728172158364
image-20230728172158364

这是他们的继承实现关系

Executor

代码语言:javascript
复制
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

Executor是一个顶级的接口,只有这一个execute的方法

ExecutorService

代码语言:javascript
复制
public interface ExecutorService extends Executor {
	void shutdown();
	List<Runnable> shutdownNow();
	boolean isShutdown();
	boolean isTerminated();
	boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    Future<T> submit(Callable<T> task);
    Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

}

ExecutorService继承了Executor,并新增了一些管理线程池的方法

Executors

一个工具类这里就不多说了,上面讲解的很透彻,只是调用创建线程池的构造函数,指定一些规则而已

ThreadPoolExecutor

这里就要着重讲一下了,这里他是ExecutorService的实现,返回值是ExecutorService

也是我们开发人员调用这个构造函数自定义线程池使用的,文章末尾会带大家实操一下

代码语言:javascript
复制
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
    private final Object poolSizeMonitor = new Object();
    private int corePoolSize = 1;
    private int maxPoolSize = 2147483647;
    private int keepAliveSeconds = 60;
    private int queueCapacity = 2147483647;
    private boolean allowCoreThreadTimeOut = false;

线程池如何实现线程的复用

这是ThreadPoolExecutor中额execute方法

org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

代码语言:javascript
复制
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
    	//线程不够,增加线程
        if (workerCountOf(c) < corePoolSize) {
            //我们要看的就是wordker方法
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

好,看一下work中的runWorker方法

java.util.concurrent.ThreadPoolExecutor#runWorker

代码语言:javascript
复制
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
    	//注意这里的task,Runnable的run方法就是一个一个的类
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //主要看这里,如果task不为null,就去执行这里的逻辑,getTask就是从阻塞队列中拿出任务
            //while循环代表,这个work不会停止,执行完任务就会继续拿下一个任务去执行,这就实现了线程的复用
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        //这里就是调用runnable的run方法,
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

线程池的状态

状态

说明

RUNNING

运行状态,能接受新提交的任务,并且也能处理阻塞队列中的任务

SHUTDOWN

关闭状态,不再接受新提交的任务,可以继续处理阻塞队列中已保存的任务。

STOP

不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程

TIDYING

所有的任务都已终止了,workerCount (有效线程数)为0

TERMINATED

terminated() 方法执行完后进入该状态

通过上面线程关闭的学习,再结合这里看一下

shutDown会使Running状态到SHUTDOWN状态,线程池会拒绝新任务,但是还是会执行新的任务

shutDOwnNow让线程池从Running状态到STOP状态,此时拒绝新任务,同时也会中断正在执行的任务,同时会返回队列中的任务

当上面两种方法执行完,队列和工作线程都空时,会进入tidying状态,执行termined会进入TERMINATED关闭状态。

在这里站在上帝视角看下源码

无非就是

execute、addworker 、runworker、getTask方法

这里要先将一下,ctl这个

变量 ctl这个AtomicInteger包含两部分的信息,使用的是位运算的方式,相比于基本运算,速度快很多

  • 运行状态 (runState) 高3位保存
  • 线程池内有效线程的数量 (workerCount),低29位保存

下面是变量和比较重要的方法

代码语言:javascript
复制
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc

     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     */
    //int类型的数字,高3位表示线程池状态,低29位表示worker数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //Integer.SIZE`为32,所以`COUNT_BITS`为29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //线程池允许的最大线程数, 1左移29位,然后减1,即为 2^29 - 1
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    //获取线程池状态
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    //获取线程池worker数量
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }
    //根据线程池状态和线程池worker数量,生成ctl值
    private static int ctlOf(int rs, int wc) { return rs | wc; }
代码语言:javascript
复制
//任务执行
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 32位,高3位存储线程池状态,低29位存储活跃线程数
       int c = ctl.get();
       //判断工作线程小于核心线程,则创建新线程,true表示是核心线程数
       if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //判断线程池是否运行,把任务放到队列里面去,返回boolean状态
        if (isRunning(c) && workQueue.offer(command)) {
            //再次获取值
            int recheck = ctl.get();
            //如果线程池已经终止,则移除任务,不在响应
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果没有线程,则创建一个空的worker,会从队列获取任务执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //队列满后,调用addWorker,创建非核心线程,参数是false,
        else if (!addWorker(command, false))
        //队列已满,创建非核心线程,失败则执行拒绝策略
            reject(command);
    }

//用于向线程池中添加一个新的工作线程。如果线程池中的线程数量已经达到maximumPoolSize,则返回false;
//如果线程池已经关闭,则返回false;否则,创建一个新的工作线程,并将其加入工作线程集合中
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                //判断线程数,根据传进来参数判断是创建线程数最大值
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                    //增加worker数量成功,返回到上面的retry
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                //新创建的worker,然后立刻启动,立刻执行任务(不是从队列中获取)
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }


//用于执行指定的工作线程,首先获取当前线程,然后不断从阻塞队列中取出任务并执行,直到从阻塞队列中取出null为止。
//在每次执行任务之前,会调用beforeExecute()方法和afterExecute()方法,这两个方法可以由子类进行重写,以实现一些特定的功能。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        
        //一直循环判断,当前任务是否有,没的话getTask()从队列中获取任务执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    
//用于从阻塞队列中获取一个任务,如果当前线程数小于corePoolSize,则会调用workQueue的take方法阻塞在当前
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();

            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // timed用于超时控制,当allowCoreThreadTimeOut是true或者活跃线程数大于核心线程数,则需要进行超时控制
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            //take和poll都是从队列头部【拿出】一个元素,从头部得到并移除该元素
            //poll空队列的头部元素时返回null,不抛异常;而take方法对应获得空队列的头部元素时,会阻塞在获取的位置   
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

关于源码的解读,大家看注释就可以

我想着重点一下,这个阻塞队列,take方法,这里就体现阻塞队列的特性了,take方法,不会返回元素,而是阻塞,

还有就是work类中的works,本质是一个HashSet集合

image-20230728203619289
image-20230728203619289

再者需要注意runworker中执行前后的两个钩子

image-20230728203836873
image-20230728203836873

前面提到,核心线程数是不会消失的,但是,这里指的是永远是数量,HashSet这里并没有区分线程是先创建的还是后创建的,

所以core核心线程数为3,假设有6个,剩下来的3个不一定是最初创建的线程!

实战为王

这里讲一下工作中,如何创建线程池

先看一下,使用线程池的注意点

避免任务堆积

避免线程数过度增加

排查线程泄漏(线程回收不了的情况):一般是任务逻辑问题,导致任务结束不了,导致任务回收不了

如果有人问:工作中线程池是怎样做的?我认为可以这样回答

线程数的配置,要考虑io密集型和cpu密集型,io密集型可以设置2倍的核数,cpu密集型最好是和核数相等

阻塞队列的长度可以区分面向C端的快速响应还是面向B端的允许慢速处理的场景

C端的阻塞队列长度不能太长,另外C端,可以通过多节点,来增快消费速度,避免堆积,B端可以长一些

当然要结合业务,进行多接口压测,才能得到合理的数据。

这里列举两个场景。(以32核,64g为例)

C端:

代码语言:javascript
复制
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 32, 3,
       TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000),
        new ThreadPoolExecutor.DiscardPolicy());

这个C端的场景,就没有考虑上面说的,阻塞队列太长了,而核心线程数有过短,所以这里,就会出现问题,任务堆积而得不到消费

C端的场景不应该设置过长的策略,且要注意消费的速度要大一些

代码语言:javascript
复制
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(32,128, 60,
       TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
        new ThreadPoolExecutor.DiscardPolicy());

一台机器不够,那就多一些

B端

商家管理后台统计报表

代码语言:javascript
复制
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8,1024, 60,
       TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
        new ThreadPoolExecutor.DiscardPolicy());

这里没有考虑B端场景,队列可以长一些,因为不要求实时性,这样配置,1024线程,导致OOM,服务器崩溃

代码语言:javascript
复制
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(32,124, 60,
       TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100000),
        new ThreadPoolExecutor.DiscardPolicy());

这样配置,让任务能过慢慢处理,同时队列又不是长的很过分,100万,并不会出现OOM的问题

另外

生产配置的时候,要线程池的隔离,不能让多个不同任务公用线程池,

异步提高qps

到这里,线程池基本就完结了

这里提供之前做项目的一个场景,这里用到了http连接池,和线程池,通过这个例子,我们看一下,工作中,如何用线程池,以及考虑引发的一些问题,

异步的使用场景以及一些注意事项

涉及网络通信,且有需要提高响应速度,就可以用异步,

MQ也属于一种异步,

适用于处理log,发送邮件,短信–等场景,涉及网络io调用,并不影响业务,因为异步是直接返回正常的,不知道后面是否成功

使用方式:启动类,开启异步@EnableAsync

异步失效场景:

异步采用动态代理,不能调用类本身的方法,加@Async,

  • 注解@Async的方法不是public方法
  • 注解@Async的返回值只能为void或者Future
  • 注解@Async方法使用static修饰也会失效

还要注意事物,@Tranctional与@Async会失效,但是,在调用异步的上游,就没事

线程池

当我们使用@Async的时候,没有自定义线程池,他会使用默认的线程池

默认8个核心线程数,核心线程处理不了,就会进入阻塞队列,阻塞队列是默认的Integer.MAX_VALUE,21亿!采用的是LinkedBlokingQueue,最大线程数也是21亿,但是根本不会大于8,因为阻塞队列满了才会到达,但是不可能满的

所以就要自定义线程池,

但是!

在spring中,自定义线程池要用ThreadPoolTaskExecutor,其实是一样的,只是框架多了层包装而已

我们只需要自定义线程池,覆盖spring默认的线程池,就能避免OOM的问题了

代码语言:javascript
复制
Configuration
@EnableAsync
public class ThreadPoolTaskConfig {

    @Bean("threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
        //如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
        executor.setCorePoolSize(16);
//        executor.setAllowCoreThreadTimeOut(true);

        //缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行
        executor.setQueueCapacity(1024);

        //最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
        //当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
        executor.setMaxPoolSize(64);
        //当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
        //允许线程空闲时间60秒,当maxPoolSize的线程在空闲时间到达的时候销毁
        //如果allowCoreThreadTimeout=true,则会直到线程数量=0
        executor.setKeepAliveSeconds(30);

        executor.setThreadNamePrefix("joseph的自定义线程池");


        //拒绝策略
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
        //AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
        //DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
        //DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        executor.initialize();

        return executor;

    }
}

好,我们异步的时候

直接指定线程池

@Async(“threadPoolTaskExecutor”)就可以了

好,那么当请求量很多的时候,就会引发问题

导致OOM,服务器重启,是会丢失的!,或者说,阻塞队列你设置的短一些的话,任务也是会丢失的(拒绝策略的话)

意思就是,我们不能单单的把任务堆到队列就好了,是会有消息丢失的风险的!

对的,就是提高线程处理任务的速度!

消费方角度提高

当涉及网络请求时,spring中使用的是restTempate

这是spring基于httpClient提供的一个http请求工具,我们异步发短信调用第三方服务,用到restTemplate,但是restTemplate在spring中有个坑,

  • 底层通过使用java.net包下的实现创建HTTP 请求
  • 通过使用ClientHttpRequestFactory指定不同的HTTP请求方式,主要提供了两种实现方式
    • SimpleClientHttpRequestFactory(默认)
      • 底层使用J2SE提供的方式,既java.net包提供的方式,创建底层的Http请求连接
      • 主要createRequest 方法( 断点调试),每次都会创建一个新的连接,每次都创建连接会造成极大的资源浪费,而且若连接不能及时释放,会因为无法建立新的连接导致后面的请求阻塞
    • HttpComponentsClientHttpRequestFactory
      • 底层使用HttpClient访问远程的Http服务

这就导致,每次请求都会三次握手,非常的耗时,

当请求处理不过来,客户端等待过长,主动断掉连接,就会报错

错误Caused by: java.io.IOException: Broken pipe

  • 服务端向前端socket连接管道写返回数据时 链接(pipe)却断开了

我们可以通过http连接池的方式,来服用建立 的连接,来加快线程处理的速度

进而加快阻塞队列的消费

对httpclient进行封装的有:Apache的Fluent、es的restHighLevelClient、spring的restTemplate等

这几个都可以进行http连接池的封装!

下面来看一下,如何封装

代码语言:javascript
复制
@Configuration
public class RestTemplateConfig {

    @Bean
    public RestTemplate restTemplate(ClientHttpRequestFactory requestFactory){
        return new RestTemplate(requestFactory);
    }

    @Bean
    public ClientHttpRequestFactory httpRequestFactory(){
        return new HttpComponentsClientHttpRequestFactory(httpClient());
    }

    @Bean
    public HttpClient httpClient(){
        Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", SSLConnectionSocketFactory.getSocketFactory())
                .build();

        PoolingHttpClientConnectionManager connectManager = new PoolingHttpClientConnectionManager(registry);

        //设置连接池最大是500个链接
        connectManager.setMaxTotal(500);

        //maxPerToute对maxTotal细分 每个主机最大并发是300  route是指域名
        connectManager.setDefaultMaxPerRoute(300);

        RequestConfig requestConfig = RequestConfig.custom()
                //返回数据超时时间
                .setSocketTimeout(20000)
                //连胜服务器超时时间
                .setConnectTimeout(10000)
                //从连接池中获取连接的超时时间
                .setConnectionRequestTimeout(1000)
                .build();

        CloseableHttpClient closeableHttpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig)
                .setConnectionManager(connectManager)
                .build();
        return closeableHttpClient;
    }
    //优化前
//    @Bean
//    public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
//        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
//        factory.setReadTimeout(10000);
//        factory.setConnectTimeout(10000);
//        return factory;
//    }
}

现在,要涉及网络调用,异步提高性能的话

就没有任何问题了!

image-20230728224819957
image-20230728224819957

总结就是,先发现异步请求在压测的时候,会导致OOM,然后再自定义线程池,但是处理太慢,有超时报错和http管道报错,我们又更换httpClient,自定义http连接池,最终完成了qps的提高

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-07-28T,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 线程池的作用
  • 适用线程池的场合
  • 线程池的创建
    • corePoolsize maxNumPoolSize worekQueue
      • 工作队列,worKQueue
        • 手动创建or自动创建
          • newFixedThreadPool
          • newSingleThreadExecutor
          • newCachedThreadPool
          • newScheduledThreadPool
          • 手动创建
        • jdk常见的线程池
          • jdk8新的线程池
      • 线程池的关闭
      • 拒接策略
        • 拒绝时机
          • 四种拒绝策略
          • 钩子函数
          • 线程池实现源码
            • 线程池的组成部分
              • Executor家族的区分
                • Executor
                • ExecutorService
                • Executors
                • ThreadPoolExecutor
              • 线程池如何实现线程的复用
                • 线程池的状态
                • 实战为王
                • 异步提高qps
                  • 异步的使用场景以及一些注意事项
                    • 线程池
                      • 消费方角度提高
                      相关产品与服务
                      短信
                      腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档