专栏首页IT进修之路WebFlux定点推送、全推送灵活websocket运用

WebFlux定点推送、全推送灵活websocket运用

前言

        WebFlux 本身提供了对 WebSocket 协议的支持,处理 WebSocket 请求需要对应的 handler 实现 WebSocketHandler 接口,每一个 WebSocket 都有一个关联的 WebSocketSession,包含了建立请求时的握手信息 HandshakeInfo,以及其它相关的信息。可以通过 session 的 receive() 方法来接收客户端的数据,通过 session 的 send() 方法向客户端发送数据。

示例

下面是一个简单的 WebSocketHandler 示例:

@Component
public class EchoHandler implements WebSocketHandler {
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(
                session.receive().map(
                        msg -> session.textMessage("ECHO -> " + msg.getPayloadAsText())));
    }
}

        有了 handler 之后,还需要让 WebFlux 知道哪些请求需要交给这个 handler 进行处理,因此要创建相应的 HandlerMapping。

        在处理 HTTP 请求时,我们经常使用 WebFlux 中最简单的 handler 定义方式,即通过注解 @RequestMapping 将某个方法定义为处理特定路径请求的 handler。 但是这个注解是用于处理 HTTP 请求的,对于 WebSocket 请求而言,收到请求后还需要协议升级的过程,之后才是 handler 的执行,所以我们不能直接通过该注解定义请求映射,不过可以使用 SimpleUrlHandlerMapping 来添加映射。

@Configuration
public class WebSocketConfiguration {
    @Bean
    public HandlerMapping webSocketMapping(EchoHandler echoHandler) {
        final Map<String, WebSocketHandler> map = new HashMap<>(1);
        map.put("/echo", echoHandler);

        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        mapping.setUrlMap(map);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

这样就能够将发往 /echo 的 WebSocket 请求交给 EchoHandler 处理。

我们还要为 WebSocket 类型的 handler 创建对应的 WebSocketHandlerAdapter,以便让 DispatcherHandler 能够调用我们的 WebSocketHandler。

完成这三个步骤后,当一个 WebSocket 请求到达 WebFlux 时,首先由 DispatcherHandler 进行处理,它会根据已有的 HandlerMapping 找到这个 WebSocket 请求对应的 handler,接着发现该 handler 实现了 WebSocketHandler 接口,于是会通过 WebSocketHandlerAdapter 来完成该 handler 的调用。

疑惑

        从上面的例子不难看出,没接收一个请求后,就得在里面里面返回消息,后面就不能再给他发消息了。其次是我每次新添加或者删除一个消息的处理类Handler,就得每次去修改配置文件中的SimpleUrlHandlerMapping的UrlMap的内容,感觉不是很友好。于是针对这2点进行修改和调整如下:

 1. 用自定义注解注册 Handler

我们能否像注册 HTTP 请求的 Handler 那样,也通过类似 RequestMapping 的注解来注册 Handler 呢?

虽然官方没有相关实现,但我们可以自己实现一个类似的注解,不妨叫作 WebSocketMapping

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface WebSocketMapping {
    String value() default "";
}

@Retention(RetentionPolicy.RUNTIME) 表明该注解工作在运行期间,@Target(ElementType.TYPE) 表明该注解作用在类上。

我们先看下该注解最终的使用方式。下面是一个 TimeHandler 的示例,它会每秒钟会向客户端发送一次时间。我们通过注解 @WebSocketMapping("/time") 完成了 TimeHandler 的注册,告诉 WebFlux 当有 WebSocket 请求发往 /echo 路径时,就交给 EchoHandler 处理:

@Component
@WebSocketMapping("/echo")
public class EchoHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(final WebSocketSession session) {
        return session.send(
                session.receive()
                        .map(msg -> session.textMessage(
                                "服务端返回:小明, -> " + msg.getPayloadAsText())));
    }
}

是不是和 RequestMapping 一样方便?

到目前为止,这个注解还没有实际的功能,还不能自动注册 handler。回顾我们上面注册路由的方式,我们创建了一个 SimpleUrlHandlerMapping,并手动添加了 EchoHandler 的映射规则,然后将其作为 HandlerMapping 的 Bean 返回。

现在我们要创建一个专门的 HandlerMapping 类来处理 WebSocketMapping 注解,自动完成 handler 的注册:

public class WebSocketMappingHandlerMapping extends SimpleUrlHandlerMapping{
	
	private Map<String, WebSocketHandler> handlerMap = new LinkedHashMap<>();
	/**
     * Register WebSocket handlers annotated by @WebSocketMapping
     * @throws BeansException
     */
    @Override
    public void initApplicationContext() throws BeansException {
        Map<String, Object> beanMap = obtainApplicationContext()
                .getBeansWithAnnotation(WebSocketMapping.class);
        beanMap.values().forEach(bean -> {
            if (!(bean instanceof WebSocketHandler)) {
                throw new RuntimeException(
                        String.format("Controller [%s] doesn't implement WebSocketHandler interface.",
                                bean.getClass().getName()));
            }
            WebSocketMapping annotation = AnnotationUtils.getAnnotation(
                    bean.getClass(), WebSocketMapping.class);
            //webSocketMapping 映射到管理中
            handlerMap.put(Objects.requireNonNull(annotation).value(),(WebSocketHandler) bean);
        });
        super.setOrder(Ordered.HIGHEST_PRECEDENCE);
        super.setUrlMap(handlerMap);
        super.initApplicationContext();
    }
}

我们的 WebSocketMappingHandlerMapping 类,实际上就是 SimpleUrlHandlerMapping,只不过增加了一些初始化的操作。

initApplicationContext() 方法是 Spring 中 ApplicationObjectSupport 类的方法,用于自定义类的初始化行为,在我们的 WebSocketMappingHandlerMapping 中,初始化工作主要是收集使用了 @WebSocketMapping 注解并且实现来 WebSocketHandler 接口的 Component,然后将它们注册到内部的 SimpleUrlHandlerMapping 中。之后的路由工作都是由父类 SimpleUrlHandlerMapping 已实现的功能来完成。

现在,我们只需要返回 WebSocketMappingHandlerMapping 的 Bean,就能自动处理 @WebSocketMapping 注解了:

@Configuration
public class WebSocketConfiguration {

	@Bean
	public HandlerMapping webSocketMapping() {
		return new WebSocketMappingHandlerMapping();
	}

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}

2. WebSocket 请求处理过程剖析

我们来看下基于 Reactor Netty 的 WebFlux 具体是如何处理 WebSocket 请求的。

前面说过,WebSocket 请求进入 WebFlux 后,首先会从 HandlerMapping 中找到对应的 WebSocketHandler,再由 WebSocketHandlerAdapter 进行实际的调用。这就不再多做阐述,有兴趣的朋友可以去看看WebSocketHandler,WebSocketHandlerAdapter。

3. 分离数据的接收与发送操作

我们知道 HTTP 协议是半双工通信,虽然客户端和服务器都能给对方发数据,但是同一时间内只会由一方向另一方发送数据,并且在顺序上是客户端先发送请求,然后才由服务器返回响应数据。所以服务器处理 HTTP 的逻辑很简单,就是每接收到一个客户端请求,就返回一个响应。

而 WebSocket 是全双工通信,客户端和服务器可以随时向另一方发送数据,所以不再是"发送请求、返回响应"的通信方式了。我们上面的 EchoHandler 示例用的仍旧是这一方式,即收到数据后再针对性地返回一条数据,我们下面就来看看如何充分利用 WebSocket 的双向通信。

WebSocket 的处理,主要是通过 session 完成对两个数据流的操作,一个是客户端发给服务器的数据流,一个是服务器发给客户端的数据流:

WebSocketSession 方法

描述

Flux<WebSocketMessage> receive()

接收来自客户端的数据流,当连接关闭时数据流结束。

Mono<Void> send(Publisher<WebSocketMessage>)

向客户端发送数据流,当数据流结束时,往客户端的写操作也会随之结束,此时返回的 Mono<Void> 会发出一个完成信号。

在 WebSocketHandler 中,最后应该将两个数据流的处理结果整合成一个信号流,并返回一个 Mono<Void> 用于表明处理是否结束。

我们分别为两个流定义处理的逻辑:

  • 对于输出流:服务器每秒向客户端发送一个数字;
  • 对于输入流:每当收到客户端消息时,就打印到标准输出
Mono<Void> input = session.receive()
                   .map(WebSocketMessage::getPayloadAsText)
                   .map(msg -> id + ": " + msg)
				   .doOnNext(System.out::println).then();

Mono<Void> output = session.send(Flux.create(sink -> 
                    senderMap.put(id, new WebSocketSender(session, sink))));

 这两个处理逻辑互相独立,它们之间没有先后关系,操作执行完之后都是返回一个 Mono<Void>,但是如何将这两个操作的结果整合成一个信号流返回给 WebFlux 呢?我们可以使用 WebFlux 中的 Mono.zip() 方法:

@Component
@WebSocketMapping("/echo")
public class EchoHandler implements WebSocketHandler {

	@Autowired
	private ConcurrentHashMap<String, WebSocketSender> senderMap;

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()
                .map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg)
				.doOnNext(System.out::println).then();

		Mono<Void> output = session.send(Flux.create(sink -> 
                senderMap.put(id, new WebSocketSender(session, sink))));
		/**
		 * Mono.zip() 会将多个 Mono 合并为一个新的 Mono,
         * 任何一个 Mono 产生 error 或 complete 都会导致合并后的 Mono
		 * 也随之产生 error 或 complete,此时其它的 Mono 则会被执行取消操作。
		 */
		return Mono.zip(input, output).then();
	}
}

4. 从 Handler 外部发送数据

这里所说的从外部发送数据,指的是需要在 WebSocketHandler 的代码范围之外,在其它地方通过代码调用的方式向 WebSocket 连接发送数据。

思路:在定义 session 的 send() 操作时,通过编程的方式创建 Flux,即使用 Flux.create() 方法创建,将发布 Flux 数据的 FluxSink 暴露出来,并进行保存,然后在需要发送数据的地方,调用 FluxSink<T> 的 next(T data) 方法,向 Flux 的订阅者发布数据。

create 方法是以编程方式创建 Flux 的高级形式,它允许每次产生多个数据,并且可以由多个线程产生。 create 方法将内部的 FluxSink 暴露出来,FluxSink 提供了 next、error、complete 方法。通过 create 方法,可以将响应式堆栈中的 API 与其它 API 进行连接。

考虑这么一个场景:服务器与客户端 A 建立 WebSocket 连接后,允许客户端 B 通过 HTTP 向客户端 A 发送数据。

不考虑安全性、鲁棒性等问题,我们给出一个简单的示例。

首先是 WebSocketHandler 的实现,客户端发送 WebSocket 建立请求时,需要在 query 参数中为当前连接指定一个 id,服务器会以该 id 为键,以对应的 WebSocketSender 为值存放到 senderMap 中:

@Component
@WebSocketMapping("/echo")
public class EchoHandler implements WebSocketHandler {

	@Autowired
	private ConcurrentHashMap<String, WebSocketSender> senderMap;

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// TODO Auto-generated method stub
		HandshakeInfo handshakeInfo = session.getHandshakeInfo();
		Map<String, String> queryMap = getQueryMap(handshakeInfo.getUri().getQuery());
		String id = queryMap.getOrDefault("id", "defaultId");
		Mono<Void> input = session.receive().map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg)
				.doOnNext(System.out::println).then();

		Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink))));
		/**
		 * Mono.zip() 会将多个 Mono 合并为一个新的 Mono,任何一个 Mono 产生 error 或 complete 都会导致合并后的 Mono
		 * 也随之产生 error 或 complete,此时其它的 Mono 则会被执行取消操作。
		 */
		return Mono.zip(input, output).then();
	}

	//用于获取url参数
	 private Map<String, String> getQueryMap(String queryStr) {
        Map<String, String> queryMap = new HashMap<>();
        if (!StringUtils.isEmpty(queryStr)) {
            String[] queryParam = queryStr.split("&");
            Arrays.stream(queryParam).forEach(s -> {
                String[] kv = s.split("=", 2);
                String value = kv.length == 2 ? kv[1] : "";
                queryMap.put(kv[0], value);
            });
        }
        return queryMap;
    }
}

其中,senderMap 是我们自己定义的 Bean,在配置文件中定义:

@Configuration
public class WebSocketConfiguration {

	@Bean
	public HandlerMapping webSocketMapping() {
		return new WebSocketMappingHandlerMapping();
	}

	@Bean
	public ConcurrentHashMap<String, WebSocketSender> senderMap() {
		return new ConcurrentHashMap<String, WebSocketSender>();
	}

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}

WebSocketSender 是我们自己创建的类,目的是保存 WebSocket 连接的 session 以及对应的 FluxSink,以便在 WebSocketHandler 代码范围外发送数据:

public class WebSocketSender {
	private WebSocketSession session;
    private FluxSink<WebSocketMessage> sink;

    public WebSocketSender(WebSocketSession session, FluxSink<WebSocketMessage> sink) {
        this.session = session;
        this.sink = sink;
    }

    public void sendData(String data) {
        sink.next(session.textMessage(data));
    }
}

接着我们来实现 HTTP Controller,用户在发起 HTTP 请求时,通过 query 参数指定要通信的 WebSocket 连接 id,以及要发送的数据,然后从 senderMap 中取出对应的 WebSocketSender,调用其 send() 方法向客户端发送数据:

@RestController
@RequestMapping("/msg")
public class MsgController {

	@Autowired
	private ConcurrentHashMap<String, WebSocketSender> senderMap;

	@RequestMapping("/send")
	public String sendMessage(@RequestParam String id, @RequestParam String data) {
		WebSocketSender sender = senderMap.get(id);
		if (sender != null) {
			sender.sendData(data);
			return String.format("Message '%s' sent to connection: %s.", data, id);
		} else {
			return String.format("Connection of id '%s' doesn't exist", id);
		}
	}
}

5. 测试

我这就不再写页面了,直接就用https://www.websocket.org/echo.html进行测试了,结果如下:

这样就算完成了定点推送了,全推送,和部分推送就不再写了,只要从ConcurrentHashMap中取出来去发送就是了。

(adsbygoogle = window.adsbygoogle || []).push({});

本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!
本文分享自作者个人站点/博客:https://my.oschina.net/bianxin/blog复制
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • 关系数据构建反应式的spring驱动程序

    响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的编程范式,直白的说就是:将变化的值通过数据流进行传播。

    kinbug [进阶者]
  • Java开发中Websocket的技术选型参考

    Websocket是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允...

    码农小胖哥
  • (5)Spring WebFlux快速上手——响应式Spring的道法术器「建议收藏」

    如上图所示,左侧为基于spring-webmvc的技术栈,右侧为基于spring-webflux的技术栈,

    全栈程序员站长
  • 服务端主动推送数据,除了 WebSocket 你还能想到啥?

    松哥原创的 Spring Boot 视频教程已经杀青,感兴趣的小伙伴戳这里-->Spring Boot+Vue+微人事视频教程

    江南一点雨
  • springboot深入浅出系列(16章97节)

    本书为spring boot 深入浅出系列视频教程的文档。 spring boot 深入浅出系列课程(16章97节)

    字母哥博客
  • 详述WebSocket原理

    WebSocket协议和HTTP协议一样,都是在ISO七层模型的最顶层——应用层。WebSocket允许服务器端主动向客户端推送数据。在WebSocket协议中...

    全栈程序员站长
  • WebSocket实现群发和单聊--Springboot实现

        WebSocket是HTML5出的东西(协议,就是大家一起约定好的东西),也就是说HTTP协议没有变化,或者说没关系,但HTTP是不支持持久连接的(长连...

    chinotan
  • SprinBoot——SpringBoot项目WebSocket推送

    发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/106472.html原文链接:https://javaforall.cn

    全栈程序员站长
  • WebSocket的运用

    WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。

    越陌度阡
  • WebFlux 操作 MySQL 是种什么体验?

    松哥原创的 Spring Boot 视频教程已经杀青,感兴趣的小伙伴戳这里-->Spring Boot+Vue+微人事视频教程

    江南一点雨
  • 干货 | 长连接/websocket/SSE等主流服务器推送技术比较

    本文由携程市场营销研发部武艺嫱和王宇星以及张子祥共同撰写,武艺嫱在市场营销研发部负责前端,王宇星和张子祥在市场营销研发部负责java后端。

    疯狂的技术宅
  • 【JMeter系列-10】JMeter websocket接口测试

    在一个网站中,很多数据需要即时更新,比如期货交易类的用户资产。在以前,这种功能的实现一般使用http轮询,即客户端用定时任务每隔一段时间向服务器发送查询请求来获...

    云深i不知处
  • 【小家Spring】高性能关键技术之---体验Spring MVC的异步模式(ResponseBodyEmitter、SseEmitter、StreamingResponseBody) 高级使用篇

    上篇博文:【小家Spring】高性能关键技术之—体验Spring MVC的异步模式(Callable、WebAsyncTask、DeferredResult) ...

    YourBatman
  • 使用缓存技术10年了,总结了如下经验!

    一位七牛的资深架构师曾经说过这样一句话:“Nginx+业务逻辑层+数据库+缓存层+消息队列,这种模型几乎能适配绝大部分的业务场景。

    范蠡
  • WebFlux 初体验

    松哥原创的 Spring Boot 视频教程已经杀青,感兴趣的小伙伴戳这里-->Spring Boot+Vue+微人事视频教程

    江南一点雨
  • 石墨文档 Websocket 百万长连接技术实践

    点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 |...

    芋道源码
  • FCoin交易所API文档

    通过了解以下信息,您可以方便的使用 FCoin 提供的 API 来接入 FCoin 交易平台。

    全栈程序员站长
  • Web实时消息推送技术总结

      消息推送(Push),是指从服务端实时发送信息到客户端,最早诞生于 Email 中,用于提醒新的消息,想必大家都不陌生。   随着互联网技术的发展,很多网站...

    文渊同学
  • 54 个官方 Spring Boot Starers 出炉!别再重复造轮子了……

    在之前的文章,栈长介绍了 Spring Boot Starters,不清楚的可以点击链接进去看下。

    Java技术栈

扫码关注腾讯云开发者

领取腾讯云代金券