前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >java 线程池设计模式

java 线程池设计模式

作者头像
Tim在路上
发布2020-08-04 10:17:56
1.7K0
发布2020-08-04 10:17:56
举报

java 线程池采用的是 Thread Pool 线程池模式。

线程池设计模式主要解决在资源有限的情况下为每一个任务创建一个线程执行消耗资源很不现实。
线程池的设计思路

采用保存并管理一定数量的线程,用这些线程去执行不断产生的任务。

主要的类:

ThreadPool 负责接收和存储任务以及线程生命周期的管理。

WorkQueue 工作队列,实现任务的缓存。

WorkerThread 负责任务执行的工作线程。

所以说线程池一般用于单个任务处理时间短,但是任务量却非常大的场景。

什么是线程?

线程和进程都是对cpu工作时间段的描述

cpu在工作时会存在任务的切换。进程包括上下文切换。

线程是共享了进程的上下文环境,的更为细小的CPU时间段。

进程有独立的地址空间,一个进程崩溃后,在保护模式下不会对其它进程产生影响,而线程只是一个进程中的不同执行路径。线程有自己的堆栈和局部变量,但线程之间没有单独的地址空间,一个线程死掉就等于整个进程死掉,所以多进程的程序要比多线程的程序健壮,但在进程切换时,耗费资源较大,效率要差一些。

线程分为用户级线程和内核级线程,app自己管理是用户级线程,操作系统管理内核级线程

从java线程到linux线程

在java程序中创建 线程Thread ,会调用OS操作系统的库调度器陷入内核空间,创建一个内核级线程并维护在操作系统内核线程表内让调度程序进行调度。

cpu会给每一个线程分配一个执行时间,而线程栈中有程序计数器,寄存器,方法的栈帧,cpu在进行计算时计算的中间变量存储在寄存器里。

在线程切换的过程中,需要将线程的中间计算结果,存储在从寄存器中写回到内存线程PCB中保存现场,清空原有的寄存器,线程2再进行切换。线程2执行完再去唤醒线程1,需要将pcb中的中间结果写入寄存器。

线程切换消耗时间主要是?

  1. 用户态切换内核态,
  2. 需要保存上一个线程中现场。

线程池的创建参数

首先从ThreadPoolExecutor构造方法讲起,学习如何自定义ThreadFactory和RejectedExecutionHandler;

第1个参数: corePoolSize 表示常驻核心线程数 如果大于0,即使本地任务执行完毕,核心线程也不会被销毁.

第2个参数: maximumPoolSize 表示线程池能够容纳同时执行的最大线程

第3个参数: keepAliveTime 表示线程池中的线程空闲时间 当空闲时间达到keepAliveTime时,线程会被销毁,直到只剩下corePoolSize个线程 第5个参数: workQueue 表示缓存队列 当请求的线程数大于maximumPoolSize时,线程进入BlockingQueue

第7个参数: handler 表示执行拒绝策略的对象 当超过第5个参数workQueue的任务缓存区上限的时候,就可以通过该策略处理请求,这是一种简单的限流保护. 友好的拒绝策略可以是如下三种: (1 ) 保存到数据库进行削峰填谷;在空闲时再提取出来执行 (2)转向某个提示页面 (3)打印日志

线程池工作原理

  1. 若当前运行的线程少于corePoolSize,则创建新线程来执行任务(执行这一步需要获取全局锁)
  2. 若运行的线程多于或等于corePoolSize,则将任务加入BlockingQueue
  3. 若无法将任务加入BlockingQueue,同时线程数小于maximumPoolSize,则创建新的线程来处理任务(执行这一步需要获取全局锁)(这里会导致这里的任务可能比阻塞队列里的先执行,这里没加入阻塞队列)
  4. 若创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()

线程池参数的合理设计

  1. corePoolSize

核心线程数是与每一个任务的执行时间和每一秒产生的任务数量来确定。 每一个任务的平均执行时间和80% 时间内平均产生的任务数

  1. 任务队列(workQueue)

队列的长度 = 核心线程数/单个任务执行时间 * 2,最大任务等待时间是2秒,10 个核心线程,单个任务0.1秒,队列长度200

  1. 最大任务数

最大线程数 = (最大任务数-队列长度)* 任务执行时间 = (1000 - 200) * 0.1 = 80

简单实现线程池
  1. Task 创建一个 Task 代表任务类,用于实现具体的任务,需要实现Runnable接口,传入id 表示任务id
  2. Worker 类 继承 Thread 类 ,任务的执行类,维护一个Runnable 的列表,监控列表,当列表不为空,取出执行,构造传入List<Runnable>,并重写run方法
  3. ThreadPool类,线程池类,维护任务队列, 成员变量:1.任务队列,2.当前线程数量,3.核心线程数,4.最大线程长度,5任务队列长度等, 成员方法:1.提交任务,将任务加入list集合,需要判断当前是否超出任务总长度。2.执行任务,判断当前线程数量,决定创建核心线程数还是非核心线程数
  4. 测试类:1. 创建线程池类对象,2。提交多个任务

创建一个最简单的线程池

/**
 * 任务类,实现runnable接口,实现具体任务
 */
public class Task implements Runnable {

    private int id;
    public Task(int id){
        this.id = id;
    }
    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "当前任务: "+id+" 开始执行");
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + "当前任务: "+id+" 执行完成");
    }
}

public class Worker extends Thread {

    private List<Runnable> taskList;
    private String name;

    public Worker(String name,List<Runnable> taskList){
        this.name = name;
        this.taskList = taskList;
    }

    @Override
    public void run() {
        // 判断集合中是否有对象,有对象就执行
        while (taskList.size() > 0){
            Runnable task = taskList.remove(0);
            task.run();
        }
    }
}

public class ThreadPool {
    // 创建一个线程安全的集合
    private List<Runnable> taskList = Collections.synchronizedList(new LinkedList<>());
    // 创建当前线程数,最大线程数,核心线程数
    private int curNum;
    private int corePoolSize;
    private int maximumPoolSize;
    private int workerSize;

    public ThreadPool(int corePoolSize, int maximumPoolSize, int workerSize) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workerSize = workerSize;
    }

    // 创建任务的提交方法
    public void submit(Task r){
        if(curNum >= workerSize){
            System.out.println( "任务队列已满,抛出任务进行终止");
        }else {
            taskList.add(r);
            taskExec(r);
        }
    }

    private void taskExec(Task r) {
        if(curNum <= corePoolSize){
            new Worker("核心线程" + r , taskList).start();
            curNum++;
        }else if(curNum < maximumPoolSize){
            new Worker("非核心线程"+r,taskList).start();
            curNum++;
        }else{
            System.out.println("任务" + r + "已经缓存,但是超过");
        }
    }
}

线程池源码解析

execute

线程的执行方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // clt记录着runState和workerCount
    // clt 采用一个整形遍历记录前三位线程状态,后29位当前线程数,这样可以避免使用总线锁重量级锁维护线程状态,可以直接使用AtomicInteger来进行维护
    int c = ctl.get();
    //workerCountOf方法取出低29位的值,表示当前活动的线程数
    //然后拿线程数和 核心线程数做比较
    if (workerCountOf(c) < corePoolSize) {
        // 如果活动线程数<核心线程数
        // 添加到
        //addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断
        if (addWorker(command, true))
            // 如果成功则返回
            return;
        // 如果失败则重新获取 runState和 workerCount
        c = ctl.get();
    }
    // 如果当前线程池是运行状态并且任务添加到队列成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新获取 runState和 workerCount
        int recheck = ctl.get();
        // 如果不是运行状态并且 
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            //第一个参数为null,表示在线程池中创建一个线程,但不去启动
            // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize
            addWorker(null, false);
    }
    //再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize
    else if (!addWorker(command, false))
        //如果失败则拒绝该任务
        reject(command);
}
addworkers

将线程添加到工作队列, 并启动当前的task进行执行,当然查看源码,重写了run方法,如果当前为null 会从队列里面那一个运行。

加锁的原因是我们在自己设计的简单线程池中List<Runnable> 是线程安全的,而线程池源码中,Runnbale使用 HashSet进行存储是线程不安全的,所以需要加锁。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {

        int c = ctl.get();
        //  获取运行状态
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果状态值 >= SHUTDOWN (不接新任务&不处理队列任务)
        // 并且 如果 !(rs为SHUTDOWN 且 firsTask为空 且 阻塞队列不为空)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            // 返回false
            return false;

        for (;;) {
            //获取线程数wc
            int wc = workerCountOf(c);
            // 如果wc大与容量 || core如果为true表示根据corePoolSize来比较,否则为maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增加workerCount(原子操作)
            if (compareAndIncrementWorkerCount(c))
                // 如果增加成功,则跳出
                break retry;
            // wc增加失败,则再次获取runState
            c = ctl.get();  // Re-read ctl
            // 如果当前的运行状态不等于rs,说明状态已被改变,返回重新执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根据firstTask来创建Worker对象
        w = new Worker(firstTask);
        // 根据worker创建一个线程
        final Thread t = w.thread;
        if (t != null) {
            // new一个锁
            final ReentrantLock mainLock = this.mainLock;
            // 加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 获取runState
                int rs = runStateOf(ctl.get());
                // 如果rs小于SHUTDOWN(处于运行)或者(rs=SHUTDOWN && firstTask == null)
                // firstTask == null证明只新建线程而不执行任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 如果t活着就抛异常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 否则加入worker(HashSet)
                    //workers包含池中的所有工作线程。仅在持有mainLock时访问。
                    workers.add(w);
                    // 获取工作线程数量
                    int s = workers.size();
                    //largestPoolSize记录着线程池中出现过的最大线程数量
                    if (s > largestPoolSize)
                        // 如果 s比它还要大,则将s赋值给它
                        largestPoolSize = s;
                    // worker的添加工作状态改为true    
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果worker的添加工作完成
            if (workerAdded) {
                // 启动线程
                t.start();
                // 修改线程启动状态
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回线启动状态
    return workerStarted;
}
Worker
 private final class Worker
     extends AbstractQueuedSynchronizer
     implements Runnable

可以看到它继承了AQS并发框架还实现了Runnable。证明它还是一个线程任务类。那我们调用t.start()事实上就是调用了该类重写的run方法。

Worker为什么不使用ReentrantLock来实现呢? tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。

线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态

 public void run() {  runWorker(this); }

Worker 重写的Run 方法调用runWorker(this);

final void runWorker(Worker w) {
    // 拿到当前线程
    Thread wt = Thread.currentThread();
    // 拿到当前任务
    Runnable task = w.firstTask;
    // 将Worker.firstTask置空 并且释放锁
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果task或者getTask不为空,则一直循环
        while (task != null || (task = getTask()) != null) {
            // 加锁
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //  return ctl.get() >= stop 
            // 如果线程池状态>=STOP 或者 (线程中断且线程池状态>=STOP)且当前线程没有中断
            // 其实就是保证两点:
            // 1. 线程池没有停止
            // 2. 保证线程没有中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // 中断当前线程
                wt.interrupt();
            try {
                // 空方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行run方法(Runable对象)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 执行完后, 将task置空, 完成任务++, 释放锁
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 退出工作
        processWorkerExit(w, completedAbruptly);
    }

总结一下runWorker方法的执行过程:

while循环中,不断地通过getTask()方法从workerQueue中获取任务

如果线程池正在停止,则中断线程。否则调用3.

调用task.run()执行任务;

如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w)

关闭线程池

可通过调用线程池的shutdown或shutdownNow方法来关闭线程池. 它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止. 但是它们存在一定的区别

shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表 shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程. 只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true. 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true. 至于应该调用哪一种方法,应该由提交到线程池的任务的特性决定,通常调用shutdown方法来关闭线程池,若任务不一定要执行完,则可以调用shutdownNow方法.

线程的状态,线程数保存单个Integer值里

在ThreadPoolExecutor的属性定义中频繁地用位运算来表示线程池状态; 位运算是改变当前值的一种高效手段.

下面从属性定义开始

Integer 有32位; 最右边29位表工作线程数; 最左边3位表示线程池状态,可表示从0至7的8个不同数值 线程池的状态用高3位表示,其中包括了符号位.

好处: 一个状态向另一个状态切换到另一个状态 不需要使用总线锁来进行保证状态切换的安全性。

核心线程在没有任务的时候会阻塞

为什么单线程池和固定线程池使用的任务阻塞队列是LinkedBlockingQueue(),而缓存线程池使用的是SynchronousQueue()呢?

因为单线程池和固定线程池中,线程数量是有限的,因此提交的任务需要在LinkedBlockingQueue队列中等待空余的线程;而缓存线程池中,线程数量几乎无限(上限为Integer.MAX_VALUE),因此提交的任务只需要在SynchronousQueue队列中同步移交给空余线程即可。

线程池中一个线程oom其他线程还可以用吗?

可以,在线程池中一个线程报错,或者oom线程池会将其进行回收,避免造成其他线程出错。

如果程序能正常处理这个异常情况,比如不再申请更多的内存或其它资源,或者放弃那个子任务或子线程,系统OOM状态是可以回到正常情况。

如果主线程抛异常退出了,子线程还能运行么?

先来一个定义线程不像进程,一个进程中的线程之间是没有父子之分的,都是平级关系。即线程都是一样的, 退出了一个不会影响另外一个。 因此,答案是如果主线程抛异常退出了,子线程还能运行。

其实发生OOM的线程一般情况下会死亡,也就是会被终结掉,该线程持有的对象占用的heap都会被gc了,释放内存。因为发生OOM之前要进行gc,就算其他线程能够正常工作,也会因为频繁gc产生较大的影响。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 线程池的设计思路
  • 什么是线程?
    • 从java线程到linux线程
    • 线程池的创建参数
    • 线程池工作原理
    • 线程池参数的合理设计
      • 简单实现线程池
      • 线程池源码解析
      • 关闭线程池
      • 线程的状态,线程数保存单个Integer值里
        • 为什么单线程池和固定线程池使用的任务阻塞队列是LinkedBlockingQueue(),而缓存线程池使用的是SynchronousQueue()呢?
          • 线程池中一个线程oom其他线程还可以用吗?
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档