前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot2整合WebSocket,实现后台向前端推送信息

SpringBoot2整合WebSocket,实现后台向前端推送信息

作者头像
Li_XiaoJin
发布2022-06-12 10:59:24
2.2K0
发布2022-06-12 10:59:24
举报
文章被收录于专栏:Lixj's BlogLixj's Blog

背景是客户提出需要在 IOC 智能运营中心使用 Pad 控制页面进行跳转,类似于电视的遥控器一样。这样IOC的讲解员可以在 Pad 上面操作控制页面进行展示。我们的解决方案是通过使用 WebSocket 实现,前台监听,后台开放 API 给 Pad 上的页面,后台收到消息后推送给前台,前台再做出对应的反应。这样基本可以满足要求了~

什么是 WebSocket?

WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据,在 WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

为什么需要 WebSocket?

初次接触 WebSocket 的人,都会问同样的问题:我们已经有了 HTTP 协议,为什么还需要另一个协议?它能带来什么好处?

答案很简单,因为 HTTP 协议有一个缺陷:通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。

举例来说,我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。因此 WebSocket 就是这样发明的。如我们出去吃饭,在公众号上排队,不需要我们自己查询,当有变动时公众号实时将消息推送给我们,我们就可以知道排队的状态了,不必每次询问服务员。

WebSocket ws 和 wss 的区别

WS 协议和 WSS 协议两个均是 WebSocket 协议的 SCHEM,两者一个是非安全的,一个是安全的。也是统一的资源标志符。就好比 HTTP 协议和 HTTPS 协议的差别。非安全的没有证书,安全的需要 SSL 证书。

其中 WSS 表示在 TLS 之上的 WebSocket。WS 一般默认是 80 端口,而 WSS 默认是 443 端口,大多数网站用的就是 80 和 433 端口。

http 和 ws 的对应关系:

代码语言:javascript
复制
http -> new WebSocket('ws://xxx')

https -> new WebSocket('wss://xxx')

SpringBoot2整合WebSocket

添加 maven 依赖

pom.xml 添加如下依赖

代码语言:javascript
复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

新建 WebSocket 的配置类

配置类会检测带有注解 @ServerEndpoint 的 bean 并注册它们。

记得加上 @Configuration 注解

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author lixiaojin
 * @date 2021/11/3 14:29
 */
@Configuration
@Slf4j
public class WebSocketConfig {

    /**
     * 给spring容器注入这个ServerEndpointExporter对象
     * 相当于xml:
     * <beans>
     * <bean/>
     * </beans>
     * <p>
     * 检测所有带有@serverEndpoint注解的bean并注册他们。
     *
     * @return ServerEndpointExporter对象
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        log.info("WebSocketConfig 注入!");
        return new ServerEndpointExporter();
    }
}

新建 WebSocketServer 的处理类

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author lixiaojin
 * @date 2021/11/3 16:06
 */
@ServerEndpoint("/webSocket/{username}")
@Component
@Slf4j
public class WebSocketServer {

    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
    private static Map<String, WebSocketServer> clients = new ConcurrentHashMap<>();
    private Session session;
    private String username;

    @OnOpen
    public void onOpen(@PathParam("username") String username, Session session) throws IOException {
        this.username = username;
        this.session = session;

        clients.put(username, this);
        // 在线数加1
        int cnt = ONLINE_COUNT.incrementAndGet();
        log.info("{}加入连接,当前连接数为:{}", username, cnt);

    }

    @OnClose
    public void onClose() throws IOException {
        clients.remove(username);
        int cnt = ONLINE_COUNT.decrementAndGet();
        log.info("有连接关闭,当前连接数为:{}", cnt);
    }

    @OnMessage
    public void onMessage(String message) throws IOException {
        log.info("来自客户端的消息:{}", message);
        sendInfo(session, message);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
    }

    public void sendMessage(String message, String name) throws IOException {
        Session session = null;
        for (WebSocketServer item : clients.values()) {
            if (item.username.equals(name)) {
                session = item.session;
            }
        }
        if (session == null) {
            throw new BizException(CosmosResultCodeEnum.BIZ_FAIL, "找不到对应的seesion!");
        }
        sendInfo(session, message);
    }

    public void sendMessageAll(String message) throws IOException {
        for (WebSocketServer item : clients.values()) {
            sendInfo(item.session, message);
        }
    }

    private void sendInfo(Session session, String message) throws IOException {
        session.getAsyncRemote().sendText(message);
    }
}

新建 WebSocketController 控制器

提供 API 接口向前台发送消息。

代码语言:javascript
复制
package cn.sibat.ioc.remote.control.controller;

import cn.lixj.test.ioc.remote.control.manager.WebSocketServer;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;

/**
 * @author lixiaojin
 * @date 2021/11/3 14:39
 */
@Api(tags = "测试接口")
@RestController
@RequestMapping("/api/v1/")
public class TestController {

    @Autowired
    private WebSocketServer webSocketServer;

    @PostMapping("/test/send")
    public String sendInfo(@RequestParam String name, @RequestParam String message) {
        webSocketServer.sendMessage(message, name);
        return "发送成功!";
    }

}

前台测试页面

代码语言:javascript
复制
<html>
<head>
    <meta charset="UTF-8">
    <title>websocket测试</title>
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
    <style type="text/css">
        h3,h4{
            text-align:center;
        }
    </style>
</head>
<body>
 
<h3>WebSocket测试,客户端接收到的消息如下:</h3>
 
<textarea id = "messageId" readonly="readonly" cols="150" rows="30" >
 
</textarea>
 
 
<script type="text/javascript">
    var socket;
    if (typeof (WebSocket) == "undefined") {
        console.log("遗憾:您的浏览器不支持WebSocket");
    } else {
        console.log("恭喜:您的浏览器支持WebSocket");
        //实现化WebSocket对象
        //指定要连接的服务器地址与端口建立连接
        //注意ws、wss使用不同的端口。我使用自签名的证书测试,
        //无法使用wss,浏览器打开WebSocket时报错
        //ws对应http、wss对应https。
        socket = new WebSocket("ws://localhost:8080/webSocket/lixj");
        //连接打开事件
        socket.onopen = function() {
            //console.log("Socket 已打开");
            socket.send("消息发送测试(From Client)");
            
        };
        //收到消息事件
        socket.onmessage = function(msg) {
            $("#messageId").append(msg.data+ "\n");
            console.log(msg.data  );
        };
        //连接关闭事件
        socket.onclose = function() {
            console.log("Socket已关闭");
        };
        //发生了错误事件
        socket.onerror = function() {
            alert("Socket发生了错误");
        }
        //窗口关闭时,关闭连接
        window.unload=function() {
            socket.close();
        };
    }
    function hello(){
        setTimeout(socket.onopen, 1000);
    }
    //重复执行保持socket存活
    var t1 = window.setInterval(hello, 30000);
    
</script>
 
</body>
</html>

联调测试

  1. 启动 springboot 项目,启动 WebSocket 服务端;

  1. 打开前台页面,测试连接; 前台调用链接为:ws://localhost:8080/webSocket/lixj

  1. 调用后台接口发送消息;
代码语言:javascript
复制
curl -X POST -H  "Accept:*/*" -H  "Request-Origion:Knife4j" -H  "Content-Type:application/x-www-form-urlencoded" --data-urlencode  "message=你看到我的小熊了吗&name=lixj" "http://localhost:8080/api/v1/test/send"

  1. 前台接收到消息;

需要注意的地方

关于超时自动断开连接的问题

使用 Nginx 代理 WebSocket 时,客户端与服务器握手成功后,如果在 60 秒内没有数据交互,就会自动断开连接。因为 Nginx 默认的断开链接时间为 60 秒,为保持长连接,可有两种解决方法。

  1. 修改 Nginx 的超时时间;
  2. 前端在超时时间内做心跳保活机制(如上的 html 加了定时保活任务)

WebSocket 的 Nginx 配置问题

如果需要 Nginx 来进行代理,api 接口和 webSocket 需要分开配置,示例如下。

代码语言:javascript
复制
         location ^~ /api/ {
              proxy_pass http://localhost:8080;
         }
 
         location ^~ /webSocket/ {
             proxy_pass http://localhost:8080;
             proxy_http_version 1.1;
             proxy_set_header Upgrade $http_upgrade;
             proxy_set_header Connection upgrade;
         }

如果不需要指定 name 的话可以用下面的 WebSocketServer 类

根据 sessionId 进行匹配

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author lixiaojin
 * @date 2021/11/3 14:30
 */
@ServerEndpoint(value = "/ws/asset")
@Component
@Slf4j
public class WebSocketServer {

    private Session session;

    @PostConstruct
    public void init() {
        System.out.println("websocket 加载");
    }

    private static final AtomicInteger OnlineCount = new AtomicInteger(0);
    // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
    private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<Session>();


    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        SessionSet.add(session);
        // 在线数加1
        int cnt = OnlineCount.incrementAndGet();
        log.info("有连接加入,当前连接数为:{}", cnt);
        SendMessage(session, "连接成功");
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        SessionSet.remove(session);
        int cnt = OnlineCount.decrementAndGet();
        log.info("有连接关闭,当前连接数为:{}", cnt);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("来自客户端的消息:{}", message);
        SendMessage(session, "收到消息,消息内容:" + message);

    }

    /**
     * 出现错误
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误:{},Session ID:{}", error.getMessage(), session.getId());
        error.printStackTrace();
    }

    /**
     * 发送消息,实践表明,每次浏览器刷新,session会发生变化。
     *
     * @param session
     * @param message
     */
    public static void SendMessage(Session session, String message) {
        try {
            session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));
//            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error("发送消息出错:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 群发消息
     *
     * @param message
     * @throws IOException
     */
    public void BroadCastInfo(String message) throws IOException {
        for (Session session : SessionSet) {
            if (session.isOpen()) {
                SendMessage(session, message);
            }
        }
    }

    /**
     * 指定Session发送消息
     *
     * @param sessionId
     * @param message
     * @throws IOException
     */
    public void SendMessage(String message, String sessionId) throws IOException {
        Session session = null;
        for (Session s : SessionSet) {
            if (s.getId().equals(sessionId)) {
                session = s;
                break;
            }
        }
        if (session != null) {
            SendMessage(session, message);
        } else {
            log.warn("没有找到你指定ID的会话:{}", sessionId);
        }
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

}

End.

Copyright: 采用 知识共享署名4.0 国际许可协议进行许可 Links: https://cloud.tencent.com/developer/article/2020779

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-11-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是 WebSocket?
  • 为什么需要 WebSocket?
  • WebSocket ws 和 wss 的区别
  • SpringBoot2整合WebSocket
    • 添加 maven 依赖
      • 新建 WebSocket 的配置类
        • 新建 WebSocketServer 的处理类
          • 新建 WebSocketController 控制器
            • 前台测试页面
              • 联调测试
                • 需要注意的地方
                  • 关于超时自动断开连接的问题
                  • WebSocket 的 Nginx 配置问题
                  • 如果不需要指定 name 的话可以用下面的 WebSocketServer 类
              相关产品与服务
              SymantecSSL 证书
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档