前文中我们讲述了创建线程的2种方式:直接继承Thread和实现Runnable接口,但这两种方式在执行完任务之后都无法获取执行结果。 自从Java 5开始,JDK提供了Callable和Future,解决了上述问题,通过它们可以在任务执行完毕之后得到任务执行结果。
Future类位于java.util.concurrent包下,它是一个接口:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
在Future接口中声明了5个方法:
设想,有这样一个场景,同时启动3个线程分别执行一个任务,线程1耗时8s,线程2耗时7s,线程3耗时6s。用Future去接收线程执行结果,并手动维护一个List放置所有Future,代码如下:
import java.util.ArrayList;
import java.util.List;
import java.util.Date;
import java.util.concurrent.*;
/**
* @author guozhengMu
* @version 1.0
* @date 2019/11/7 20:54
* @description
* @modify
*/
public class FutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(3);
class Task implements Callable<String> {
private int time;
public Task(int time) {
this.time = time;
}
@Override
public String call() throws Exception {
String name = Thread.currentThread().getName();
System.out.println(name + "启动:" + new Date());
TimeUnit.SECONDS.sleep(time);
return name;
}
}
List<Future<String>> results = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Future<String> future = executor.submit(new Task(8 - i));
results.add(future);
}
for (int i = 0; i < 3; i++) {
System.out.println(results.get(i).get() + "完成:" + new Date());
}
System.out.println("全部线程执行完毕");
executor.shutdownNow();
}
}
输出结果:
pool-1-thread-1启动:Fri Nov 08 13:42:08 CST 2019
pool-1-thread-3启动:Fri Nov 08 13:42:08 CST 2019
pool-1-thread-2启动:Fri Nov 08 13:42:08 CST 2019
pool-1-thread-1完成:Fri Nov 08 13:42:16 CST 2019
pool-1-thread-2完成:Fri Nov 08 13:42:16 CST 2019
pool-1-thread-3完成:Fri Nov 08 13:42:16 CST 2019
全部线程执行完毕
可以看到,get方法具有阻塞性,线程1 的结果未返回前,其他已经完成的线程任务结果也无法获取。下面对上面代码进行改进:
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Date;
import java.util.concurrent.*;
/**
* @author guozhengMu
* @version 1.0
* @date 2019/11/7 20:54
* @description
* @modify
*/
public class FutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(3);
class Task implements Callable<String> {
private int time;
public Task(int time) {
this.time = time;
}
@Override
public String call() throws Exception {
String name = Thread.currentThread().getName();
System.out.println(name + "启动:" + new Date());
TimeUnit.SECONDS.sleep(time);
return name;
}
}
List<Future<String>> results = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Future<String> future = executor.submit(new Task(8 - i));
results.add(future);
}
boolean flag = true;
while (flag) {
for (Iterator<Future<String>> iter = results.iterator(); iter.hasNext(); ) {
Future<String> future = iter.next();
if (future.isDone()) {
System.out.println(future.get() + "完成:" + new Date());
iter.remove();
}
}
if (results.size() == 0) {
flag = false;
}
}
System.out.println("全部线程执行完毕");
executor.shutdownNow();
}
}
输出结果:
pool-1-thread-2启动:Fri Nov 08 14:12:43 CST 2019
pool-1-thread-1启动:Fri Nov 08 14:12:43 CST 2019
pool-1-thread-3启动:Fri Nov 08 14:12:43 CST 2019
pool-1-thread-3完成:Fri Nov 08 14:12:49 CST 2019
pool-1-thread-2完成:Fri Nov 08 14:12:50 CST 2019
pool-1-thread-1完成:Fri Nov 08 14:12:51 CST 2019
全部线程执行完毕
可以看到,一旦某个线程任务执行结束,其结果能被立即获取到,但代价是程序在不停地循环查询线程任务 isDone 的结果,对cpu消耗比较大。因此,使用Future解决多任务结果,并不是最优的效果。 FutureTask正是为此而存在
FutureTask类实现了RunnableFuture接口:
public class FutureTask<V> implements RunnableFuture<V>
RunnableFuture接口又继承了Runable和Future
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
可见,FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。FutureTask类图如下:
下面我们再来看看 FutureTask 工具类。前面我们提到的 Future 是一个接口,而 FutureTask 是一个工具类,这个工具类有两个构造函数:
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);
使用Callable+FutureTask获取执行结果:
import java.util.concurrent.*;
/**
* @author guozhengMu
* @version 1.0
* @date 2019/11/8 14:17
* @description
* @modify
*/
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask = new MyTask("hello,", "world-");
//将任务放进FutureTask里
FutureTask<Object> futureTask = new FutureTask<>(myTask);
//采用thread来开启多线程
Thread thread = new Thread(futureTask);
thread.start();
try {
System.out.println(futureTask.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
class MyTask implements Callable<Object> {
private String param1;
private String param2;
//构造函数,用来向task中传递任务的参数
public MyTask(String param1, String param2) {
this.param1 = param1;
this.param2 = param2;
}
//任务执行的动作
@Override
public String call() {
for (int i = 0; i < 5; i++) {
System.out.println(param1 + param2 + i);
}
return "运行完成!";
}
}
输出结果:
hello,world-0
hello,world-1
hello,world-2
hello,world-3
hello,world-4
运行完成!
也可以使用线程池:
import java.util.concurrent.*;
/**
* @author guozhengMu
* @version 1.0
* @date 2019/11/8 14:17
* @description
* @modify
*/
public class FutureTaskTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(() -> 1 + 2);
// 创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 提交 FutureTask
executor.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();
System.out.println(result);
executor.shutdown();
}
}
接下来,我们使用FutureTask来实现Future多线程获取任务结果的场景:
import java.util.Date;
import java.util.concurrent.*;
/**
* @author guozhengMu
* @version 1.0
* @date 2019/11/8 14:17
* @description
* @modify
*/
public class FutureTaskTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 3; i++) {
Callable<String> callable = new Task(8 - i);
MyFutureTask task = new MyFutureTask(callable);
executor.submit(task);
}
executor.shutdown();
}
}
class MyFutureTask extends FutureTask<String> {
public MyFutureTask(Callable<String> callable) {
super(callable);
}
@Override
protected void done() {
try {
System.out.println(get() + "完成:" + new Date());
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
class Task implements Callable<String> {
private int time;
public Task(int time) {
this.time = time;
}
@Override
public String call() throws InterruptedException {
String name = Thread.currentThread().getName();
System.out.println(name + "启动:" + new Date());
TimeUnit.SECONDS.sleep(time);
return name;
}
}
输出结果:
pool-1-thread-1启动:Fri Nov 08 17:35:26 CST 2019
pool-1-thread-3启动:Fri Nov 08 17:35:26 CST 2019
pool-1-thread-2启动:Fri Nov 08 17:35:26 CST 2019
pool-1-thread-3完成:Fri Nov 08 17:35:32 CST 2019
pool-1-thread-2完成:Fri Nov 08 17:35:33 CST 2019
pool-1-thread-1完成:Fri Nov 08 17:35:34 CST 2019