当要使用线程去执行一个任务时,可以使用ExecutorService.submit(new Callable());
这样可以不影响其他的业务的执行,异步的执行我们想要的任务;
以下面是一个简单的接口为例:
/**
* JDK 线程测试
*/
@RestController
public class JdkThreadController { @RequestMapping("/test/jdk")
public void execute() throws ExecutionException, InterruptedException {
// 固定大小的线程池 核心线程数和最大线程数=10
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 记录开始时间
Long start = System.currentTimeMillis();
// 一个耗时的任务
Future<Boolean> future = executorService
.submit(new Callable<Boolean>() {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
@Override
public Boolean call() throws Exception {
//模拟耗时5s
Thread.sleep(5000);
return true;
}
}); // 阻塞 等待执行结果
Boolean result = future.get();
//打印结果
System.out.println("任务执行成功了,执行结果=" + result);
// 记录结束时间
Long end = System.currentTimeMillis();
// 执行时间
System.out.println("线程执行结束了,耗时=" + (end - start) + "毫秒");
System.out.println("-----------------------华丽的分割线
-----------------------");
}
}但是当有多个任务提交到线程池去执行的情况下,会有多个Future调用get()获取执行结果的时候,会造成多个future()的get串行的场景。
如果多个get(),每个get()阻塞很久,接口性能也就会受到影响。
Guava为Java并行编程Future提供了很多有用扩展,其主要接口为ListenableFuture,并借助于Futures静态扩展。
继承自Future的ListenableFuture,允许我们添加回调函数在线程运算完成时返回值或者方法执行完成立即返回。
如果Future带有回调,这样是不是可以避免我们自己直接操作get()获取返回值,直接帮我们执行一些后续的工作?
依赖的Guava包:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>25.1-jre</version>
</dependency>ListenableFuture初探:
@GwtCompatible
public interface ListenableFuture<V> extends Future<V> {
void addListener(Runnable listener, Executor executor);
}ListenableFuture集成及JDK的Future。增加了一个添加监听器的接口。
ListenableFuture简单操作:
(1)用Guava的修饰过JDK线程池的MoreExecutors类创建线程池
/**
* 公用的线程池
*/
@Component
public class ListeningExecutors { @Bean
public ListeningExecutorService createListeningExecutorService() {
// 创建线程池
ListeningExecutorService listeningExecutorService = MoreExecutors.
listeningDecorator(Executors.newFixedThreadPool(10)); return listeningExecutorService;
}
}(2)创建一个回调任务,这个任务就是通过上面的ListenableFuture的addListener方法注册上的
/**
* ListenableFuture回调任务
*/
public class FutureCallBackTask implements FutureCallback<Boolean> { /**
* 成功的回调
* @param result
*/
@Override
public void onSuccess(Boolean result) {
//执行回调函数
System.out.println("进入回调函数");
//得到任务执行的结果
System.out.println("任务执行成功了,执行结果=" + result);
} /**
* 失败的回调
* @param t
*/
@Override
public void onFailure(Throwable t) {
System.out.println("出错了");
}
}(3)用ListenableFuture扩展JDK的Future,实现异步的回调
/**
* Guava 线程测试
*/
@RestController
public class GuavaThreadController { @Autowired
private ListeningExecutorService listeningExecutorService; @RequestMapping("/test/guava")
public void execute() {
// 记录开始时间
Long start = System.currentTimeMillis();
// 一个耗时的任务
ListenableFuture<Boolean> listenableFuture =
listeningExecutorService.submit(() -> {
//模拟耗时5s
Thread.sleep(5000);
return true;
});
// 注册回调事件
Futures.addCallback(listenableFuture, new FutureCallBackTask(),
listeningExecutorService);
// 记录结束时间
Long end = System.currentTimeMillis();
// 执行时间
System.out.println("线程执行结束了,耗时=" + (end - start) + "毫秒");
}
}改造过的接口,有哪些改善呢?
get()阻塞式的调用省去了。接口性能更高,耗时更短。
原本需要根据JDK的future返回结果的操作,放在回调函数中做了。整个接口再无阻塞。
transform:对于ListenableFuture的返回值进行转换。
allAsList:对多个ListenableFuture的合并,返回一个当所有Future成功时返回多个Future返回值组成的List对象。注:当其中一个Future失败或者取消的时候,将会进入失败或者取消。
successfulAsList:和allAsList相似,唯一差别是对于失败或取消的Future返回值用null代替。不会进入失败或者取消流程。
immediateFuture/immediateCancelledFuture: 立即返回一个待返回值的ListenableFuture。
makeChecked: 将ListenableFuture 转换成CheckedFuture。CheckedFuture 是一个ListenableFuture 。其中包含了多个版本的get 方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易
JdkFutureAdapters.listenInPoolThread(future): guava同时提供了将JDK Future转换为ListenableFuture的接口函数。
@RestController
public class SeniorListenableFutureController { @Autowired
private ListeningExecutorService listeningExecutorService; @RequestMapping("/test/senior")
public void execute() { long start = System.currentTimeMillis(); // 任务1
ListenableFuture future1 = listeningExecutorService
.submit(() -> {
Thread.sleep(5000);
System.out.printf("调用第1个future,执行时间是%d%n",
System.currentTimeMillis());
return 1;
}); // 任务2
ListenableFuture future2 = listeningExecutorService
.submit(() -> {
Thread.sleep(10000);
System.out.printf("调用第2个future,执行时间是%d%n",
System.currentTimeMillis());
throw new RuntimeException("任务2出现了异常");
// return 2;
}); //对多个ListenableFuture的合并,返回一个当所有Future成功时返回多个
//Future返回值组成的List对象。
// 注:当其中一个Future失败或者取消的时候,将会进入失败或者取消。
final ListenableFuture allFutures = Futures
.allAsList(future1, future2); //对于多个ListenableFuture的进行转换,返回一个新的ListenableFuture
final ListenableFuture transform = Futures.
transformAsync(allFutures, new AsyncFunction<List<Integer>, String>() {
/**
* 用给定的输入封装一个特定的ListenableFuture作为输出
*
* @param input
*/
@Override
public ListenableFuture<String>
apply(@Nullable List<Integer> input) {
// 立即返回一个待返回值的ListenableFuture
// 这里可以对input进行复杂的处理,返回最终的一个结果
//比如:对团单详情,团单优惠,团单使用范围进行组装
return Futures.immediateFuture(String
.format("执行成功的任务的数量是:%d", input.size()));
}
}, listeningExecutorService); // 注册回调事件
Futures.addCallback(transform, new FutureCallback<Object>() { public void onSuccess(Object result) {
System.out.println("进入正确的回调函数");
System.out.printf("任务执行的结果是:%s%n", result);
} public void onFailure(Throwable thrown) {
System.out.println("进入错误的回调函数");
System.out.printf("系统出错了,错误原因是:%s%n",
thrown.getMessage());
}
}, listeningExecutorService); long end = System.currentTimeMillis();
System.out.printf("接口总耗时%d毫秒%n", end - start);
}
}Guava高级特性——JdkFutureAdapters.listenInPoolThread(future)