首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Spring Boot WebFlux框架响应式编程数据实时更新

有这样一个需求,用户看到的数据需要动态变化,每隔10秒钟展示最新的动态,请问这个需求如何实现?

有朋友说这个简单,在数据页写一个定时任务,每10秒钟轮询一次获取最新的数据。

这是一种解决办法。不在数据展示页轮询行不行?还有没有其他办法?

答案是有的,Spring就提供了这样一个框架,Spring WebFlux框架。

一、Spring WebFlux简单介绍

Spring WebFlux 是一个用于构建响应式、非阻塞、事件驱动的 Web 应用程序的框架。它是 Spring Framework 5.0 引入的一个新的模块,旨在支持响应式编程模型。

主要特点:

1、响应式编程模型

Spring WebFlux 基于 Reactor 库实现了响应式编程模型。它通过利用 Java 8 中引入的流(Stream)和 CompletableFuture API,使得应用程序能够以声明式的方式处理异步数据流,从而提高了并发处理能力和性能。

2、非阻塞 I/O

使用了基于事件驱动的非阻塞 I/O 模型,可以有效地利用有限的线程资源来处理大量并发连接。这种方式避免了传统阻塞 I/O 中每个连接都需要一个线程的问题,提高了系统的扩展性和资源利用率。

3、多种响应式架构支持

Spring WebFlux 支持多种响应式架构,包括基于 Servlet 容器的传统应用程序和使用 Netty、Undertow 等非阻塞服务器的响应式应用程序。这种灵活性使得开发人员可以根据应用的具体需求选择合适的部署方式。

4、函数式端点和路由

Spring WebFlux 提供了函数式端点和路由定义的方式,通过编程的方式定义路由和处理器函数,而不是依赖注解驱动的控制器。这种方式简化了代码结构,使得处理器函数可以更加灵活和复用。

5、与 RSocket 的集成

Spring WebFlux 与 RSocket 协议天然集成,RSocket 是一种基于 Reactive Streams 的网络协议,用于构建响应式和实时的应用程序通信。这使得 Spring WebFlux 能够轻松地构建支持双向通信的应用程序,例如实时数据处理、推送通知等。

6、全面的 Spring 生态系统支持

Spring WebFlux 可以无缝集成 Spring 生态系统中的其他模块和库,例如 Spring Data、Spring Security 等,从而使得开发人员能够快速构建完整功能的响应式应用程序。

优点很多,仅从概念上理解太抽象,不如做个示例感觉一下。

二、一个简易的实时数据更新示例

1、添加依赖

SpringBoot整合了webflux框架,在pom文件中引入这个架包即可。

<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId>

2、创建响应式控制器

@RestControllerpublic class ReactiveController { /** * 固定返回一次,ajax接收即可 * @return */ @GetMapping("/hello") public Mono<String> sayHello() { return Mono.just("Hello, Reactive World!"); }

/** * 每1秒钟响应客户端一次 * @return */ @GetMapping(value = "/realtime", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> realtimeData() { return Flux.interval(Duration.ofSeconds(1)) .map(sequence -> "当前时间:" + LocalTime.now()) // 每秒发送一个当前时间的字符串 .doOnNext(num -> System.out.println(num)); // 后续处理 } //报头设置为 "text/event-stream",以便于发送事件流 @GetMapping(value="/realtime1",produces = MediaType.TEXT_EVENT_STREAM_VALUE) @ResponseBody public Flux<ServerSentEvent<Object>> countDown() { //每一秒钟推送一次 return Flux.interval(Duration.ofSeconds(1)) .map(seq -> Tuples.of(seq, LocalTime.now())) .map(data -> ServerSentEvent.<Object>builder() .event("realtime1") //和前端addEventListener监听的事件一一对应 .id(Long.toString(data.getT1())) //为每次发送设置一个id .data(data.getT2().toString()) .build()); } @GetMapping(value = "/data-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @ResponseBody public Flux<Map<String,String>> streamData() { return Flux.interval(Duration.ofSeconds(1)) // 每1秒钟发出一个递增的值 .map(sequence -> { Map<String,String> map = new HashMap<>(); map.put("aa", "bb" + sequence); return map; }); }}

3、页面数据接收

html文件放在 src\main\resources\static 目录下,这是默认的页面目录。

<meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>EventSource Example</title> <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script> <div id="hello"></div> <div id="data"></div> <div id="data1"></div> <div id="data2"></div> <script> // 针对Mono使用普通的ajax接收即可 $.ajax({ url: '/hello', type: 'GET', success: function(data) { $('#hello').html(data); } }); // 判断浏览器是否支持EventSource if (typeof (EventSource) !== "undefined") { // 针对Flux则需要使用EventSource来 const eventSource = new EventSource('/realtime'); // 接收来自realtime的消息 eventSource.onmessage = function(event) { console.log(event); // 更新页面上的数据 document.getElementById('data').innerText = event.data; }; const eventSource1 = new EventSource('/realtime1'); // 添加realtime1,标准的事件监听器,允许为特定的事件类型添加多个监听器,需要配合服务端使用 eventSource1.addEventListener("realtime1", function(e) { document.getElementById("data1").innerHTML = e.data; }, false);//使用false表示在冒泡阶段处理事件,而不是捕获阶段。 const eventSource2 = new EventSource('/data-stream'); // 监听来自message的消息,等同于onmessage eventSource2.addEventListener("message",function(event) { // 转换成json数据 let data = JSON.parse(event.data); // 更新页面上的数据 document.getElementById('data2').innerText = data["aa"]; }); }else{ //注意:ie浏览器不支持 document.getElementById("data").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件..."; var xhr; if (window.XMLHttpRequest){ //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法 xhr=new XMLHttpRequest(); }else{ //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替 xhr=new ActiveXObject("Microsoft.XMLHTTP"); } console.log(xhr); xhr.open('GET', '/realtime'); xhr.send(null);//发送请求 xhr.onreadystatechange = function() { console.log("s响应状态:" + xhr.readyState); //2是空响应,3是响应一部分,4是响应完成 if (xhr.readyState > 2) { //这儿可以使用response(对应json)与responseText(对应text) var newData = xhr.response.substr(xhr.seenBytes); newData = newData.replace(/\n/g, "#"); newData = newData.substring(0, newData.length - 1); var data = newData.split("#"); console.log("获取到的数据:" + data); document.getElementById("data").innerHTML = data; //长度重新赋值,下次截取时需要使用 xhr.seenBytes = xhr.response.length; } } }

EventSource 是 HTML5 的一部分,它是一种用于接收服务器发送的事件流的 API。无需引入额外的 JavaScript 文件,只需在 HTML 页面中使用它即可。

EventSource 对象有两种方式来处理从服务器发送的事件:addEventListener 和 onmessage。addEventListener 可以监听多个事件、多次监听,onmessage只能接收来自服务端的数据流。

EventSource API 用于接收服务器发送事件的机制,并且仅支持使用 HTTP GET 方法来建立连接。因此,无法通过 EventSource 来设置其他请求方法,比如 POST 或 PUT。EventSource 是单向通信的,客户端只能接收服务器发送的事件,而不能向服务器发送数据。

如果浏览器不支持 EventSource ,我们就需要使用 XMLHttpRequest 或 ActiveXObject 来代替它。XMLHttpRequest 支持各种请求方式,这一点比 EventSource 友好。

4、测试

启动项目,在浏览器输入实时数据的获取接口,查看输出情况:

通过运行结果,我们发现,这个浏览器一直在转圈圈,每隔10秒钟,都有新的数据从服务器传递过来。

通过浏览器定位到页面,来看看实时数据的更新情况:

服务器每隔10秒响应一次客户端,不必客户端重复发起请求,这对客户端来说无疑节约了大量的请求资源。在浏览器关闭后,服务器端会自动停止数据的响应。

三、Mono 和 Flux

在上面示例中,除了Flux类,还使用了Mono,它俩有什么区别呢?

Mono 和 Flux 是 Spring WebFlux 框架中用于处理响应式流的两个关键类,它们之间的区别和联系如下:

Mono

Mono 代表一个包含零个或一个元素的异步序列。

它类似于 Java 8 中的 CompletableFuture,但具有更丰富的操作符。

通常用于表示只产生一个值或者需要处理单个结果的场景。

例如,当您需要从数据库中查询一个对象时,您可以使用 Mono 来表示查询的结果。在前端使用ajax接收即可。

Flux

Flux 代表一个包含零个或多个元素的异步序列。

它类似于 Java 8 中的 Stream,但是可以用于异步场景,并提供了丰富的操作符来处理异步数据流。

通常用于表示可能产生多个值或需要处理多个结果的场景。

例如,当我们需要从消息队列中订阅消息时,可以使用 Flux 来表示消息流。在前端需要使用html5自带的EventSource来接收。

Mono 和 Flux 都是 Reactor 库中的 CorePublisher<T> 接口的子类,用于支持响应式编程模型。

它们都提供了丰富的操作符,如 map、filter、flatMap 等,用于对数据流进行转换、过滤和组合。

在 Spring WebFlux 应用程序中,通常可以在控制器方法中返回 Mono 或 Flux 对象来表示异步处理结果或数据流。

它们都是惰性求值的,只有在订阅时才会触发数据流的执行。

因此,Mono 适用于表示单个元素的异步序列,而 Flux 适用于表示多个元素的异步序列。我们可以根据需要选择使用其中之一或两者结合使用来处理响应式数据流。

四、应用场景

Web开发:在Web开发中,响应式编程模型可以用于构建反应式堆栈(Reactive Stack)应用程序,处理大量并发请求和异步事件。

实时数据处理:对于需要实时处理数据流的应用,例如实时分析、实时监控等,响应式编程模型能够有效地处理数据流,并且具有良好的可伸缩性。

用户界面开发:在构建现代用户界面时,响应式编程模型可以用于处理用户输入、动画效果以及与后端服务的异步通信。

大数据处理:对于大规模数据处理和分析的场景,响应式编程模型可以处理复杂的数据流,并且能够轻松应对数据的变化和处理需求。

总的来说,响应式编程模型适用于需要处理异步事件、数据流和实时性要求较高的应用场景。

好不好用,尝试一下才知道。

  • 发表于:
  • 原文链接https://page.om.qq.com/page/O5_IHERnwOnNDibtZIedAUbQ0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券