前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Reactor深度探索 顶

Reactor深度探索 顶

作者头像
算法之名
发布2020-02-11 16:44:27
8920
发布2020-02-11 16:44:27
举报
文章被收录于专栏:算法之名算法之名

技术点

  • 反应器模式(Reactor) 同步非阻塞,每个事情可以分为几个步骤,每个步骤可以相应去做,每个步骤是不会相互影响的,但是做起来是串行的。有关Netty的具体实现,可以参考Netty整理
  • Proactor模式 异步非阻塞,每个事情同时做,或者说是异步的去做,
  • 观察者模式(Observer) JDK的实现可以参考使用JDK的观察者接口进行消息推送 观察者模式是一个推的模式
  • 迭代器模式(Iterator) 是一种拉的模式,数据准备好后,进行一个循环拉取。
  • Java并发模型

Reactive

Reactive是一种编程方式,由不同的方式来实现

  • RxJava : Reactive Extensions
  • Reactor : Spring WebFlux Reactive类库
  • Flow API : Java 9 Flow API实现

阻塞的弊端和并行的复杂

在Reactor官方的网站上,指出了现有编程的一些不足https://projectreactor.io/docs/core/release/reference/index.html#_blocking_can_be_wasteful

Reactor认为阻塞可能是浪费的

归纳

  • 阻塞导致性能瓶颈和浪费资源
  • 增加线程可能会引起资源竞争和并发问题(可见性问题,原子性问题)
  • 并行的方式不是银弹(不能解决所有问题)

阻塞的弊端

由以下场景来说明

代码语言:javascript
复制
public class DataLoader {
    public final void load() {
        long startTime = System.currentTimeMillis();
        doLoad();
        long costTime = System.currentTimeMillis() - startTime;
        System.out.println("load()总耗时:" + costTime + "毫秒");
    }

    protected void doLoad() {
        loadConfigurations();
        loadUsers();
        loadOrders();
    }

    protected final void loadConfigurations() {
        loadMock("loadConfigurations()",1);
    }

    protected final void loadUsers() {
        loadMock("loadUsers",2);
    }

    protected final void loadOrders() {
        loadMock("loadOrders()",3);
    }

    private void loadMock(String source,int seconds) {
        try {
            long startTime = System.currentTimeMillis();
            long milliseconds = TimeUnit.SECONDS.toMillis(seconds);
            Thread.sleep(milliseconds);
            long costTime = System.currentTimeMillis() - startTime;
            System.out.printf("[线程: %s] %s 耗时: %d 毫秒\n",
                    Thread.currentThread().getName(),source,costTime );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new DataLoader().load();
    }
}

运行结果

线程: main loadConfigurations() 耗时: 1001 毫秒

线程: main loadUsers 耗时: 2001 毫秒

线程: main loadOrders() 耗时: 3003 毫秒

load()总耗时:6025毫秒

由结果可知,我们在依次执行loadConfigurations()、loadUsers()、loadOrders()中,loadUsers()被loadConfigurations()阻塞了,loadOrders() 被loadUsers()阻塞了,它们都是main的主线程中的执行。由于加载过程串行执行的关系,导致消耗实现线性累加。串行执行即Blocking模式。

并行的复杂

由以下场景来说明

代码语言:javascript
复制
public class ParalleDataLoader extends DataLoader {
    protected void doLoad() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        //CompletionService是一个接口,ExecutorCompletionService为其实现类
        //ExecutorCompletionService在构造函数中会创建一个BlockingQueue
        // (使用的基于链表的无界队列LinkedBlockingQueue),
        // 该BlockingQueue的作用是保存Executor执行的结果。
        // 当计算完成时,调用FutureTask的done方法。
        // 当提交一个任务到ExecutorCompletionService时,
        // 首先将任务包装成QueueingFuture,它是FutureTask的一个子类,
        // 然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。
        CompletionService completionService = new ExecutorCompletionService(executorService);
        completionService.submit(super::loadConfigurations,null);
        completionService.submit(super::loadUsers,null);
        completionService.submit(super::loadOrders,null);

        int count = 0;
        while (count < 3) {
            if (completionService.poll() != null) {
                count++;
            }
        }
        executorService.shutdown();
    }

    public static void main(String[] args) {
        new ParalleDataLoader().load();
    }
}

这里大概解释一下ExecutorCompletionService,它的构造器会初始化一个线程池以及一个BlockingQueue

代码语言:javascript
复制
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

提交线程的时候,会初始化一个FutureTask,并放入QueueingFuture中,交给线程池去执行。

代码语言:javascript
复制
public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture(f));
    return f;
}
代码语言:javascript
复制
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
    if (aes == null)
        return new FutureTask<V>(task, result);
    else
        return aes.newTaskFor(task, result);
}

我们看一下QueueingFuture的继承图

由图可知,无论QueueingFuture,FutureTask,RunnableFuture其实都是一个Runnable。而在线程执行完毕后会执行一个done()方法,将结果放入BlockingQueue中。

代码语言:javascript
复制
private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}
代码语言:javascript
复制
private final BlockingQueue<Future<V>> completionQueue;

BlockingQueue是在ExecutorCompletionService被初始化了的,有关BlockingQueue的介绍可以参考从BlockingQueue到无锁Disruptor的性能提升

最后我们用到了completionService.poll()

代码语言:javascript
复制
public Future<V> poll() {
    return completionQueue.poll();
}

将Future结果从BlockingQueue队列中弹出。当然我们示例中并没有什么结果需要弹出。

现在我们回到示例代码,运行结果

线程: pool-1-thread-1 loadConfigurations() 耗时: 1002 毫秒

线程: pool-1-thread-2 loadUsers 耗时: 2002 毫秒

线程: pool-1-thread-3 loadOrders() 耗时: 3003 毫秒

load()总耗时:3059毫秒

由结果可知,程序改造为并行加载后,性能和资源利用率得到提升,消耗时间取最大者。但由于以上三个方法之间没有数据依赖关系,所以执行方式由串行调整为并行后,能够达到性能提升的效果。如果方法之间存在依赖关系时,那么提升效果是否还会如此明显,并且如果确保它们的执行循序。问题如(线程安全性,原子性,可见性),由此问题可以参考Fork/Join框架原理和使用探秘 ,在这篇博客中就可以看到为了保证线程安全性,性能已经不如单线程。

Reactor认为异步不一定能够救赎

归纳

  • Callbacks是解决非阻塞的方案,然后它们之间很难组合,并且快速地将代码引导至"Callback Hell"的不归路
  • Futures相对于Callbacks好一点,不过还是无法组合,不过ComletableFuture能够提升这方面的不足。比如在上面的示例中,如果loadUsers要传递数据到loadOrders中也是极其困难的。

Callback Hell

我们来看这样一段代码

代码语言:javascript
复制
public class JavaGUI {
    public static void main(String[] args) {
        final JFrame jFrame = new JFrame("GUI 示例");
        jFrame.setBounds(500,300,400,300);
        LayoutManager layoutManager = new BorderLayout(400,300);
        jFrame.setLayout(layoutManager);
        jFrame.addMouseListener(new MouseAdapter() {
            @Override
            public void mouseClicked(MouseEvent e) {
                System.out.printf("[线程 : %s] 鼠标点击,坐标(X : %d,Y : %d)\n",
                        currentThreadName(),e.getX(),e.getY());
            }
        });
        jFrame.addWindowListener(new WindowAdapter() {
            @Override
            public void windowClosing(WindowEvent e) {
                System.out.printf("[线程 : %s] 清除 jFrame...\n",currentThreadName());
                jFrame.dispose();
            }

            @Override
            public void windowClosed(WindowEvent e) {
                System.out.printf("[线程 : %s] 退出程序... \n",currentThreadName());
                System.exit(0);
            }
        });
        System.out.println("当前线程:" + currentThreadName());
        jFrame.setVisible(true);
    }

    private static String currentThreadName() {
        return Thread.currentThread().getName();
    }
}

当我们执行了main方法以后,会打印当前线程,并且显示window窗体。

我们可以看到打印了当前线程为main的主线程。当我们在窗体内用鼠标点击的时候会打印如下内容

线程 : AWT-EventQueue-0 鼠标点击,坐标(X : 218,Y : 167)

线程 : AWT-EventQueue-0 鼠标点击,坐标(X : 130,Y : 120)

由打印的内容可知,我们鼠标点击并不是main的主线程来执行的,说明它是一个异步的Callback,而且是非阻塞的,当我们点击鼠标产生鼠标事件时,没有任何线程会阻塞该线程的执行。当我们关闭窗口的时候,会打印如下内容

线程 : AWT-EventQueue-0 清除 jFrame...

线程 : AWT-EventQueue-0 退出程序...

说明关闭也是由同一个异步线程来执行的。由此可以看出Java GUI以及事件/监听模式基本采用匿名内置类,即回调实现。当监听的维度增多,Callback实现也随之增多。同时,事件/监听者模式(观察者模式)的并发模型可为同步或异步。这里说的同步、异步是线程模型;阻塞、非阻塞是编程模型。在Spring中,于这种GUI回调类似的有Spring Boot的消息事件机制 ,这里面也有同步,异步,阻塞,非阻塞的说明。

Future阻塞问题

我们来修改一下ParalleDataLoader的代码形成一个Future的阻塞

代码语言:javascript
复制
public class FutureBlockingDataLoader extends DataLoader {
    protected void doLoad() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletionService completionService = new ExecutorCompletionService(executorService);
        runComletely(completionService.submit(super::loadConfigurations,null));
        runComletely(completionService.submit(super::loadUsers,null));
        runComletely(completionService.submit(super::loadOrders,null));
        executorService.shutdown();
    }

    private void runComletely(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new FutureBlockingDataLoader().load();
    }
}

运行结果

线程: pool-1-thread-1 loadConfigurations() 耗时: 1000 毫秒

线程: pool-1-thread-2 loadUsers 耗时: 2002 毫秒

线程: pool-1-thread-3 loadOrders() 耗时: 3001 毫秒

load()总耗时:6073毫秒

由结果可知,future.get()成为future阻塞的源泉。该方法不得不等待任务执行完成,换言之,如果多个任务提交后,返回多个Future逐一调用get()方法时,将会依次blocking,任务的执行从并行变成串行。

Future链式问题

由于Future无法异步执行结果链式处理,尽管FutureBlockingDataLoader能够解决方法数据依赖以及顺序执行的问题,不过它将并行执行带回了阻塞(串行)执行。所以,它不是一个理想实现。不过CompletableFuture可以帮助提升Future限制。

代码语言:javascript
复制
public class ChainDataLoader extends DataLoader {
    protected void doLoad() {
        CompletableFuture
                .runAsync(super::loadConfigurations)
                .thenRun(super::loadUsers)
                .thenRun(super::loadOrders)
                .whenComplete((result,throwable) ->
                    System.out.println("[线程 :" + Thread.currentThread().getName() + "] 加载完成")
                ).join();
    }

    public static void main(String[] args) {
        new ChainDataLoader().load();
    }
}

运行结果

线程: ForkJoinPool.commonPool-worker-9 loadConfigurations() 耗时: 1004 毫秒

线程: ForkJoinPool.commonPool-worker-9 loadUsers 耗时: 2004 毫秒

线程: ForkJoinPool.commonPool-worker-9 loadOrders() 耗时: 3001 毫秒

线程 :ForkJoinPool.commonPool-worker-9 加载完成

load()总耗时:6079毫秒

由结果可知,当异步执行时,它并不是由3个线程去执行,而是由同一个线程进行链式执行的,之所以加入join,是为了让主线程等待返回。它跟第一个DataLoader的不同在于,DataLoader是全部由主线程去阻塞执行的,而这里如果不使用join()则肯定为非阻塞的,只不过join()会阻塞,这个是线程相关的常识,具体可以参考线程,JVM锁整理 。也就是说,如果去掉join(),由于CompletableFuture都是守护线程,主线程执行完,它是不会执行的,现在我们把代码稍作修改如下。

代码语言:javascript
复制
public class ChainDataLoader extends DataLoader {
    protected void doLoad() {
        CompletableFuture
                .runAsync(super::loadConfigurations)
                .thenRun(super::loadUsers)
                .thenRun(super::loadOrders)
                .whenComplete((result,throwable) ->
                    System.out.println("[线程 :" + Thread.currentThread().getName() + "] 加载完成")
                );
        System.out.println("[线程 :" + Thread.currentThread().getName() +"】后续执行");
    }

    public static void main(String[] args) {
        new ChainDataLoader().load();
    }
}

运行结果

[线程 :main】后续执行

load()总耗时:60毫秒

证明CompletableFuture还未启动,并未执行。但如果我们把new ChainDataLoader().load();这段代码放入Controller中

代码语言:javascript
复制
@RestController
public class TestController {
    @GetMapping("/future")
    public void findfuture() {
        new ChainDataLoader().load();
    }
}

通过浏览器访问

可以看到后台打印

[线程 :reactor-http-nio-2】后续执行

load()总耗时:3毫秒

线程: ForkJoinPool.commonPool-worker-9 loadConfigurations() 耗时: 1003 毫秒

线程: ForkJoinPool.commonPool-worker-9 loadUsers 耗时: 2004 毫秒

线程: ForkJoinPool.commonPool-worker-9 loadOrders() 耗时: 3005 毫秒

线程 :ForkJoinPool.commonPool-worker-9 加载完成

证明异步线程是非阻塞并且执行了的。

这里我们可以看到CompletableFuture属于异步操作,如果强制等待结束的话,又回到了阻塞编程的方式,并且让我们明白到非阻塞不一定提升性能,因为即便是非阻塞,在异步线程中,它一样要使用6秒才能完成,相比于ParalleDataLoader的并行执行,只需要3秒完成来说,非阻塞的好处是让主方法线程及时完成,让主方法线程池可以及时释放。不过同理,在ParalleDataLoader中如果不进行completionService.poll()的阻塞操作,主线程同样会率先返回,由于线程池中的线程并非守护线程,它在主线程完成后会继续执行。

代码语言:javascript
复制
public class ParalleDataLoader extends DataLoader {
    protected void doLoad() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletionService completionService = new ExecutorCompletionService(executorService);
        completionService.submit(super::loadConfigurations,null);
        completionService.submit(super::loadUsers,null);
        completionService.submit(super::loadOrders,null);

//        int count = 0;
//        while (count < 3) {
//            if (completionService.poll() != null) {
//                count++;
//            }
//        }
        executorService.shutdown();
    }

    public static void main(String[] args) {
        new ParalleDataLoader().load();
    }
}

运行结果

load()总耗时:59毫秒

线程: pool-1-thread-1 loadConfigurations() 耗时: 1004 毫秒

线程: pool-1-thread-2 loadUsers 耗时: 2004 毫秒

线程: pool-1-thread-3 loadOrders() 耗时: 3002 毫秒

同样我们把new ParalleDataLoader().load()放入Controller中

代码语言:javascript
复制
@RestController
public class TestController {
    @GetMapping("/future")
    public void findfuture() {
        new ParalleDataLoader().load();
    }
}

通过浏览器访问,后台打印

load()总耗时:1毫秒

线程: pool-1-thread-1 loadConfigurations() 耗时: 1000 毫秒

线程: pool-1-thread-2 loadUsers 耗时: 2004 毫秒

线程: pool-1-thread-3 loadOrders() 耗时: 3005 毫秒

这里同样为异步非阻塞,并且并发了3个线程,异步线程总耗时是3秒。但是这样会造成异步线程池的线程数并发量比较大。

Reactive Stream JVM认为异步系统和资源消费需要特殊处理,在Reactor的github的官网上,有这样一段描述https://github.com/reactive-streams/reactive-streams-jvm

  • 流式数据容量难以预判
  • 异步编程复杂
  • 数据源和消费端之间资源消费难以平衡

Reactive的理解,需要从很多方便

  • 维基百科
  • The Reactive Mainifesto : Resactive组织
  • Spring Framework
  • ReactiveX : RxJava
  • Reactor : WebFlux底层
  • @andrestaltz :著名作者

Reactive Programming定义

The Reactive Mainifesto认为:官网https://www.reactivemanifesto.org/

  • 响应性 (Responsive)
  • 适应性强的 (Resilient)
  • 弹性的 (Elastic)
  • 消息驱动的 (Message Driven)

侧重点

  • 面向Reactive系统
  • Reactive系统原则
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档