(77) 异步任务执行服务 / 计算机程序的思维逻辑

Java并发包提供了一套框架,大大简化了执行异步任务所需的开发,本节我们就来初步探讨这套框架。 在之前的介绍中,线程Thread既表示要执行的任务,又表示执行的机制,而这套框架引入了一个"执行服务"的概念,它将"任务的提交"和"任务的执行"相分离,"执行服务"封装了任务执行的细节,对于任务提交者而言,它可以关注于任务本身,如提交任务、获取结果、取消任务,而不需要关注任务执行的细节,如线程创建、任务调度、线程关闭等。 以上描述可能比较抽象,接下来,我们会一步步具体阐述。 基本接口 首先,我们来看任务执行服务涉及的基本接口:

  • Runnable和Callable:表示要执行的异步任务
  • Executor和ExecutorService:表示执行服务
  • Future:表示异步任务的结果

Runnable和Callable 关于Runnable和Callable,我们在前面几节都已经了解了,都表示任务,Runnable没有返回结果,而Callable有,Runnable不会抛出异常,而Callable会。

Executor和ExecutorService

Executor表示最简单的执行服务,其定义为:

public interface Executor {
    void execute(Runnable command);
}

就是可以执行一个Runnable,没有返回结果。接口没有限定任务如何执行,可能是创建一个新线程,可能是复用线程池中的某个线程,也可能是在调用者线程中执行。 ExecutorService扩展了Executor,定义了更多服务,基本方法有:

public interface ExecutorService extends Executor {
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    //... 其他方法
}

这三个submit都表示提交一个任务,返回值类型都是Future,返回后,只是表示任务已提交,不代表已执行,通过Future可以查询异步任务的状态、获取最终结果、取消任务等。我们知道,对于Callable,任务最终有个返回值,而对于Runnable是没有返回值的,第二个提交Runnable的方法可以同时提供一个结果,在异步任务结束时返回,而对于第三个方法,异步任务的最终返回值为null。

Future 我们来看Future接口的定义:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, 

        ExecutionException, TimeoutException;
}

get用于返回异步任务最终的结果,如果任务还未执行完成,会阻塞等待,另一个get方法可以限定阻塞等待的时间,如果超时任务还未结束,会抛出TimeoutException。 cancel用于取消异步任务,如果任务已完成、或已经取消、或由于某种原因不能取消,cancel返回false,否则返回true。如果任务还未开始,则不再运行。但如果任务已经在运行,则不一定能取消,参数mayInterruptIfRunning表示,如果任务正在执行,是否调用interrupt方法中断线程,如果为false就不会,如果为true,就会尝试中断线程,但我们从69节知道,中断不一定能取消线程。 isDone和isCancelled用于查询任务状态。isCancelled表示任务是否被取消,只要cancel方法返回了true,随后的isCancelled方法都会返回true,即使执行任务的线程还未真正结束。isDone表示任务是否结束,不管什么原因都算,可能是任务正常结束、可能是任务抛出了异常、也可能是任务被取消。 我们再来看下get方法,任务最终大概有三个结果:

  1. 正常完成,get方法会返回其执行结果,如果任务是Runnable且没有提供结果,返回null
  2. 任务执行抛出了异常,get方法会将异常包装为ExecutionException重新抛出,通过异常的getCause方法可以获取原异常
  3. 任务被取消了,get方法会抛出异常CancellationException

如果调用get方法的线程被中断了,get方法会抛出InterruptedException。 Future是一个重要的概念,是实现"任务的提交"与"任务的执行"相分离的关键,是其中的"纽带",任务提交者和任务执行服务通过它隔离各自的关注点,同时进行协作。 基本用法

基本示例

说了这么多接口,具体怎么用呢?我们看个简单的例子:

public class BasicDemo { static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { int sleepSeconds = new Random().nextInt(1000); Thread.sleep(sleepSeconds); return sleepSeconds; } } public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Integer> future = executor.submit(new Task()); // 模拟执行其他任务 Thread.sleep(100); try { System.out.println(future.get()); } catch (ExecutionException e) { e.printStackTrace(); } executor.shutdown(); } }

我们使用了工厂类Executors创建了一个任务执行服务,Executors有多个静态方法,可以用来创建ExecutorService,这里使用的是:

public static ExecutorService newSingleThreadExecutor()

表示使用一个线程执行所有服务,后续我们会详细介绍Executors,注意与Executor相区别,后者是单数,是接口。 不管ExecutorService是如何创建的,对使用者而言,用法都一样,例子提交了一个任务,提交后,可以继续执行其他事情,随后可以通过Future获取最终结果或处理任务执行的异常。 最后,我们调用了ExecutorService的shutdown方法,它会关闭任务执行服务。

ExecutorService的更多方法

前面我们只是介绍了ExecutorService的三个submit方法,其实它还有如下方法:

public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

有两个关闭方法,shutdown和shutdownNow,区别是,shutdown表示不再接受新任务,但已提交的任务会继续执行,即使任务还未开始执行,shutdownNow不仅不接受新任务,已提交但尚未执行的任务会被终止,对于正在执行的任务,一般会调用线程的interrupt方法尝试中断,不过,线程可能不响应中断,shutdownNow会返回已提交但尚未执行的任务列表。 shutdown和shutdownNow不会阻塞等待,它们返回后不代表所有任务都已结束,不过isShutdown方法会返回true。调用者可以通过awaitTermination等待所有任务结束,它可以限定等待的时间,如果超时前所有任务都结束了,即isTerminated方法返回true,则返回true,否则返回false。 ExecutorService有两组批量提交任务的方法,invokeAll和invokeAny,它们都有两个版本,其中一个限定等待时间。 invokeAll等待所有任务完成,返回的Future列表中,每个Future的isDone方法都返回true,不过isDone为true不代表任务就执行成功了,可能是被取消了,invokeAll可以指定等待时间,如果超时后有的任务没完成,就会被取消。 而对于invokeAny,只要有一个任务在限时内成功返回了,它就会返回该任务的结果,其他任务会被取消,如果没有任务能在限时内成功返回,抛出TimeoutException,如果限时内所有任务都结束了,但都发生了异常,抛出ExecutionException。

ExecutorService的invokeAll示例

我们在64节介绍过使用jsoup下载和分析HTML,我们使用它看一个invokeAll的例子,同时下载并分析两个URL的标题,输出标题内容,代码为:

public class InvokeAllDemo { static class UrlTitleParser implements Callable<String> { private String url; public UrlTitleParser(String url) { this.url = url; } @Override public String call() throws Exception { Document doc = Jsoup.connect(url).get(); Elements elements = doc.select("head title"); if (elements.size() > 0) { return elements.get(0).text(); } return null; } } public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10); String url1 = "http://www.cnblogs.com/swiftma/p/5396551.html"; String url2 = "http://www.cnblogs.com/swiftma/p/5399315.html"; Collection<UrlTitleParser> tasks = Arrays.asList(new UrlTitleParser[] { new UrlTitleParser(url1), new UrlTitleParser(url2) }); try { List<Future<String>> results = executor.invokeAll(tasks, 10, TimeUnit.SECONDS); for (Future<String> result : results) { try { System.out.println(result.get()); } catch (ExecutionException e) { e.printStackTrace(); } } } catch (InterruptedException e) { e.printStackTrace(); } executor.shutdown(); } }

这里,使用了Executors的另一个工厂方法newFixedThreadPool创建了一个线程池,这样使得多个任务可以并发执行,关于线程池,我们下节介绍。 其它代码比较简单,我们就不解释了。使用ExecutorService,编写并发异步任务的代码就像写顺序程序一样,不用关心线程的创建和协调,只需要提交任务、处理结果就可以了,大大简化了开发工作。 基本实现原理 了解了ExecutorService和Future的基本用法,我们来看下它们的基本实现原理。 ExecutorService的主要实现类是ThreadPoolExecutor,它是基于线程池实现的,关于线程池我们下节再介绍。ExecutorService有一个抽象实现类AbstractExecutorService,本节,我们简要分析其原理,并基于它实现一个简单的ExecutorService,Future的主要实现类是FutureTask,我们也会简要探讨其原理。

AbstractExecutorService

AbstractExecutorService提供了submit, invokeAll和invokeAny的默认实现,子类只需要实现如下方法:

public void shutdown() public List<Runnable> shutdownNow() public boolean isShutdown() public boolean isTerminated() public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException public void execute(Runnable command)

除了execute,其他方法都与执行服务的生命周期管理有关,简化起见,我们忽略其实现,主要考虑execute。 submit/invokeAll/invokeAny最终都会调用execute,execute决定了到底如何执行任务,简化起见,我们为每个任务创建一个线程,一个完整的最简单的ExecutorService实现类如下:

public class SimpleExecutorService extends AbstractExecutorService { @Override public void shutdown() { } @Override public List<Runnable> shutdownNow() { return null; } @Override public boolean isShutdown() { return false; } @Override public boolean isTerminated() { return false; } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return false; } @Override public void execute(Runnable command) { new Thread(command).start(); } }

对于前面的例子,创建ExecutorService的代码可以替换为:

ExecutorService executor = new SimpleExecutorService();

可以实现相同的效果。 ExecutorService最基本的方法是submit,它是如何实现的呢?我们来看AbstractExecutorService的代码:

public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }

它调用newTaskFor生成了一个RunnableFuture,RunnableFuture是一个接口,既扩展了Runnable,又扩展了Future,没有定义新方法,作为Runnable,它表示要执行的任务,传递给execute方法进行执行,作为Future,它又表示任务执行的异步结果。这可能令人混淆,我们来看具体代码:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }

就是创建了一个FutureTask对象,FutureTask实现了RunnableFuture接口。它是怎么实现的呢?

FutureTask 它有一个成员变量表示待执行的任务,声明为:

private Callable<V> callable;

有个整数变量state表示状态,声明为:

private volatile int state;

取值可能为:

NEW = 0; //刚开始的状态,或任务在运行 COMPLETING = 1; //临时状态,任务即将结束,在设置结果 NORMAL = 2; //任务正常执行完成 EXCEPTIONAL = 3; //任务执行抛出异常结束 CANCELLED = 4; //任务被取消 INTERRUPTING = 5; //任务在被中断 INTERRUPTED = 6; //任务被中断

有个变量表示最终的执行结果或异常,声明为:

private Object outcome;

有个变量表示运行任务的线程:

private volatile Thread runner;

还有个单向链表表示等待任务执行结果的线程:

private volatile WaitNode waiters;

FutureTask的构造方法会初始化callable和状态,如果FutureTask接受的是一个Runnable对象,它会调用Executors.callable转换为Callable对象,如下所示:

public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }

任务执行服务会使用一个线程执行FutureTask的run方法,run()代码为:

public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

其基本逻辑是:

  • 调用callable的call方法,捕获任何异常
  • 如果正常执行完成,调用set设置结果,保存到outcome
  • 如果执行过程发生异常,调用setException设置异常,异常也是保存到outcome,但状态不一样
  • set和setException除了设置结果,修改状态外,还会调用finishCompletion,它会唤醒所有等待结果的线程

对于任务提交者,它通过get方法获取结果,限时get方法的代码为:

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }

其基本逻辑是,如果任务还未执行完毕,就等待,最后调用report报告结果, report根据状态返回结果或抛出异常,代码为:

private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

cancel方法的代码为:

public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }

其基本逻辑为:

  • 如果任务已结束或取消,返回false
  • 如果mayInterruptIfRunning为true,调用interrupt中断线程,设置状态为INTERRUPTED
  • 如果mayInterruptIfRunning为false,设置状态为CANCELLED
  • 调用finishCompletion唤醒所有等待结果的线程

invokeAll和invokeAny 理解了FutureTask,我们再来看AbstractExecutorService的其他方法,invokeAll的基本逻辑很简单,对每个任务,创建一个FutureTask,并调用execute执行,然后等待所有任务结束。 invokeAny的实现稍微复杂些,它利用了ExecutorCompletionService,关于这个类及invokeAny的实现,我们后续章节再介绍。 小结 本节介绍了Java并发包中任务执行服务的基本概念和原理,该服务体现了并发异步开发中"关注点分离"的思想,使用者只需要通过ExecutorService提交任务,通过Future操作任务和结果即可,不需要关注线程创建和协调的细节。 本节主要介绍了AbstractExecutorService和FutureTask的基本原理,实现了一个最简单的执行服务SimpleExecutorService,对每个任务创建一个单独的线程。实际中,最经常使用的执行服务是基于线程池实现的ThreadPoolExecutor,线程池是并发程序中一个非常重要的概念和技术,让我们下一节来探讨。 (与其他章节一样,本节所有代码位于 https://github.com/swiftma/program-logic)

原文发布于微信公众号 - 老马说编程(laoma_shuo)

原文发表时间:2017-03-02

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Golang语言社区

理解Go语言Web编程(上)

断断续续学Go语言很久了,一直没有涉及Web编程方面的东西。因为仅是凭兴趣去学习的,时间有限,每次去学,也只是弄个一知半解。不过这两天下定决心把Go语言Web编...

392120
来自专栏chenssy

【死磕Java并发】-----J.U.C之线程池:线程池的基础架构

原文出处http://cmsblogs.com/ 『chenssy』 经历了Java内存模型、JUC基础之AQS、CAS、Lock、并发工具类、并发容器、阻塞队...

33350
来自专栏猿天地

用aop加redis实现通用接口缓存

系统在高并发场景下,最有用的三个方法是缓存,限流,降级。 缓存就是其中之一,目前缓存基本上是用redis或者memcached。 redis和memcached...

36570
来自专栏前端下午茶

JS 观察者模式

观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时...

22910
来自专栏犀利豆的技术空间

徒手撸框架--实现 RPC 远程调用

微服务已经是每个互联网开发者必须掌握的一项技术。而 RPC 框架,是构成微服务最重要的组成部分之一。趁最近有时间。又看了看 dubbo 的源码。dubbo 为了...

16620
来自专栏IT杂记

通过Java程序提交通用Mapreduce无法回收类的问题

问题描述 上次发布的博客 通过Java程序提交通用Mapreduce,在实施过程中发现,每次提交一次Mapreduce任务,JVM无法回收过程中产生的MapRe...

31460
来自专栏Java呓语

单元测试以及JUnit框架解析

我们都有个习惯,常常不乐意去写个简单的单元测试程序来验证自己的代码。对自己的程序一直非常有自信,或存在侥幸心理每次运行通过后就直接扔给测试组测试了。然而每次测试...

17220
来自专栏算法修养

PAT 1015 Reversible Primes

1015. Reversible Primes (20) 时间限制 400 ms 内存限制 65536 kB 代码长度限制 16000 B...

364100
来自专栏青玉伏案

iOS逆向工程之Hopper中的ARM指令

虽然前段时间ARM被日本软银收购了,但是科技是无国界的,所以呢ARM相关知识该学的学。现在看ARM指令集还是倍感亲切的,毕竟大学里开了ARM这门课,并且做了不少...

34770
来自专栏FD的专栏

unlink漏洞的原理和利用

网上关于unlink漏洞的文章已经非常多了,但是作为一个web狗,为了搞明白这个漏洞,还是花了好长时间,中间踩了几个坑,写这篇文章是希望跟我一样啃二进制的web...

19120

扫码关注云+社区

领取腾讯云代金券