专栏首页悠扬前奏的博客Java并发-30.Executor框架

Java并发-30.Executor框架

  • HotSpot VM线程模型中Java线程被一对一映射为本地操作系统线程。
  • 应用程序通过Executor框架控制上层调度,下层调度由操作系统内核控制,不受应用程序影响

1. Executor框架结构

  • 任务,包括执行任务需要实现的接口:
    • Runnable接口和Callable接口的实现类,用于被ThreadPoolExecutor或ScheduledThreadExecutor执行
  • 任务的执行,包括任务执行机制的核心接口Executor,和继承自Executor的ExecutorService接口,有两个实现类:
    • ThreadPoolExecutor:用来执行被提交的任务
    • ScheduledThreadExecutor:在给定延时后执行任务,或定时执行任务
  • 异步运算结果,包括接口Future和实现Future接口的FutureTask类

2. Executor成员

2.1 ThreadPoolExecutor

工厂类Executors来创建,有三种:

  • FixeThreadPool:可重用固定线程数的线程池
    • 用于需要限制当前线程数量的应用场合
    • 它的corePoolSize和maximunPoolSize设置为创建时的参数。
    • 线程池中线程数大于corePoolSize,多余的空闲线程立即终止
    • 使用无界队列作为工作队列
    • 源码:
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • SingleThreadExecutor:单个worker线程的Executor
    • 用于保证顺序执行各个任务,在任意时间点也不会有多个活动线程
    • corePoolSize= maximunPoolSize=1
    • 其余和ThreadPoolExecutor一样
    • 源码
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
  • CachedThreadPool
    • 根据需要创建新线程,用于执行很多的短期异步小任务
    • 大小无界的线程池,corePoolSize = 0, maximunPoolSize=Integer.MAX_VALUE
    • 空闲线程等待60秒
    • SynchronousQueue作为工作队列
    • 源码:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

2.2 ScheduledThreadPoolExecutor

通常用工厂类Executors类创建,有两种:

  • ScheduledThreadPoolExecutor:包含固定个线程的ScheduledThreadPoolExecutor。
    • 使用DelayQueue作为任务队列,放入其中的是ScheduledFutureTask,主要包含3个成员变量:
      • long time,标识任务将要被执行的具体时间,DelayQueue中封装了一个PriorityQueue,根据它排序。
      • long sequenceNumber,标识这个任务被添加到ScheduledThreadPoolExecutor中的序号,先按time排序后按照它排序
      • long period,表示任务执行的时间间隔
    • 执行周期任务后,增加额外处理
    • 构造器源码:
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
  • SingleThreadScheduledExecutor只包含一个线程的ScheduledThreadPoolExecutor。
    • 构造器源码:
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

2.3 Future接口

  • Future接口和实现它的FutureTask类用来表示异步运算的结果。
  • 可以看到submit()方法会返回一个FutureTask对象
  • 三种状态
    • 未启动
    • 已启动
      • 正常结束
      • 取消而结束(FutureTask.cancel())
      • 异常而结束
  • 使用
    • 交给Executor执行
    • 通过ExecutorService.submit()方法返回FutureTask,执行get()方法或则cancel()方法。
  • 实现
    • 基于AbstractQueuedSynchronized(AQS)实现:AQS是一个同步框架,提供通用机制来原子性的管理同步状态,阻塞和唤醒线程,以及维护阻塞线程的队列(基于它实现的同步器包括ReentrantLock,Semaphore,ReentrantReadWriteLock,CountDownLatch和FutureTask)
      • 基于AQS的同步器包含两种操作:
        • 至少一个acquire操作,阻塞调用线程,直到AQS状态允许这个线程继续执行
        • 至少一个release操作,改变AQS状态,改变后可允许一个或多个阻塞线程被解除阻塞,Future的release操作包括run()和cancel()

AQS作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只需要实现AQS的tryAcquireShared(int)方法检查同步状态,实现了tryReleaseShared(int)方法更新同步状态,他们控制FutureTask的获取和释放操作。

Sync是FutureTask的内部私有类,继承与AQS,FutureTask的所有公共方法都委托给了内部私有的Sync:

  • FutureTask.get()方法调用AQS.acquireSharedInturruptibli(int arg)方法:
    1. AQS.acquireSharedInterruptibly(int arg)方法,返回子类Sync中实现的tryAcquireShared()方法来判断acquire方法能否成功,成功条件为state执行完成状态RUN或者已取消状态CANCELLED,且runner不为null
    2. 成功则get()方法立即返回,失败则线程到线程等待队列中去等待其他线程执行release操作
    3. 其他线程执行release操作(FutureTask.run()或者FutureTask.cancel())唤醒后,当前线程再次执行tryAcquireShared()方法返回正值1,当前线程离开线程等待队列并唤醒它的后继线程
    4. 返回计算的结果或抛出异常
  • FutureTask.run()
    1. 执行构造函数中指定的任务(Callable.call())
    2. 原子方式更新同步状态(AQS.compareAndSetState(int except, int update), 设置state为RAN),如果原子操作成功,设置代表计算结果的变量result值为Callable.call()的返回值,然后调用AQS.releaseShared(int arg)
    3. AQS.releaseShared(int arg)首先返回子类Sync中实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为null,然后返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程
    4. 调用FutureTask.done()

2.4 Runnable接口和Callable接口

  • 两个接口的实现类都可以被ScheduledThreadPoolExecutor或者ThreadPoolExecutor执行。
  • Runnable不会返回结果,Callable会返回结果
  • Executors提供了API把Runnable包装成Callable

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

推荐阅读

  • 日访问百万级微信小程序优化技巧总结

    之前负责的锡慧在线小程序是一款公益性质在线教育类小程序,因疫情影响导致流量暴增,日访问过百万

    薛定喵君
    小程序微信缓存RedisCanvas
  • Spiral: 一个性能卓越的PHP/Golang混合开发框架

    春节期间,了解到一个“全新”的 WEB 开发框架:Spiral, 最开始引起我的兴趣是从同事那里听说了 RoadRunner. 然后去了解 RoadRunner 的时候看到了 Spiral. 之所以把“全新”用双引号引起来,是因为这个框架其实从 2013 年起就在它的开发团队以及一些企业客户中应用了,经历了各种实际应用场景的考验,Spiral 的功能及其丰富,性能与当前主流的 PHP 框架相比也相当出众。但这个框架源自俄国,在国内不算知名,他们团队开始重视和梳理开源,也应该是才开始的事情。

    小李刀刀
    PHPGoSymfony
  • kubernetes系列教程(二十)prometheus提供完备监控系统

    上一个章节中kubernetes系列教程(十九)使用metric-server让HPA弹性伸缩愉快运行介绍了在kubernetes中的监控架构,通过安装和使用metric-server提供kubernetes中的核心监控指标:提供node节点和pod容器CPU和内存的监控能力,核心监控指标提供的监控维度和指标相对有限,需要更好的扩展监控能力,需要使用自定义监控来实现,本文介绍prometheus提供更更加丰富的自定义监控能力。

    HappyLau谈云计算
    Kubernetes容器微服务云监控
  • 如何将设计思维应用到精益初创公司的软件开发

    我们所说的设计思维,是指由 IDEO 公司的 Tim Brown 提出,并且正在改变全世界组织的设计思维,简称 DT。(译者注:IDDO,当代最具影响力的设计公司之一)

    Aceyclee
    Serverless无服务器云函数
  • InnoDB 事务加锁分析

    一般大家对数据库事务的了解可能停留在事务的ACID特性以及事务4种不同的隔离级别层面上,而对于事务 4 种不同隔离级别如何实现了解相对较少。

    2020labs小助手
    MySQLSQL数据库MVCMVCC
  • FutureTask 核心源码解析

    研究源码,一般我们都从整体以及实例先入手,再研究细节,不至于一开始就“深陷其中而"当局者迷".

    JavaEdge
    HTTPJava
  • 200行代码落地人脸识别开锁应用

    2019年国庆,帮朋友实现了一个人脸识别进行开锁的功能,用在他的真人实景游戏业务中。几个月来运行稳定,体验良好,借着这个春节宅家的时间,整理一下这个应用的实现过程。

    高树磊
    人脸识别图像处理
  • 滑动验证码攻防对抗

        在业务安全领域,滑动验证码已经是国内继,传统字符型验证码之后的标配。众所周知,打码平台和机器学习这两种绕过验证码的方式,已经是攻击者很主流的思路,不再阐述。冷渗透介绍的是一个冷门的绕过思路和防御方案。这些积累,均来自于实战之中,希望有用。

    周俊辉
    HTTP网络安全安全网站
  • 程序员进阶必读,万字总结Mysql优化精华篇

    price decimal(8,2)有2位小数的定点数,定点数支持很大的数(甚至是超过int,bigint存储范围的数)

    程序员内点事
    全文检索缓存SQL数据库Python
  • 运维转型 | 运维人不再只是“救火英雄”

    各行各业都开启了数字化转型的进程,运维团队在这种时代的浪潮中又该何去何从?我在帮助一些企业落地了运维技术平台之后,开始反思这个问题,并将所思所想整理成本篇文章。

    嘉为科技
    企业运维自动化云计算

扫码关注云+社区

领取腾讯云代金券