前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Boot 系列 —— Spring Webflux

Spring Boot 系列 —— Spring Webflux

作者头像
求和小熊猫
发布2022-06-30 15:34:47
1.3K0
发布2022-06-30 15:34:47
举报

Spring Webflux

文章目录

Java 函数式编程

FunctionalInterface 注解

Java 8提出了函数式接口的概念。所谓函数式接口,简单来说,就是只定义了单一抽象方法的接口。 【示例】

@FunctionalInterface
public interface IntHandler {
    int handler(int a);
}

注释FunctionalInterface用于表明IntHandler接口是一个函数式接口,该接口被定义为只包含一个抽象方法handle(),因此它符合函数式接口的定义。如果一个函数满足函数式接口的定义,那么即使不标注为@FunctionalInterface,编译器依然会把它看做函数式接口。这有点像@Override注释,如果你的函数符合重载的要求,无论你是否标注了@Override,编译器都会识别这个重载函数,但一旦你进行了标注,而实际的代码不符合规范,那么就会得到一个编译错误。

这里需要强调的是,函数式接口只能有一个抽象方法,而不是只能有一个方法。

Functional 接口

Functional 类型的接口都在 java.util.function 包下。其中的接口可以大致分为如下几类。

  • Consumer(消费):接受参数,无返回值
  • Function(函数):接受参数,有返回值
  • Operator(操作):接受参数,返回与参数同类型的值
  • Predicate(断言):接受参数,返回boolean类型
  • Supplier(供应):无参数,有返回值

具体接口如下表:

接口名称

描述

Consumer

接受一个参数,无返回值

BiConsumer

接受两个参数,无返回值

DoubleConsumer

接受一个double类型的参数,无返回值

IntConsumer

接受一个int类型的参数,无返回值

LongConsumer

接受一个long类型的参数,无返回值

ObjDoubleConsumer

接受一个自定义类型的参数和一个double类型的参数,无返回值

ObjIntConsumer

接受一个自定义类型的参数和一个int类型的参数,无返回值

ObjLongConsumer

接受一个自定义类型的参数和一个long类型的参数,无返回值

Function

接受一个参数,有返回值

BiFunction

接受两个参数,有返回值

DoubleFunction

接受一个double类型的参数,有返回值

IntFunction

接受一个int类型的参数,有返回值

LongFunction

接受一个long类型的参数,有返回值

IntToDoubleFunction

接受一个int类型的参数,返回一个double类型的值

IntToLongFunction

接受一个int类型的参数,返回一个long类型的值

LongToDoubleFunction

接受一个long类型的参数,返回一个double类型的值

LongToIntFunction

接受一个long类型的参数,返回一个int类型的值

DoubleToIntFunction

接受一个double类型的参数,返回一个int类型的值

DoubleToLongFunction

接受一个double类型的参数,返回一个long类型的值

ToDoubleBiFunction

接受两个参数,返回一个double类型的值

ToDoubleFunction

接受一个参数,返回一个double类型的值

ToIntBiFunction

接受两个参数,返回一个int类型的值

ToIntFunction

接受一个参数,返回一个int类型的值

ToLongBiFunction

接受两个参数,返回一个long类型的值

ToLongFunction

接受一个参数,返回一个long类型的值

BinaryOperator

接受两个相同类型的参数,返回一个相同类型的值

DoubleBinaryOperator

接受两个double类型的参数,返回一个double类型的值

DoubleUnaryOperator

接受一个double类型的参数,返回一个double类型的值

IntBinaryOperator

接受两个int类型的参数,返回一个int类型的值

IntUnaryOperator

接受一个int类型的参数,返回一个int类型的值

LongBinaryOperator

接受两个long类型的参数,返回一个long类型的值

LongUnaryOperator

接受一个long类型的参数,返回一个long类型的值

UnaryOperator

接受一个参数,返回一个相同类型的值

Predicate

接受一个参数,返回一个boolean类型的值

BiPredicate

接受两个参数,返回一个boolean类型的值

DoublePredicate

接受一个double类型的参数,返回一个boolean类型的值

IntPredicate

接受一个int类型的参数,返回一个boolean类型的值

LongPredicate

接受一个long类型的参数,返回一个boolean类型的值

Supplier

无参数,有返回值

BooleanSupplier

无参数,返回一个boolean类型的值

DoubleSupplier

无参数,返回一个double类型的值

IntSupplier

无参数,返回一个int类型的值

LongSupplier

无参数,返回一个long类型的值

【示例】

BiFunction<Long,Object,String> function = (price,object) ->{
    StringBuilder builder = new StringBuilder();
    builder.append("price: ").append(price)
            .append("\n Object:").append(object);
    return builder.toString();
}; //  定义 Function
System.out.println(function.apply(15L,"apple")); // 直接使用 Function 对象

Supplier<Integer> integerSupplier = () -> {return new Random().nextInt();}; // 提供一个 Integer 类型的对象

Consumer<String> stringConsumer = (a) -> {System.out.println(a);}; // 消费 String 对象,直接答应

Java 响应式编程 Reactor3

Reactor3 介绍

Reactor是JVM的完全无阻塞的反应式编程基础,具有有效的需求管理(以管理“背压”的形式)。它直接与Java 8函数式API集成,特别是CompletableFuture,Stream和Sustens。它提供可组合的异步序列 API — Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并广泛实现了反应式流规范。

Reactor还支持与Reactor-netty项目的无阻塞过程间通信。Reactor Netty适用于微服务架构,为HTTP(包括Websockets),TCP和UDP提供背压就绪网络引擎。完全支持反应式编码和解码。

响应式编程

Reactor 是响应式编程范式的实现,总结起来有如下几点:

响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。

在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用 Flow 类)。

响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。

使用 iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable 的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber,但是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。

Reactor3 的使用

Reactor 3 的具体使用可以见如下文档https://htmlpreview.github.io/?https://github.com/get-set/reactor-core/blob/master-zh/src/docs/index.html#producing

Flux 和 Mono 的详述

Flux

flux.png
flux.png

Flux 是一个能够发出 0 到 N 个元素的标准的 Publisher,它会被一个“错误(error)” 或“完成(completion)”信号终止。因此,一个 flux 的可能结果是一个 value、completion 或 error。 就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的 onNextonCompleteonError 方法。

由于多种不同的信号可能性,Flux 可以作为一种通用的响应式类型。注意,所有的信号事件, 包括代表终止的信号事件都是可选的:如果没有 onNext 事件但是有一个 onComplete 事件, 那么发出的就是 空的 有限序列,但是去掉 onComplete 那么得到的就是一个 无限的 空序列。 当然,无限序列也可以不是空序列,比如,Flux.interval(Duration) 生成的是一个 Flux, 这就是一个无限地周期性发出规律 tick 的时钟序列。

Mono

png
png

Mono 是一种特殊的 Publisher, 它最多发出一个元素,然后终止于一个 onComplete 信号或一个 onError 信号。

它只适用其中一部分可用于 Flux 的操作。比如,(两个 Mono 的)结合类操作可以忽略其中之一 而发出另一个 Mono,也可以将两个都发出,对于后一种情况会切换为一个 Flux。

例如,Mono#concatWith(Publisher) 返回一个 Flux,而 Mono#then(Mono) 返回另一个 Mono。

注意,Mono 可以用于表示“空”的只有完成概念的异步处理(比如 Runnable)。这种用 Mono 来创建。

Flux 和 Mono 的创建
通用创建方式
Mono<Integer> integerMono = Mono.just(new Random().nextInt()); // 创建个 Integer 的 Mono
Mono<String> stringMono = Mono.empty(); // 创建一个空的 Mono
Mono<Double> doubleMono = Mono.fromSupplier(new Supplier<Double>() {
    @Override
    public Double get() {
        return 15.7;
    }
}); // Mono 通过 Supplier 提供值

Flux<String> stringFlux = Flux.just("name","age","test","demo"); // 创建带 4 个 String 的 Flux
Flux<Integer> integerFlux = Flux.range(3,4); // 创建一个能提供 3,4,5,6 四个 Integer 类型的数字
List<Double> doubles = Arrays.asList(14.2,12.7,9.00,7.92,7.62,6.8,5.56,5.45);
Flux<Double> doubleFlux = Flux.fromIterable(doubles); // 从可迭代容器中创建 Flux
Flux<Double> doubleFlux1 = Flux.fromStream(doubles.stream()); // 通过流创建 flux
Flux<Double> doubleFlux2 = Flux.fromStream(new Supplier<Stream<? extends Double>>() {
    @Override
    public Stream<? extends Double> get() {
        return doubles.stream();
    }
});// 通过 Supllier 提供流
可编程式的创建

在这一小节,我们介绍如何通过定义相对应的事件(onNext、onErroronComplete) 创建一个 Flux 或 Mono。所有这些方法都通过 API 来触发我们叫做 sink(池) 的事件。

Generate 方法

这是一种 同步地, 逐个地 产生值的方法,意味着 sink 是一个 SynchronousSink 而且其 next() 方法在每次回调的时候最多只能被调用一次。你也可以调用 error(Throwable) 或者 complete(),不过是可选的。

最有用的一种方式就是同时能够记录一个状态值(state),从而在使用 sink 发出下一个元素的时候能够 基于这个状态值去产生元素。此时生成器(generator)方法就是一个 BiFunction<S, SynchronousSink<T>, S>, 其中 S 是状态对象的类型。你需要提供一个 Supplier<S> 来初始化状态值,而生成器需要 在每一“回合”生成元素后返回新的状态值(供下一回合使用)。

【示例】

Flux<String> flux = Flux.generate(
    () -> 0,  // 一个 suplier 用于提供初始值
    (state, sink) -> { // 为 BiFunction<S, SynchronousSink<T>, S> 接口的实现
      sink.next("3 x " + state + " = " + 3*state); // sink 对象向 flux 返回一个值
      if (state == 10) sink.complete();  // sink 结束,不再向 flux 提供值
      return state + 1; // 返回一个新的 state ,用于下一次调用
    });
Create 方法

作为一个更高级的创建 Flux 的方式, create 方法的生成方式既可以是同步, 也可以是异步的,并且还可以每次发出多个元素。

该方法用到了 FluxSink,后者同样提供 next,error 和 complete 等方法。 与 generate 不同的是,create 不需要状态值,另一方面,它可以在回调中触发 多个事件(即使是在未来的某个时间)。

Handle 方法

handle 方法有些不同,它在 Mono 和 Flux 中都有。然而,它是一个实例方法 (instance method),意思就是它要链接在一个现有的源后使用(与其他操作符一样)。

它与 generate 比较类似,因为它也使用 SynchronousSink,并且只允许元素逐个发出。 然而,handle 可被用于基于现有数据源中的元素生成任意值,有可能还会跳过一些元素。 这样,可以把它当做 map 与 filter 的组合。handle 方法签名如下:

handle(BiConsumer<T, SynchronousSink<R>>)

Flux 和 Mono 信息的消费和处理

Flux 和 Mono 的消息基本依靠 subscribe() 方法进行处理

subscribe(); // 订阅并触发序列。

subscribe(Consumer<? super T> consumer); // 对每一个生成的元素进行消费。

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); // 对正常元素进行消费,也对错误进行响应。
          
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); // 对正常元素和错误均有响应,还定义了序列正常完成后的回调。
          
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); //  对正常元素、错误和完成信号均有响应, 同时也定义了对该 subscribe 方法返回的 Subscription 执行的回调。
对 Flux 和 Mono 中的信息进行处理

对 Flux 和 Mono 可以使用 map() 函数进行处理

Flux<Integer> ints = Flux.range(1, 4)
      .map(i -> {  // 对 flux 中的四个值进行处理
        if (i <= 3) return i;  // 小于3进行返回
        throw new RuntimeException("Got to 4"); // 大于3抛出异常
      });
ints.subscribe(i -> System.out.println(i), // 对正常值进行处理的 consumer
      error -> System.err.println("Error: " + error)); // 对异常进行处理的 consumer

Spring webflux 的使用

Spring Webflux 的区别其实与 Spring Serverlet 的差别并不大,只是需要注意返回对象变成了 Mono 或者是 Flux 对象。

引入 POM

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

编写配置文件

application.yml

server:  
    port: 8089

编写主函数

@EnableWebFlux
@SpringBootApplication
public class SpringWebfluxTestApplication {

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringWebfluxTestApplication.class);
        application.setWebApplicationType(WebApplicationType.REACTIVE);
        application.run(args);
    }
}

编写 Controller

@RestController
public class WebfluxController {
    @GetMapping("/test/1")
    public Mono<String> test1(){
        return Mono.just("result");
    }
}

测试效果

直接在浏览器中访问 http://localhost:8089/test/1

可以看见正常返回了一个 result

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spring Webflux
    • 文章目录
      • Java 函数式编程
        • FunctionalInterface 注解
        • Functional 接口
      • Java 响应式编程 Reactor3
        • Reactor3 介绍
        • 响应式编程
        • Reactor3 的使用
      • Spring webflux 的使用
        • 引入 POM
        • 编写配置文件
        • 编写主函数
        • 编写 Controller
        • 测试效果
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档