前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java并发——线程池运行机制和如何使用

Java并发——线程池运行机制和如何使用

作者头像
良月柒
发布2019-03-19 16:16:45
1.5K0
发布2019-03-19 16:16:45
举报

合理利用线程池能够带来三个好处。 1、第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 2、第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。 3、第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

线程池创建

我们可以通过ThreadPoolExecutor来创建一个线程池。

代码语言:c#
复制
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {            if (corePoolSize < 0 || maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize || keepAliveTime < 0)                throw new IllegalArgumentException();            if (workQueue == null || threadFactory == null ||                 handler == null)                throw new NullPointerException();                this.corePoolSize = corePoolSize;                this.maximumPoolSize = maximumPoolSize;                this.workQueue = workQueue;                this.keepAliveTime = unit.toNanos(keepAliveTime);                this.threadFactory = threadFactory;                this.handler = handler;
    }

创建一个线程池需要输入几个参数:

  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
  • runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。 1、ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。 2、LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。 3、SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。 4、PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。
  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。n AbortPolicy:直接抛出异常。 1、CallerRunsPolicy:只用调用者所在线程来运行任务。 2、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。 3、DiscardPolicy:不处理,丢弃掉。 4、当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
线程池的状态

线程池中定义了五种状态,这些状态都和线程的执行密切相关。

代码语言:java
复制
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
  • RUNNING: 自然是运行状态,指可以接受任务执行队列里的任务。
  • SHUTDOWN: 指调用了 shutdown() 方法,不再接受新任务了,但是队列里的任务得执行完毕。
  • STOP: 指调用了 shutdownNow() 方法,不再接受新任务,同时抛弃阻塞队列里的所有任务并中断所有正在执行任务。
  • TIDYING: 所有任务都执行完毕,在调用 shutdown()/shutdownNow() 中都会尝试更新为这个状态。
  • TERMINATED: 终止状态,当执行 terminated() 后会更新为这个状态。

用图表示为:

线程池的流程分析

从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

  • 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
  • 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
  • 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

源码分析:上面的流程分析让我们很直观的了解的线程池的工作原理,让我们再通过源代码来看看是如何实现的。线程池执行任务的方法如下:

代码语言:c#
复制
    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();//获取当前线程池的状态    
            int c = ctl.get();//如果线程数量小于核心线程池
        if (workerCountOf(c) < corePoolSize) {//创建新的线程并执行
            if (addWorker(command, true))            return;
            c = ctl.get();
        }//如线程数大于等于基本线程数,线程池处于运行状态,则将当前任务放到工作队列
        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();//双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
            if (! isRunning(recheck) && remove(command))
                reject(command);//如果当前线程池为空就新创建一个线程并执行    
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }else if (!addWorker(command, false))

工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们可以从Worker的runWorker方法里看到这点:

代码语言:text
复制
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {
                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();                                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
线程池相关的类
  • Executor:线程池最顶层接口,它只有一个方法execute(Runnable command)。用来执行Runnable类型的接口。
  • ExecutorService:Executor的子接口,声明了管理线程池的一些方法,比如关闭线程池、查看线程池的状态。
  • ThreadPoolExecutor:ExecutorService默认的实现类,我们通常创建这个类的实例来实现线程池。
  • Executors:线程池的工具类,包含了很多静态方法,可以让我们创建不同类型的线程池。
  • ScheduledExecutorService:解决那些需要任务重复执行的问题。
  • ScheduledThreadPoolExecutor:继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。
线程池类型

通过Executors可以创建不同的线程池。

  • newFixedThreadPool (int nThreads):固定大小线程池

可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表名什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。

代码语言:c#
复制
    public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads, 0L,        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }
  • newSingleThreadExecutor():单线程的线程池,同样使用无界限的队列。
代码语言:c#
复制
    public static ExecutorService newSingleThreadExecutor() {        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,             new LinkedBlockingQueue<Runnable>()));
    }
  • newCachedThreadPool():无界线程池,可以进行自动线程回收

首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。这样每当有新任务时都去创建新的线程,而60秒内没有执行任务的线程将被销毁。

代码语言:c#
复制
    public static ExecutorService newCachedThreadPool() {                return new ThreadPoolExecutor(0,Integer.MAX_VALUE,        60L,TimeUnit.SECONDS,ynchronousQueue<Runnable>());
    }
线程池执行和关闭
  • 线程池提交任务有两种方式: 1、我们可以使用execute提交的任务,但是execute方法没有返回值,所以无法判断任务知否被线程池执行成功。通过以下代码可知execute方法输入的任务是一个Runnable类的实例。
代码语言:c#
复制
    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();            int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;
            c = ctl.get();
        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))
                reject(command);            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }else if (!addWorker(command, false))
            reject(command);
    }

2、我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

代码语言:c#
复制
    public <T> Future<T> submit(Callable<T> task) {                if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);                    return ftask;
    }
  • 线程池的关闭 有运行任务自然也有关闭任务,从上文提到的 5 个状态就能看出如何来关闭线程池。其实无非就是两个方法 shutdown()/shutdownNow()。但他们有着重要的区别:
    • shutdown():执行后停止接受新任务,会把队列的任务执行完毕。
    • shutdownNow():也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。

关闭线程池的例子:

代码语言:text
复制
        long start = System.currentTimeMillis();
        for (int i = 0; i <= 5; i++) {
            pool.execute(new Job());
        }

        pool.shutdown();

        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            LOGGER.info("线程还在执行。。。");
        }
        long end = System.currentTimeMillis();
        LOGGER.info("一共处理了【{}】", (end - start));
线程池的状态监控

其实 ThreadPool 本身已经提供了不少 api 可以获取线程状态。这样我们可以通过调用这些方法获取线程池状态。 甚至我们可以继承线程池扩展其中的几个函数来自定义监控逻辑:

代码语言:c#
复制
protected void beforeExecute(Thread t, Runnable r) { }protected void afterExecute(Runnable r, Throwable t) { }protected void terminated() { }

END

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-08-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员的成长之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 线程池创建
  • 线程池的状态
  • 线程池的流程分析
  • 线程池相关的类
  • 线程池类型
  • 线程池执行和关闭
  • 线程池的状态监控
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档