在之前的文章:Spring Boot使用WebSocket模拟聊天 中简单的建立了Spring boot
项目并集成了websocket
实现了一些入门demo
,本篇文章则是在之前的基础上增加一对一私聊和统计在线人数等功能。
下面代码都是基于上篇文章中的代码进行的修改,开始的步骤中为了一步步的循环渐进所以只展示修改部分的代码,如果有感觉不连贯的同学可以完成上篇文章后再按本篇步骤执行,完整代码会在最后贴出。
虽然在WebSocket
传递消息的Session
中有SessionId
可以作为客户端标识,但是并不太适合业务的自定义需求,所以客户端传参需要先定义客户端唯一标识UID
,先将UID
使用时间戳生成。
客户端代码如下:
var uid = Date.now();
var websocket = null;
//判断当前浏览器是否支持WebSocket
if('WebSocket' in window) {
websocket = new WebSocket("ws://127.0.0.1:5822/api/websocket/client/"+uid);
} else {
alert('当前浏览器 Not support websocket')
}
建立连接成功后再回调函数内将当前用户的uid
加载到页面上
//连接成功建立回调方法
websocket.onopen = function() {
console.log("WebSocket连接成功");
document.getElementById("userHeader").innerText="您的id为:"+uid;
}
客户端增加传参,服务端也要调整ServerEndpoint
的地址来接受uid
。代码如下:
@ServerEndpoint(value = "/api/websocket/client/{clientId}",encoders = {HashMapEncoder.class, BaseModelEncoder.class})
建立连接事件
时则需要记住连接的客户端跟对应Session的映射
,建立映射关系就用Map
结构,以uid
为key
当前对象为value
存到公共的Map
对象里。
假设业务需要统计在线人数,所以在建立连接时再添加一个维护在线人数的事件,这里单独维护一个AtomicInteger
维护在线人数值,注意校验客户端是否重复登陆,重复连接不计数只覆盖Map,所以要增加判断uid是否已经在线的判断。
代码如下:
@Slf4j
@Component
@ServerEndpoint(value = "/api/websocket/client/{clientId}",encoders = {HashMapEncoder.class, BaseModelEncoder.class})
public class WebSocketClient {
public static HashMap<String,WebSocketClient> webSocketClientMap = new HashMap<>();
public static AtomicInteger onlineUsers = new AtomicInteger();
private Session infoSession;
@OnOpen
public void onOpen(Session session,@PathParam("clientId") String clientId){
if (!webSocketClientMap.containsKey(clientId)){
onlineUsers.addAndGet(1);
}
webSocketClientMap.put(clientId,this);
infoSession = session;
log.info("客户端:{}建立连接,当前在线人数:{}",clientId,onlineUsers.get());
}
}
在客户端使用两个table页
建立连接
之前的发送消息的demo
是只发生到服务端,没有说一对一的发送到某个客户端,所以直接调用send事件
将消息传递到服务端即可,但是想要一对一发送到某个客户端则需要说明发送到那个客户端上,也就是接收端标识
,我们这里新增了一个输入框来输入接收端的标识也参数名称定义为acceptId
。
注意:前端传递的信息为Object对象时需要使用JSON.stringify()函数转为String,否则后台接受为[Object object]字符串
function send() {
var message = document.getElementById("message").value;
var acceptId = document.getElementById("acceptId").value;
var model = {
"message":message,
"sendType":"USER",
"acceptId":acceptId,
}
websocket.send(JSON.stringify(model));
}
message
:为需要发送的消息acceptId
:为接收客户端的idsendType
:发生消息类型;目前冗余字段,向后扩展使用
服务端在收到消息时先判断是那种类型消息,默认现在都是1对1
的消息类型,先将客户端发送的JSON字符串
转为实体,实体结构就是客户端传递的参数内容。
实体代码:
@Data
public class UserMessageModel {
/**
* 消息内容
*/
private String message;
/**
* 发送类型:USER
*/
private String sendType;
/**
* 接收端id
*/
private String acceptId;
/**
* 接收类型:USER
*/
private String acceptType;
}
转换后得到实体结构,先判断是否为空,不为空时查看当前在线的客户端Map
里是否有接收端
,如果有则给接收端发送消息,并且通知给发送端一份。如果没有则放到待发送列表里等待设备上线发送
。
代码如下:
@OnMessage
public void onMessage(String message, Session session,@PathParam("clientId") String clientId){
UserMessageModel userMessageModel = JSONObject.parseObject(message, UserMessageModel.class);
if (userMessageModel == null){
this.sendMessage(BaseResponseMessage.error(null,"传递参数结构异常"));
}
if(!webSocketClientMap.containsKey(userMessageModel.getAcceptId())){
// 放到待发送列表里
this.ToBeSentList.add(userMessageModel);
log.info("客户端:{} 发送消息到接受端:{} 不在线,放置到代发送列表,当前待发送列表:{}条",clientId,userMessageModel.getAcceptId(),ToBeSentList.size());
this.sendMessage(BaseResponseMessage.error(null,"接收端不在线"));
}else{
log.info("客户端:{} 发送到客户端:{},消息内容:{}",clientId,userMessageModel.getAcceptId(),userMessageModel.getMessage());
webSocketClientMap.get(userMessageModel.getAcceptId()).sendMessage(BaseResponseMessage.success(userMessageModel));
this.sendMessage(BaseResponseMessage.success(userMessageModel));
}
}
客户端接收的事件是websocket.onmessage
,可以直接在函数里console.log(event)
查看客户端接收的内容,为了方便我直接将数据打印到页面上。
HTML代码
<div id="infoData" style="background-color: #d8aaaa;
width: 30%;
margin-top: 5%;"></div>
JS代码
websocket.onmessage = function (event){
console.log(event);
var html = document.getElementById("infoData").innerHTML;
document.getElementById("infoData").innerHTML=html+"接受到消息:"+event.data+"</br>";
}
使用两个tab页
建立连接得到两个客户端的id
验证两个场景:
在接收端存在的情况下就直接发送信息到接收端,使用1661163395967
发送消息到1661163398729
服务端日志
接收端消息
测试成功
使用1661163395967
发送消息到nullClient
,nullClient
肯定不存在。
服务端日志
发送端消息
上面既然做了待发送消息的缓存就要肯定做消息的补偿发送了,消息的补偿发送就需要监测接收端上线后将消息推送到接收端,所以需要再onOpen事件
建立连接时进行补偿。
@OnOpen
public void onOpen(Session session,@PathParam("clientId") String clientId){
if (!webSocketClientMap.containsKey(clientId)){
onlineUsers.addAndGet(1);
}
webSocketClientMap.put(clientId,this);
infoSession = session;
log.info("客户端:{}建立连接,当前在线人数:{}",clientId,onlineUsers.get());
/**
* 消息补偿
*/
ToBeSentList.forEach(userMessageModel->{
if (clientId.equals(userMessageModel.getAcceptId())){
this.sendMessage(BaseResponseMessage.success(userMessageModel));
}
});
}
其实在补偿上使用List
并不太合适,每次都需要遍历全部待发送数据,可以将代发送存储结构改为Map结构
,这需要修改之前不在线时的处理逻辑。
ToBeSentList修改为ToBeSentMap
public static HashMap<String,WebSocketClient> webSocketClientMap = new HashMap<>();
写入待发送的逻辑修改
修改为Map
结构,key为接收端id
,value为一个List
,存储这个接收端的待发送信息
@OnMessage
public void onMessage(String message, Session session,@PathParam("clientId") String clientId){
UserMessageModel userMessageModel = JSONObject.parseObject(message, UserMessageModel.class);
if (userMessageModel == null){
this.sendMessage(BaseResponseMessage.error(null,"传递参数结构异常"));
}
if(!webSocketClientMap.containsKey(userMessageModel.getAcceptId())){
// 放到待发送列表里
if(!this.ToBeSentMap.containsKey(userMessageModel.getAcceptId())){
this.ToBeSentMap.put(userMessageModel.getAcceptId(),new CopyOnWriteArrayList<>());
}
List<UserMessageModel> addList = this.ToBeSentMap.get(userMessageModel.getAcceptId());
addList.add(userMessageModel);
log.info("客户端:{} 发送消息到接受端:{} 不在线,放置到代发送列表,当前待发送列表:{}条",clientId,userMessageModel.getAcceptId(), addList.size());
this.sendMessage(BaseResponseMessage.error(null,"接收端不在线"));
}else{
log.info("客户端:{} 发送到客户端:{},消息内容:{}",clientId,userMessageModel.getAcceptId(),userMessageModel.getMessage());
webSocketClientMap.get(userMessageModel.getAcceptId()).sendMessage(BaseResponseMessage.success(userMessageModel));
this.sendMessage(BaseResponseMessage.success(userMessageModel));
}
}
补偿逻辑修改
在连接时触发补偿不需要再遍历全部的list数据
,只需要根据客户端id
从Map
中拿取属于这个客户端的待发送数据即可。
@OnOpen
public void onOpen(Session session,@PathParam("clientId") String clientId){
if (!webSocketClientMap.containsKey(clientId)){
onlineUsers.addAndGet(1);
}
webSocketClientMap.put(clientId,this);
infoSession = session;
log.info("客户端:{}建立连接,当前在线人数:{}",clientId,onlineUsers.get());
/**
* 消息补偿
*/
if (!CollectionUtils.isEmpty(this.ToBeSentMap.get(clientId))){
this.ToBeSentMap.get(clientId).forEach(userMessageModel->{
this.sendMessage(BaseResponseMessage.success(userMessageModel));
});
}
}
首先验证待发送消息的存储是否正确,一个发送端
发送两个接收端
的待发送消息日志总数的提示会分开
记录,这代表不同的接收端的待接收总数不一样。
使用客户端
发送消息到nullClient-1
和nullClient-2
两个接收端
服务端验证
可以看到接收端的待发送列表计数是分开的,存储的验证完成客户端
补偿验证就需要指定客户端id了,目前的按时间戳生成客户id的方式无法精准的进行补偿测试,所以需要修改前端建立连接的方式,这个后面我再补充。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div id="userHeader">
</div>
消息内容:<input type="text" id="message"></in></br>
接受人:<input type="text" id="acceptId"></in>
<button onclick="send()">发送</button>
<button onclick="webclose()">关闭连接</button>
<div id="infoData" style="background-color: #d8aaaa;
width: 30%;
margin-top: 5%;"></div>
</body>
<script>
var uid = Date.now();
var websocket = null;
//判断当前浏览器是否支持WebSocket
if('WebSocket' in window) {
websocket = new WebSocket("ws://127.0.0.1:5822/api/websocket/client/"+uid);
} else {
alert('当前浏览器 Not support websocket')
}
//连接成功建立回调方法
websocket.onopen = function() {
console.log("WebSocket连接成功");
document.getElementById("userHeader").innerText="您的id为:"+uid;
}
websocket.onmessage = function (event){
console.log(event);
var html = document.getElementById("infoData").innerHTML;
document.getElementById("infoData").innerHTML=html+"接受到消息:"+event.data+"</br>";
}
websocket.onclose = function() {
alert("WebSocket连接关闭");
}
websocket.onerror = function (event){
console.log(event)
}
function send() {
var message = document.getElementById("message").value;
var acceptId = document.getElementById("acceptId").value;
var model = {
"message":message,
"sendType":"USER",
"acceptId":acceptId,
}
websocket.send(JSON.stringify(model));
}
function webclose(){
websocket.close();
}
</script>
</html>
package com.an.websocket.webserver;
import com.alibaba.fastjson.JSONObject;
import com.an.websocket.model.client.BaseResponseMessage;
import com.an.websocket.webserver.encoder.BaseModelEncoder;
import com.an.websocket.webserver.encoder.HashMapEncoder;
import com.an.websocket.model.client.UserMessageModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author YUSHENGDADA
* @title: WebSocketClient
* @projectName v2_lab
* @description: 客户端
* @date 2022/8/22 0022上午 10:03
*/
@Slf4j
@Component
@ServerEndpoint(value = "/api/websocket/client/{clientId}",encoders = {HashMapEncoder.class, BaseModelEncoder.class})
public class WebSocketClient {
public static HashMap<String,WebSocketClient> webSocketClientMap = new HashMap<>();
public static ConcurrentHashMap<String,List<UserMessageModel>> ToBeSentMap = new ConcurrentHashMap<>();
public static AtomicInteger onlineUsers = new AtomicInteger();
private Session infoSession;
@OnOpen
public void onOpen(Session session,@PathParam("clientId") String clientId){
if (!webSocketClientMap.containsKey(clientId)){
onlineUsers.addAndGet(1);
}
webSocketClientMap.put(clientId,this);
infoSession = session;
log.info("客户端:{}建立连接,当前在线人数:{}",clientId,onlineUsers.get());
/**
* 消息补偿
*/
if (!CollectionUtils.isEmpty(this.ToBeSentMap.get(clientId))){
this.ToBeSentMap.get(clientId).forEach(userMessageModel->{
this.sendMessage(BaseResponseMessage.success(userMessageModel));
});
}
}
@OnClose
public void onclose(Session session, @PathParam("clientId") String clientId){
if (webSocketClientMap.containsKey(clientId)) {
webSocketClientMap.remove(clientId);
onlineUsers.getAndAdd(-1);
}
log.info("客户端:{}断开连接,当前在线人数:{}",clientId,onlineUsers.get());
}
@OnError
public void onError(Session session, Throwable error){
log.error("连接异常:{}",error.getMessage());
}
@OnMessage
public void onMessage(String message, Session session,@PathParam("clientId") String clientId){
UserMessageModel userMessageModel = JSONObject.parseObject(message, UserMessageModel.class);
if (userMessageModel == null){
this.sendMessage(BaseResponseMessage.error(null,"传递参数结构异常"));
}
if(!webSocketClientMap.containsKey(userMessageModel.getAcceptId())){
// 放到待发送列表里
if(!this.ToBeSentMap.containsKey(userMessageModel.getAcceptId())){
this.ToBeSentMap.put(userMessageModel.getAcceptId(),new CopyOnWriteArrayList<>());
}
List<UserMessageModel> addList = this.ToBeSentMap.get(userMessageModel.getAcceptId());
addList.add(userMessageModel);
log.info("客户端:{} 发送消息到接受端:{} 不在线,放置到代发送列表,当前待发送列表:{}条",clientId,userMessageModel.getAcceptId(), addList.size());
this.sendMessage(BaseResponseMessage.error(null,"接收端不在线"));
}else{
log.info("客户端:{} 发送到客户端:{},消息内容:{}",clientId,userMessageModel.getAcceptId(),userMessageModel.getMessage());
webSocketClientMap.get(userMessageModel.getAcceptId()).sendMessage(BaseResponseMessage.success(userMessageModel));
this.sendMessage(BaseResponseMessage.success(userMessageModel));
}
}
private void sendMessage(Object message){
try {
this.infoSession.getBasicRemote().sendObject(message);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (EncodeException e) {
throw new RuntimeException(e);
}
}
}
@Data
public class BaseResponseMessage<T> {
private String code;
private String msg;
private T data;
public static <T> BaseResponseMessage success(T data) {
BaseResponseMessage baseResponseMessage = new BaseResponseMessage();
baseResponseMessage.code = "0";
baseResponseMessage.msg = "成功";
baseResponseMessage.data = data;
return baseResponseMessage;
}
public static <T> BaseResponseMessage error(T data,String msg) {
BaseResponseMessage baseResponseMessage = new BaseResponseMessage();
baseResponseMessage.code = "500";
baseResponseMessage.msg = msg;
baseResponseMessage.data = data;
return baseResponseMessage;
}
}
@Data
public class UserMessageModel {
/**
* 消息内容
*/
private String message;
/**
* 发送类型:USER
*/
private String sendType;
/**
* 接收端id
*/
private String acceptId;
/**
* 接收类型:USER
*/
private String acceptType;
}
这两个类看这篇文章:websocket使用sendObject产生的问题 可以知道这两个类的作用和源码