前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程进阶——JUC并发编程之Executors框架设计思想一探究竟🔥

多线程进阶——JUC并发编程之Executors框架设计思想一探究竟🔥

作者头像
须臾之余
发布2021-12-28 10:53:37
2490
发布2021-12-28 10:53:37
举报
文章被收录于专栏:须臾之余须臾之余

1、学习切入点

Executors 框架是整个JUC 包中类/接口关系中最为复杂的框架,真正理解Executors框架的前提是理清楚各个模块之间的关系,高屋建瓴,从整体到局部才能透彻理解各个模块的功能和背后设计的思路!

本文将从 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和Callable这些核心模块展开分析。

2、从Executor谈起

Executor 是JDK 1.5 时,随着JUC引入的一个接口,引入该接口的主要目的是 解耦任务本身和任务执行。我们之前通过线程执行一个任务时,往往需要先创建一个线程.satrt()去执行任务,

而Executor 接口解耦了任务和任务的执行,该接口只有一个方法,入参为待执行的任务。

代码语言:javascript
复制
public interface Executor {
    /**
     * 执行给定的Runable任务
     * 根据Executor接口的实现类不同,具体执行方式也不同
     */
    void execute(Runnable command);
}

我们可以像下面这样执行任务,而不必心线程的创建

代码语言:javascript
复制
Executor executor = anExecutor
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

由于Executor 仅仅是一个接口,所有根据其实现不同,执行具体的任务也不同,参看源码注释,我们可以知道比如:

2.1、同步执行任务

代码语言:javascript
复制
class DirectExecutor implements Executor {
    public void execute(Runnable r) {
    r.run();
  }
}}

2.2、异步执行任务

代码语言:javascript
复制
class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
      new Thread(r).start();
    }
  }}

2.3、对任务进行排队执行

代码语言:javascript
复制
 class SerialExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    final Executor executor;
    Runnable active;
 
    SerialExecutor(Executor executor) {
      this.executor = executor;
    }
    public synchronized void execute(final Runnable r) {
      tasks.offer(new Runnable() {
        public void run() {
          try {
            r.run();
          } finally {
            scheduleNext();
          }
        }
      });
      if (active == null) {
        scheduleNext();
      }
    }
    protected synchronized void scheduleNext() {
      if ((active = tasks.poll()) != null) {
        executor.execute(active);
      }
    }
  }}

以上这些源码给出的示例,仅仅是给出了一些可能的 Executor实现,JUC包中提供了很多 Executor的具体实现类,这里关键是理解Executor的设计思想——对任务和任务的执行解耦

3、增强的ExecutorService

因为Executor 接口的功能很简单,为了对它进行增强,JUC又提供了一个名为 ExecutorService 接口,那它到达增强了哪些东西呢?

可以看到,ExecutorService 在Executor 的基础上增加了对任务的控制,同时也包括对自己生命周期的管理,主要有四类:

1、关闭执行器,禁止任务提交

2、监视执行器状态

3、提供对异步任务支持

4、提供对处理任务的支持

4、周期任务调度——ScheduledExecutorService

在开发环境中,我们可能需要提交给执行器某些任务能够定时执行或者周期性执行,我们可以自己去实现Executor 接口来创建符合我们需要的类,Doug Lea 已经考虑好了这类需求,所有在ExecutorService 的基础之上,又提供了一个接口 ScheduledExecutorService。

案例演示

代码语言:javascript
复制
public class ScheduleExecutorTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler=Executors.newScheduledThreadPool(4);
        //开始执行时间1秒后,每隔1秒执行一次任务
        final ScheduledFuture<?>scheduledFuture=scheduler.scheduleAtFixedRate(new BeepTask(),1,1,TimeUnit.SECONDS);

        //5秒后,取消任务,关闭线程池
        scheduler.schedule(()->{
            scheduledFuture.cancel(true);
            System.out.println("over");
            scheduler.shutdownNow();
        },5,TimeUnit.SECONDS);
    }
    private static class BeepTask implements Runnable{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"beep!");
        }
    }
}

注意:scheduleAtFixedRate 方法返回一个ScheduledFuture 对象,ScheduledFuture其实就是在Future 的基础上增加了延迟功能。通过ScheduledFuture 可以取消一个任务的执行。

ScheduledExecutorService 完整接口说明如下:

代码语言:javascript
复制
public interface ScheduledExecutorService extends ExecutorService {

    /**
     * 提交一个待执行的任务, 并在给定的延迟后执行该任务.
     * 
     * @param command 待执行的任务
     * @param delay 延迟时间
     * @param unit 延迟时间的单位
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    /**
     * 提交一个待执行的任务(具有返回值), 并在给定的延迟后执行该任务
     *
     * @param callable 待执行的任务
     * @param delay 延迟时间
     * @param unit 延迟时间的单位
     * @param <V>  返回值类型
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    /**
     * 提交一个待执行的任务
     * 该任务在 initialDelay 后开始执行, 然后在initialDelay+period 后执行, 接 
     * 着在 initialDelay + 2 * period 后执行, 依此类推
     * 
     * @param command 待执行的任务
     * @param initialDelay 首次执行的延迟时间
     * @param period 连续执行之间的周期
     * @param unit 延迟时间的单位
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * 提交一个待执行的任务
     * 该任务在 initialDelay 后开始执行, 随后在每一次执行终止和下一次执行开始之间 
     * 都存在给定的延迟
     * 如果任务的任一执行遇到异常, 就会取消后续执行. 否则, 只能通过执行程序的取消或 
     * 终止方法来终止该任务
     *
     * @param command 待执行的任务
     * @param initialDelay 首次执行的延迟时间
     * @param delay 一次执行终止和下一次执行开始之间的延迟
     * @param unit  延迟时间的单位
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

至此,Executors 框架中的三个最核心的接口介绍完毕,三个核心接口关系如下

5、生产Executor的工厂

通过前面的介绍,读者应该对Executors框架有了一个初步的认识,Executors 框架就是用来解耦任务本身与任务的执行,并提供了三个核心(Executor、ExecutorService、ScheduledExecutorService)接口来满足使用者的需求!

1、Executor:提交普通的可执行任务 2、ExecutorService:提供对线程池生命周期的管理、异步任务的支持 3、ScheduledExecutorService:提供对任务周期性执行的支持

既然有了接口,那么就有相应的实现类,JUC 提供了许多默认的接口实现,用户如果自己去创建这些类的实例,就需要了解这些类的细节,我们可不可以仅仅根据一些参数就能创建这些实例呢?

因此Executors类就是专门用于创建上述接口的实现类对象,Executors其实就是一个简单工厂,它的所有方法都是static的,Executors 一共提供了五类可供创建的Executors执行器实例。

1、固定线程数的线程池

代码语言:javascript
复制
    /**
     * 创建一个具有固定线程数的Executor
     *
     * @param nThreads 核心线程数(银行有五个窗口,平时开三个窗口(核心线程数))
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    /**
     * 创建一个具有固定线程数的Executor.
     * 在需要时使用提供的 ThreadFactory 创建新线程.
     *
     * @param nThreads 核心线程数
     * @param threadFactory 创建线程的工厂
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

public interface ThreadFactory {
    //创建了一个线程
    Thread newThread(Runnable r);
}

ThreadFactory 作为一个线程工厂,我们可以由外部指定ThreadFactory实例,以决定线程具体的创建方式。

下面就以 DefaultThreadFactory为例

代码语言:javascript
复制
//默认的线程工厂
static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);//原子性
        private final ThreadGroup group;//线程组
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

可以看到,DefaultThreadFactory 初始化的时候定义了线程组、线程名称等信息,每创建一个线程,都给线程统一分配了这些信息,避免了手动new的方式创建线程,又可以进行工厂复用

2、单个线程的线程池

代码语言:javascript
复制
 /**
     * 创建一个使用单个 worker 线程的 Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    /**
     * 创建一个使用单个 worker 线程的 Executor
     * 在需要时使用提供的 ThreadFactory 创建新线程
     *
     * @param threadFactory 线程工厂
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
    //对返回的Executor实例进行一个包装
    static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

我们可以看到,只有单个线程上文线程池其实就是指定好了线程数为1的固定线程池,主要区别就是返回的Executor实例用了一个 FinalizableDelegatedExecutorService对象 进行包装,它核心继承了 DelegatedExecutorService ,这是个包装类,实现了 ExecutorService的所有方法,但是内部实现其实都委托给了传入的 ExecutorService 实例。

为什么要多此一举呢?

因为返回的 ThreadPoolExecutor 包含一些设置线程池大小的方法,对于只有单个线程的线程池来说,我们是不希望用户通过强转的方式使用这些方法的,所以需要这么一个包装类,只暴露ExecutorService 本身的方法。

3、可缓存的线程池

代码语言:javascript
复制
/**
 * 创建一个可缓存线程的Execotor
 * 如果线程池中没有线程可用, 则创建一个新线程并添加到池中
 * 如果有线程长时间未被使用(默认60s,可通过threadFactory配置), 则从缓存中移除
 */
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

/**
 * 创建一个可缓存线程的Execotor.
 * 如果线程池中没有线程可用, 则创建一个新线程并添加到池中;
 * 如果有线程长时间未被使用(默认60s, 可通过threadFactory配置), 则从缓存中移除.
 * 在需要时使用提供的 ThreadFactory 创建新线程.
 */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

可以看到,返回的还是ThreadPoolExecutor对象,只是指定了超时时间,另外线程池中线程的数量在[0, Integer.MAX_VALUE]之间。

4、可延时/周期性调度的线程池

代码语言:javascript
复制
/**
 * 创建一个具有固定线程数的 可调度Executor.
 * 它可安排任务在指定延迟后或周期性地执行.
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
/**
 * 创建一个具有固定线程数的 可调度Executor.
 * 它可安排任务在指定延迟后或周期性地执行.
 * 在需要时使用提供的 ThreadFactory 创建新线程.
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

5、Fork/Join线程池

代码语言:javascript
复制
/**
 * 创建具有指定并行级别的ForkJoin线程池.
 */
public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

/**
 * 创建并行级别等于CPU核心数的ForkJoin线程池.
 */
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

3、总结

至此,Executors框架就基本介绍完了,我们看下他的类图

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、学习切入点
  • 2、从Executor谈起
    • 2.1、同步执行任务
      • 2.2、异步执行任务
        • 2.3、对任务进行排队执行
        • 3、增强的ExecutorService
        • 4、周期任务调度——ScheduledExecutorService
        • 5、生产Executor的工厂
          • 1、固定线程数的线程池
            • 2、单个线程的线程池
              • 3、可缓存的线程池
                • 4、可延时/周期性调度的线程池
                  • 5、Fork/Join线程池
                  • 3、总结
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档