专栏首页BanzClubExecutor执行器与线程池

Executor执行器与线程池

Java使用Executor框架执行多线程任务,创建与操作系统线程一对一的映射线程,由操作系统分配CPU来执行。称为任务的两级调度模型,如下图所示:

public interface Executor {
    void execute(Runnable command);
}

执行器Executor包含三个部分:

任务

  • 无返回值任务,实现Runnable接口
  • 带返回值任务,实现Callable接口

注:工具类Executors可以把一个Runnable对象封装为一个Callable对象

任务的执行

  • ExecutorService

主要方法

void execute(Runnable runnable) 无返回结果执行

Future<T> submit(Callable<T> task) 带返回结果执行

shutdown()

shutdownNow()

  • ScheduledExecutorService

主要方法

-无返回结果:执行任务,延迟时间、单位

ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

-带返回结果:执行任务,延迟时间、单位

ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

-固定延迟开始:以上一次发起时间计算

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

-固定延迟开始:以上一次结束时间计算

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

注:相比于Timer,ScheduledExecutorService更灵活,功能更强大。

执行结果

Future

Executor的框架结构也是基于这三个方面实现,下面是各个接口的实现类和接口示意:

线程池—ThreadPoolExecutor

Java线程池应该是使用最多的并发框架,通过使用线程池可以减少系统因频繁的创建和销毁线程而带来的资源的浪费,降低资源消耗;执行的任务也可以直接从线程池获得线程执行,提高响应速度;线程创建过多也降低系统的稳定性,通过线程池也可以统一分配、监控,从而提高线程的可管理性。下面结合源码分析一下线程池原理:

主要属性

// 线程池控制状态ctl是包含两个概念字段的原子整数
// workerCount,表示有效的线程数
// runState,表示是否正在运行,关闭等
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 锁
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 创建线程工厂
private volatile ThreadFactory threadFactory;
// 饱和策略
private volatile RejectedExecutionHandler handler;
// 空闲线程等待工作的超时时间
private volatile long keepAliveTime;
// 是否允许核心线程超时
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;

线程池状态

RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;

SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);

STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;

TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。

TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。

构造方法:主要就是设置核心线程数、最大线程数、阻塞队列、空闲线程存活时间、线程工厂、饱和策略

this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;

执行方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
  1. 执行任务时,先判断线程池中线程数是否小于核心线程数;
  2. 小于核心线程数,则将任务添加到工作线程中,并执行;
  3. 大于核心线程数,将任务添加到阻塞队列,判断阻塞队列是否已满,不满则添加;
  4. 如果阻塞队列满的话,判断线程池中的线程数是否小于最大线程数;
  5. 小于最大线程数,将任务添加到工作线程中,并执行;
  6. 大于最大线程数,使用包含策略处理任务。

Work类

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // 执行任务的线程
    final Thread thread;
    // 执行的任务
    Runnable firstTask;
    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
        runWorker(this);
    }
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

线程池中的每个线程都被封装成一个Worker对象,线程池中调度执行的就是Worker对象——执行线程和执行任务。

线程池关闭

  • shutdown方法

将线程池切换到SHUTDOWN状态,并且终止所以空闲的线程,最后尝试终止线程池。

  • shutdownNow方法

将线程池状态切换到STOP状态,并且终止所有线程,取出阻塞队列中的所有未执行的任务,尝试终止线程池。

线程池监控

  • getTaskCount:线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
  • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
  • getPoolSize:线程池当前的线程数量;
  • getActiveCount:当前线程池中正在执行任务的线程数量。

扩展方法

  • beforeExecute方法:执行前
  • afterExecute方法:执行后
  • terminated方法:终止方法

调度线程池—ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor是支持周期性任务的调度的线程池执行器,与Timer相比更加强大;

ScheduledThreadPoolExecutor实现ScheduledExecutorService接口;

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor类,主要用于执行延迟执行任务或周期性执行任务。周期性执行在执行之后将重设任务和时间,进行下次执行。

ScheduledThreadPoolExecutor中的阻塞队列是内部实现的DelayedWorkQueue——无界可延迟优先级阻塞队列,基于最小堆算法实现;

FutureTask

FutureTask是异步的且能够获取返回值的可执行任务,实现了Future接口和Runnable接口;

Future接口提供获取返回值的get方法和可以取消任务的cancel方法;

FutureTask的构造可以接收Callable对象、Runnable对象和返回值对象两种形式(注:Executors.callable(runnable, result)可以将runnable转化成callable);

FutureTask通过CAS操作state状态值,来控制整个FutureTask的生命周期,state包括:NEW(新建)、COMPLETING(执行中)、NORMAL(正常结束)、EXCEPTIONAL(异常结束)、CANCELLED(取消)、INTERRUPTING(中断中)、INTERRUPTED(中断)。

FutureTask执行任务结束后,将设置返回值,并唤醒调用get方法的线程;当任务未执行结束时,调用get方法的线程将会阻塞并且装入到waiters(链表结构)等待队列;

FutureTask使用LockSupport的park方法实现waiters队列中的线程阻塞,LockSupport的unpark方法唤醒阻塞线程;

例子:

// 结算展示页面关于商品价格展示的逻辑:使用线程池和FutureTask来实现
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    // 获取商品基础价格
    FutureTask<Double> productPrice = new FutureTask<>(() -> {
        // 调用商品系统接口
        return 19.5;
    });
    // 获取商品促销
    FutureTask<Double> productPromotion = new FutureTask<>(() -> {
        // 调用促销系统接口
        return 16.0;
    });
    // 获取商品运费
    FutureTask<Double> productFreight = new FutureTask<>(() -> {
        // 调用运费系统接口
        return 6.0;
    });
    executor.execute(productPrice);
    executor.execute(productPromotion);
    executor.execute(productFreight);
    // 结果的组装
    System.out.println("商品原价:" + productPrice.get());
    System.out.println("商品促销价格:" + productPromotion.get());
    System.out.println("商品运费价格:" + productFreight.get());
}

  1. 《Java并发编程艺术》
  2. http://www.ideabuffer.cn/categories/%E5%BC%80%E5%8F%91%E6%89%8B%E5%86%8C/J-U-C/

本文分享自微信公众号 - BanzClub(banz-club)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-01-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

推荐阅读

  • 远程办公经验为0,如何将日常工作平滑过度到线上?

    我是一名创业者,我的公司(深圳市友浩达科技有限公司)在2018年8月8日开始运营,现在还属于微型公司。这个春节假期,我一直十分关注疫情动向,也非常关心其对公司带来的影响。

    TVP官方团队
    TAPD 敏捷项目管理腾讯乐享企业邮箱企业编程算法
  • 数据中台,概念炒作还是另有奇效? | TVP思享

    作者简介:史凯,花名凯哥,腾讯云最具价值专家TVP,ThoughtWorks数据智能业务总经理。投身于企业数字化转型工作近20年。2000年初,在IBM 研发企业级中间件,接着加入埃森哲,为大型企业提供信息化架构规划,设计,ERP,云平台,数据仓库构建等技术咨询实施服务,随后在EMC负责企业应用转型业务,为企业提供云迁移,应用现代化服务。现在专注于企业智能化转型领域,是数据驱动的数字化转型的行业布道者,数据中台的推广者,精益数据创新体系的创始人,2019年荣获全球Data IQ 100人的数据赋能者称号,创业邦卓越生态聚合赋能官TOP 5。2019年度数字化转型专家奖。打造了行业第一个数据创新的数字化转型卡牌和工作坊。创建了精益数据创新方法论体系构建数据驱动的智能企业,并在多个企业验证成功,正在向国内外推广。

    TVP官方团队
    大数据数据分析企业
  • 扩展 Kubernetes 之 CRI

    使用 cri-containerd 的调用流程更为简洁, 省去了上面的调用流程的 1,2 两步

    王磊-AI基础
    Kubernetes
  • 扩展 Kubernetes 之 Kubectl Plugin

    kubectl 功能非常强大, 常见的命令使用方式可以参考 kubectl --help,或者这篇文章

    王磊-AI基础
    Kubernetes
  • 多种登录方式定量性能测试方案

    最近接到到一个测试任务,某服务提供了两种登录方式:1、账号密码登录;2、手机号+验证码登录。要对这两种登录按照一定的比例进行压测。

    八音弦
    测试服务 WeTest
  • 线程安全类在性能测试中应用

    首先验证接口参数签名是否正确,然后加锁去判断订单信息和状态,处理用户增添VIP时间事务,成功之后释放锁。锁是针对用户和订单的分布式锁,使用方案是用的redis。

    八音弦
    安全编程算法
  • 使用CDN(jsdelivr) 优化博客访问速度

    PS: 此篇文章适用于 使用 Github pages 或者 coding pages 的朋友,其他博客也类似.

    IFONLY@CUIT
    CDNGitGitHub开源
  • 扩展 Kubernetes 之 CNI

    Network Configuration 是 CNI 输入参数中最重要当部分, 可以存储在磁盘上

    王磊-AI基础
    Kubernetes
  • 聚焦【技术应变力】云加社区沙龙online重磅上线!

    云加社区结合特殊时期热点,挑选备受关注的音视频流量暴增、线下业务快速转线上、紧急上线防疫IoT应用等话题,邀请众多业界专家,为大家提供连续十一天的干货分享。从视野、预判、应对等多角度,帮助大家全面提升「技术应变力」!

    腾小云
  • 京东购物小程序购物车性能优化实践

    它是小程序开发工具内置的一个可视化监控工具,能够在 OS 级别上实时记录系统资源的使用情况。

    WecTeam
    渲染JavaScripthttps网络安全缓存

扫码关注云+社区

领取腾讯云代金券