技术点
Reactive
Reactive是一种编程方式,由不同的方式来实现
阻塞的弊端和并行的复杂
在Reactor官方的网站上,指出了现有编程的一些不足https://projectreactor.io/docs/core/release/reference/index.html#_blocking_can_be_wasteful
Reactor认为阻塞可能是浪费的
归纳
阻塞的弊端
由以下场景来说明
public class DataLoader {
public final void load() {
long startTime = System.currentTimeMillis();
doLoad();
long costTime = System.currentTimeMillis() - startTime;
System.out.println("load()总耗时:" + costTime + "毫秒");
}
protected void doLoad() {
loadConfigurations();
loadUsers();
loadOrders();
}
protected final void loadConfigurations() {
loadMock("loadConfigurations()",1);
}
protected final void loadUsers() {
loadMock("loadUsers",2);
}
protected final void loadOrders() {
loadMock("loadOrders()",3);
}
private void loadMock(String source,int seconds) {
try {
long startTime = System.currentTimeMillis();
long milliseconds = TimeUnit.SECONDS.toMillis(seconds);
Thread.sleep(milliseconds);
long costTime = System.currentTimeMillis() - startTime;
System.out.printf("[线程: %s] %s 耗时: %d 毫秒\n",
Thread.currentThread().getName(),source,costTime );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new DataLoader().load();
}
}
运行结果
线程: main loadConfigurations() 耗时: 1001 毫秒
线程: main loadUsers 耗时: 2001 毫秒
线程: main loadOrders() 耗时: 3003 毫秒
load()总耗时:6025毫秒
由结果可知,我们在依次执行loadConfigurations()、loadUsers()、loadOrders()中,loadUsers()被loadConfigurations()阻塞了,loadOrders() 被loadUsers()阻塞了,它们都是main的主线程中的执行。由于加载过程串行执行的关系,导致消耗实现线性累加。串行执行即Blocking模式。
并行的复杂
由以下场景来说明
public class ParalleDataLoader extends DataLoader {
protected void doLoad() {
ExecutorService executorService = Executors.newFixedThreadPool(3);
//CompletionService是一个接口,ExecutorCompletionService为其实现类
//ExecutorCompletionService在构造函数中会创建一个BlockingQueue
// (使用的基于链表的无界队列LinkedBlockingQueue),
// 该BlockingQueue的作用是保存Executor执行的结果。
// 当计算完成时,调用FutureTask的done方法。
// 当提交一个任务到ExecutorCompletionService时,
// 首先将任务包装成QueueingFuture,它是FutureTask的一个子类,
// 然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。
CompletionService completionService = new ExecutorCompletionService(executorService);
completionService.submit(super::loadConfigurations,null);
completionService.submit(super::loadUsers,null);
completionService.submit(super::loadOrders,null);
int count = 0;
while (count < 3) {
if (completionService.poll() != null) {
count++;
}
}
executorService.shutdown();
}
public static void main(String[] args) {
new ParalleDataLoader().load();
}
}
这里大概解释一下ExecutorCompletionService,它的构造器会初始化一个线程池以及一个BlockingQueue
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
提交线程的时候,会初始化一个FutureTask,并放入QueueingFuture中,交给线程池去执行。
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
我们看一下QueueingFuture的继承图
由图可知,无论QueueingFuture,FutureTask,RunnableFuture其实都是一个Runnable。而在线程执行完毕后会执行一个done()方法,将结果放入BlockingQueue中。
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
private final BlockingQueue<Future<V>> completionQueue;
BlockingQueue是在ExecutorCompletionService被初始化了的,有关BlockingQueue的介绍可以参考从BlockingQueue到无锁Disruptor的性能提升
最后我们用到了completionService.poll()
public Future<V> poll() {
return completionQueue.poll();
}
将Future结果从BlockingQueue队列中弹出。当然我们示例中并没有什么结果需要弹出。
现在我们回到示例代码,运行结果
线程: pool-1-thread-1 loadConfigurations() 耗时: 1002 毫秒
线程: pool-1-thread-2 loadUsers 耗时: 2002 毫秒
线程: pool-1-thread-3 loadOrders() 耗时: 3003 毫秒
load()总耗时:3059毫秒
由结果可知,程序改造为并行加载后,性能和资源利用率得到提升,消耗时间取最大者。但由于以上三个方法之间没有数据依赖关系,所以执行方式由串行调整为并行后,能够达到性能提升的效果。如果方法之间存在依赖关系时,那么提升效果是否还会如此明显,并且如果确保它们的执行循序。问题如(线程安全性,原子性,可见性),由此问题可以参考Fork/Join框架原理和使用探秘 ,在这篇博客中就可以看到为了保证线程安全性,性能已经不如单线程。
Reactor认为异步不一定能够救赎
归纳
Callback Hell
我们来看这样一段代码
public class JavaGUI {
public static void main(String[] args) {
final JFrame jFrame = new JFrame("GUI 示例");
jFrame.setBounds(500,300,400,300);
LayoutManager layoutManager = new BorderLayout(400,300);
jFrame.setLayout(layoutManager);
jFrame.addMouseListener(new MouseAdapter() {
@Override
public void mouseClicked(MouseEvent e) {
System.out.printf("[线程 : %s] 鼠标点击,坐标(X : %d,Y : %d)\n",
currentThreadName(),e.getX(),e.getY());
}
});
jFrame.addWindowListener(new WindowAdapter() {
@Override
public void windowClosing(WindowEvent e) {
System.out.printf("[线程 : %s] 清除 jFrame...\n",currentThreadName());
jFrame.dispose();
}
@Override
public void windowClosed(WindowEvent e) {
System.out.printf("[线程 : %s] 退出程序... \n",currentThreadName());
System.exit(0);
}
});
System.out.println("当前线程:" + currentThreadName());
jFrame.setVisible(true);
}
private static String currentThreadName() {
return Thread.currentThread().getName();
}
}
当我们执行了main方法以后,会打印当前线程,并且显示window窗体。
我们可以看到打印了当前线程为main的主线程。当我们在窗体内用鼠标点击的时候会打印如下内容
线程 : AWT-EventQueue-0 鼠标点击,坐标(X : 218,Y : 167)
线程 : AWT-EventQueue-0 鼠标点击,坐标(X : 130,Y : 120)
由打印的内容可知,我们鼠标点击并不是main的主线程来执行的,说明它是一个异步的Callback,而且是非阻塞的,当我们点击鼠标产生鼠标事件时,没有任何线程会阻塞该线程的执行。当我们关闭窗口的时候,会打印如下内容
线程 : AWT-EventQueue-0 清除 jFrame...
线程 : AWT-EventQueue-0 退出程序...
说明关闭也是由同一个异步线程来执行的。由此可以看出Java GUI以及事件/监听模式基本采用匿名内置类,即回调实现。当监听的维度增多,Callback实现也随之增多。同时,事件/监听者模式(观察者模式)的并发模型可为同步或异步。这里说的同步、异步是线程模型;阻塞、非阻塞是编程模型。在Spring中,于这种GUI回调类似的有Spring Boot的消息事件机制 ,这里面也有同步,异步,阻塞,非阻塞的说明。
Future阻塞问题
我们来修改一下ParalleDataLoader的代码形成一个Future的阻塞
public class FutureBlockingDataLoader extends DataLoader {
protected void doLoad() {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletionService completionService = new ExecutorCompletionService(executorService);
runComletely(completionService.submit(super::loadConfigurations,null));
runComletely(completionService.submit(super::loadUsers,null));
runComletely(completionService.submit(super::loadOrders,null));
executorService.shutdown();
}
private void runComletely(Future<?> future) {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new FutureBlockingDataLoader().load();
}
}
运行结果
线程: pool-1-thread-1 loadConfigurations() 耗时: 1000 毫秒
线程: pool-1-thread-2 loadUsers 耗时: 2002 毫秒
线程: pool-1-thread-3 loadOrders() 耗时: 3001 毫秒
load()总耗时:6073毫秒
由结果可知,future.get()成为future阻塞的源泉。该方法不得不等待任务执行完成,换言之,如果多个任务提交后,返回多个Future逐一调用get()方法时,将会依次blocking,任务的执行从并行变成串行。
Future链式问题
由于Future无法异步执行结果链式处理,尽管FutureBlockingDataLoader能够解决方法数据依赖以及顺序执行的问题,不过它将并行执行带回了阻塞(串行)执行。所以,它不是一个理想实现。不过CompletableFuture可以帮助提升Future限制。
public class ChainDataLoader extends DataLoader {
protected void doLoad() {
CompletableFuture
.runAsync(super::loadConfigurations)
.thenRun(super::loadUsers)
.thenRun(super::loadOrders)
.whenComplete((result,throwable) ->
System.out.println("[线程 :" + Thread.currentThread().getName() + "] 加载完成")
).join();
}
public static void main(String[] args) {
new ChainDataLoader().load();
}
}
运行结果
线程: ForkJoinPool.commonPool-worker-9 loadConfigurations() 耗时: 1004 毫秒
线程: ForkJoinPool.commonPool-worker-9 loadUsers 耗时: 2004 毫秒
线程: ForkJoinPool.commonPool-worker-9 loadOrders() 耗时: 3001 毫秒
线程 :ForkJoinPool.commonPool-worker-9 加载完成
load()总耗时:6079毫秒
由结果可知,当异步执行时,它并不是由3个线程去执行,而是由同一个线程进行链式执行的,之所以加入join,是为了让主线程等待返回。它跟第一个DataLoader的不同在于,DataLoader是全部由主线程去阻塞执行的,而这里如果不使用join()则肯定为非阻塞的,只不过join()会阻塞,这个是线程相关的常识,具体可以参考线程,JVM锁整理 。也就是说,如果去掉join(),由于CompletableFuture都是守护线程,主线程执行完,它是不会执行的,现在我们把代码稍作修改如下。
public class ChainDataLoader extends DataLoader {
protected void doLoad() {
CompletableFuture
.runAsync(super::loadConfigurations)
.thenRun(super::loadUsers)
.thenRun(super::loadOrders)
.whenComplete((result,throwable) ->
System.out.println("[线程 :" + Thread.currentThread().getName() + "] 加载完成")
);
System.out.println("[线程 :" + Thread.currentThread().getName() +"】后续执行");
}
public static void main(String[] args) {
new ChainDataLoader().load();
}
}
运行结果
[线程 :main】后续执行
load()总耗时:60毫秒
证明CompletableFuture还未启动,并未执行。但如果我们把new ChainDataLoader().load();这段代码放入Controller中
@RestController
public class TestController {
@GetMapping("/future")
public void findfuture() {
new ChainDataLoader().load();
}
}
通过浏览器访问
可以看到后台打印
[线程 :reactor-http-nio-2】后续执行
load()总耗时:3毫秒
线程: ForkJoinPool.commonPool-worker-9 loadConfigurations() 耗时: 1003 毫秒
线程: ForkJoinPool.commonPool-worker-9 loadUsers 耗时: 2004 毫秒
线程: ForkJoinPool.commonPool-worker-9 loadOrders() 耗时: 3005 毫秒
线程 :ForkJoinPool.commonPool-worker-9 加载完成
证明异步线程是非阻塞并且执行了的。
这里我们可以看到CompletableFuture属于异步操作,如果强制等待结束的话,又回到了阻塞编程的方式,并且让我们明白到非阻塞不一定提升性能,因为即便是非阻塞,在异步线程中,它一样要使用6秒才能完成,相比于ParalleDataLoader的并行执行,只需要3秒完成来说,非阻塞的好处是让主方法线程及时完成,让主方法线程池可以及时释放。不过同理,在ParalleDataLoader中如果不进行completionService.poll()的阻塞操作,主线程同样会率先返回,由于线程池中的线程并非守护线程,它在主线程完成后会继续执行。
public class ParalleDataLoader extends DataLoader {
protected void doLoad() {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletionService completionService = new ExecutorCompletionService(executorService);
completionService.submit(super::loadConfigurations,null);
completionService.submit(super::loadUsers,null);
completionService.submit(super::loadOrders,null);
// int count = 0;
// while (count < 3) {
// if (completionService.poll() != null) {
// count++;
// }
// }
executorService.shutdown();
}
public static void main(String[] args) {
new ParalleDataLoader().load();
}
}
运行结果
load()总耗时:59毫秒
线程: pool-1-thread-1 loadConfigurations() 耗时: 1004 毫秒
线程: pool-1-thread-2 loadUsers 耗时: 2004 毫秒
线程: pool-1-thread-3 loadOrders() 耗时: 3002 毫秒
同样我们把new ParalleDataLoader().load()放入Controller中
@RestController
public class TestController {
@GetMapping("/future")
public void findfuture() {
new ParalleDataLoader().load();
}
}
通过浏览器访问,后台打印
load()总耗时:1毫秒
线程: pool-1-thread-1 loadConfigurations() 耗时: 1000 毫秒
线程: pool-1-thread-2 loadUsers 耗时: 2004 毫秒
线程: pool-1-thread-3 loadOrders() 耗时: 3005 毫秒
这里同样为异步非阻塞,并且并发了3个线程,异步线程总耗时是3秒。但是这样会造成异步线程池的线程数并发量比较大。
Reactive Stream JVM认为异步系统和资源消费需要特殊处理,在Reactor的github的官网上,有这样一段描述https://github.com/reactive-streams/reactive-streams-jvm
Reactive的理解,需要从很多方便
Reactive Programming定义
The Reactive Mainifesto认为:官网https://www.reactivemanifesto.org/
侧重点