在上文:WebSocket开发(一对一聊天) 完成了一对一聊天的功能,但是消息补偿的功能并没有验证,这需要将客户端id的设置参数进行修改。
而且光日志打印记录WebSocket
事件的流转有点不靠谱,所以需要将事件进行落地,结构化数据像用户登陆记录
、用户代收消息
、用户在线状态
、操作日志
等业务线强的数据可以放到mysql中,像聊天记录
、图片
、漫游
等已经落地的消息数据可以放到mongodb、es中备份存储。这里demo为了方便就都使用mysql存储。
引入持久层框架,这里使用mybatis-plus
添加依赖
<!-- mybatis-plus 所需依赖 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<!-- MySQL连接 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
配置yml文件
server:
port: 5822
mybatis-plus:
global-config:
db-config:
id-type: auto
field-strategy: not_empty
column-underline: true
logic-delete-value: 0
logic-not-delete-value: 1
db-type: mysql
refresh: false
configuration:
map-underscore-to-camel-case: true
cache-enabled: false
spring:
datasource:
url: jdbc:mysql://127.0.0.1/chatroom-im?useUnicode=true&characterEncoding=utf8
driver-class-name: com.mysql.jdbc.Driver
username: root
password: xxxx
启动类配置
启动类注解指定mapper包地址
@MapperScan("com.an.im.mapper")
因为目前没有增加用户落地的概念先不加用户表了,只按客户端定义的id
为客户端用户标识
,统计一下目前的流程中需要增加那些表。
用户连接记录表
: 描述
:客户端建立/断开连接的日志记录表;作用
:追溯数据使用客户端发送消息表
: 描述
:客户端发送的消息记录表;作用
:追溯跟对照数据使用服务端发送消息表
: 描述
:服务端发送的消息记录表;作用
:追溯跟对照数据使用一对一消息记录表
: 描述
:客户端发送的消息记录表;作用
:双客户端聊天数据记录漫游消息补偿表
: 描述
:客户端待接收的消息记录表作用
:客户端连接补偿消息使用异常记录表
: 描述
:产生异常的日志收集表作用
:排除异常情况使用此表主要统计用户连接跟断连的日志,核心字段就是用户id
、时间
、事件类型(连接/断连)
CREATE TABLE `chatroom-im`.`USER_LOGIN_EVENT` (
`id` int(0) NOT NULL,
`uid` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '用户id',
`event_type` tinyint(1) NOT NULL COMMENT '0:连接;1:断连',
`trigger_date` datetime(0) NOT NULL COMMENT '事件触发时间',
`create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
`update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
客户端发送的所有消息都要记录下来,这一步可以异步操作
,作为消息的落地存储,核心字段为客户端id
、时间
、消息内容
CREATE TABLE `chatroom-im`.`client_send_msg` (
`id` int(0) NOT NULL,
`uid` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '客户端id',
`info_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '信息明细',
`send_date` datetime(0) NOT NULL COMMENT '发送时间',
`create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
`update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
`del_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除;1:已删除)',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
服务端发送消息存储的信息跟客户端的类似,将存储的客户端id
修改为接收端id
就可以复用
CREATE TABLE `chatroom-im`.`server_send_msg` (
`id` int(0) NOT NULL,
`accept_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '接收端id',
`info_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '信息明细',
`send_date` datetime(0) NOT NULL COMMENT '发送时间',
`create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
`update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
`del_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除;1:已删除)',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
明确已经双端接收的消息进行记录作为漫游使用,核心字段:发送端id
、接收端id
、发送消息明细id
、接收消息明细id
、消息内容
、发送时间
、接收时间
CREATE TABLE `chatroom-im`.`Untitled` (
`id` int(0) NOT NULL,
`send_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '发送端id',
`accept_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '接收端id',
`send_msg_id` int(0) NULL DEFAULT NULL COMMENT '发送消息明细id',
`accept_msg_id` int(0) NULL DEFAULT NULL COMMENT '接收消息明细id',
`info_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息内容',
`msg_send_date` datetime(0) NULL DEFAULT NULL COMMENT '消息发送时间',
`accept_date` datetime(0) NULL DEFAULT NULL COMMENT '消息接收时间',
`create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
`update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
`del_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除;1:已删除)',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
这个表主要记录需要补充的消息记录来做消息补偿
跟历史数据追溯
。核心字段:接收端id
、消息内容
、补偿时间
、补偿状态
CREATE TABLE `chatroom-im`.`client_compensate_msg` (
`id` int(0) NOT NULL,
`accept_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '接收端id',
`info_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '信息明细',
`send_date` datetime(0) NOT NULL COMMENT '发送时间',
`compensate_satus` tinyint(1) NOT NULL COMMENT '补偿状态(0:未补偿;1:已补偿;2:失败)',
`create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
`update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
`del_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除;1:已删除)',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
异常记录主要作为在OnError
事件中发生异常内容的记录,核心字段:客户端id
、异常内容
、触发事件
CREATE TABLE `chatroom-im`.`error_event_msg` (
`id` int(0) NOT NULL,
`uid` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '接收端id',
`error_msg` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '异常明细',
`trigger_date` datetime(0) NOT NULL COMMENT '触发时间',
`create_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '创建日期',
`update_date` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改日期',
`del_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除;1:已删除)',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
表结构既然定义好了,就在各事件触发时进行持久化操作,需要先对这些表建立Mybatis-plus
的实体跟Maaper类
,这里不写出来了,后面会给出git地址
。
在WebSocket
的server
里注入对应会有以下问题:
问题:在websocket的server文件里是无法使用@autowired注解自动注入的
原因:spring容器管理的是单例的,他只会注入一次,而websocket是多对象的,当有新的用户使用的时候,他就会新创建一个websocket对象,这就导致了用户创建的websocket对象都不能注入对象了,所以在运行的时候就会发生注入对象为null的情况;
解决方法:把需要注入的service声明为静态对象,如下代码:
private static BaseWebSocketService baseWebSocketService;
@Autowired
public void setService(BaseWebSocketService baseWebSocketService){
WebSocketClient.baseWebSocketService = baseWebSocketService;
}
这里的持久化操作我使用一个统一的接口BaseWebSocketService
异步来进行处理,不会影响主业务并且方便以后可以调整是否持久化
用户记录的持久化是在OnOpen
事件中进行的添加,代码如下:
@OnOpen
public void onOpen(Session session,@PathParam("clientId") String clientId){
if (!webSocketClientMap.containsKey(clientId)){
onlineUsers.addAndGet(1);
}
webSocketClientMap.put(clientId,this);
infoSession = session;
log.info("客户端:{}建立连接,当前在线人数:{}",clientId,onlineUsers.get());
/**
* 持久化
*/
baseWebSocketService.saveUserLoginEvent(clientId,(byte) 0,new Date());
/**
* 消息补偿
*/
if (!CollectionUtils.isEmpty(this.ToBeSentMap.get(clientId))){
this.ToBeSentMap.get(clientId).forEach(userMessageModel->{
this.sendMessage(BaseResponseMessage.success(userMessageModel));
});
}
}
saveUserLoginEvent
用户id
、事件类型(连接/断连)
、时间
user_login_event 表内数据验证:
客户端发送信息服务端是在onMessage
这个事件中接收的,因此持久化操作也在这个方法里实现,这个持久化是做信息记录的,所以只要是发送上来的数据都进行记录,将它放到方法的最前面。
代码如下:
@OnMessage
public void onMessage(String message, Session session,@PathParam("clientId") String clientId){
/**
* 持久化
*/
baseWebSocketService.saveClientSendMsg(clientId,message,new Date());
/**
* 处理消息
*/
UserMessageModel userMessageModel = JSONObject.parseObject(message, UserMessageModel.class);
if (userMessageModel == null){
this.sendMessage(BaseResponseMessage.error(null,"传递参数结构异常"));
}
if(!webSocketClientMap.containsKey(userMessageModel.getAcceptId())){
// 放到待发送列表里
if(!this.ToBeSentMap.containsKey(userMessageModel.getAcceptId())){
this.ToBeSentMap.put(userMessageModel.getAcceptId(),new CopyOnWriteArrayList<>());
}
List<UserMessageModel> addList = this.ToBeSentMap.get(userMessageModel.getAcceptId());
addList.add(userMessageModel);
log.info("客户端:{} 发送消息到接受端:{} 不在线,放置到代发送列表,当前待发送列表:{}条",clientId,userMessageModel.getAcceptId(), addList.size());
this.sendMessage(BaseResponseMessage.error(null,"接收端不在线"));
}else{
log.info("客户端:{} 发送到客户端:{},消息内容:{}",clientId,userMessageModel.getAcceptId(),userMessageModel.getMessage());
webSocketClientMap.get(userMessageModel.getAcceptId()).sendMessage(BaseResponseMessage.success(userMessageModel));
this.sendMessage(BaseResponseMessage.success(userMessageModel));
}
}
saveClientSendMsg
客户端id
、时间
、消息内容
client_send_msg 表内数据验证:
服务端发送消息是需要找到对应客户端的Session
进行send
事件的,我们之前创建了一个方法sendMessage
专门用来做发送消息使用,所以将持久化的操作放到这里来。
但是在这里发现传参没有加客户端id,但是每次发送数据都传参客户端id并不太方便也不好维护,所以定义一个类的局部变量,在建立连接时将客户端id放到这个局部变量中
伪代码:
private String clientId;
@OnOpen
public void onOpen(Session session,@PathParam("clientId") String clientId){
this.clientId = clientId;
}
这样可以直接在sendMessage
方法中拿到所属的客户端id
了
代码如下:
private void sendMessage(Object message){
try {
baseWebSocketService.saveServerSendMsg(message,this.clientId,new Date());
this.infoSession.getBasicRemote().sendObject(message);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (EncodeException e) {
throw new RuntimeException(e);
}
}
saveServerSendMsg
接收端id
、时间
、消息内容
server_send_msg 表内数据验证:
一对一的记录需要摘选发送端id
、接收端id
、消息内容
、发送时间
、接受时间
等,处理逻辑相较复杂写,这里不考虑数据一致性,否则还有很多事情需要做,只是建立基础的信息记录。
创建一个新的方法来拆分需要记录的参数和异步记录,代码如下:
private void toCSucceed(UserMessageModel userMessageModel){
WebSocketClient webSocketClient = webSocketClientMap.get(userMessageModel.getAcceptId());
BaseResponseMessage infoMsg = BaseResponseMessage.success(userMessageModel);
/**
* 持久化
*/
baseWebSocketService.saveCTOCMsg(this.clientId,webSocketClient.clientId,JSONObject.toJSONString(infoMsg),new Date(),new Date());
/**
* 发送消息
*/
webSocketClient.sendMessage(infoMsg);
this.sendMessage(infoMsg);
log.info("客户端:{} 发送到客户端:{},消息内容:{}",clientId,userMessageModel.getAcceptId(),userMessageModel.getMessage());
}
saveCTOCMsg
发送端id
、接收端id
、消息内容
、发送时间
、接收时间
(因为是异步操作就不计消息明细的对应id了)
c_to_c_msg 表内数据验证:
这个也是一个比较核心的功能,记录肯定要补偿的,这一部分可以替换ConcurrentHashMap<String,List<UserMessageModel>>
直接存到Mysql中,每次连接去mysql
读取有没有需要补偿的记录。
之前是在服务端接受信息OnMessage
事件中如果接收端不在线就放入补偿列表里,现在直接将这步调整为写入mysql
的消息补偿表
中
代码如下:
@OnMessage
public void onMessage(String message, Session session,@PathParam("clientId") String clientId){
/**
* 持久化
*/
baseWebSocketService.saveClientSendMsg(clientId,message,new Date());
/**
* 处理消息
*/
UserMessageModel userMessageModel = JSONObject.parseObject(message, UserMessageModel.class);
if (userMessageModel == null){
this.sendMessage(BaseResponseMessage.error(null,"传递参数结构异常"));
}
if(!webSocketClientMap.containsKey(userMessageModel.getAcceptId())){
// 放到待发送列表里
/*if(!this.ToBeSentMap.containsKey(userMessageModel.getAcceptId())){
this.ToBeSentMap.put(userMessageModel.getAcceptId(),new CopyOnWriteArrayList<>());
}
List<UserMessageModel> addList = this.ToBeSentMap.get(userMessageModel.getAcceptId());
addList.add(userMessageModel);*/
baseWebSocketService.saveClientCompensateMsg(userMessageModel.getAcceptId(),message,(byte) 0);
log.info("客户端:{} 发送消息到接受端:{} 不在线,放置到代发送列表,当前待发送列表:{}条",clientId,userMessageModel.getAcceptId());
this.sendMessage(BaseResponseMessage.error(null,"接收端不在线"));
}else{
this.toCSucceed(userMessageModel);
}
}
saveClientCompensateMsg
接收端id
、消息内容
、补偿状态
client_compensate_msg 表内数据验证:
消息补偿是在OnOpen
事件中进行的,不再通过内存中的Map结构
进行补偿,改为根据客户端id
查看mysql
中有没有需要补偿的数据。
代码如下:
@OnOpen
public void onOpen(Session session,@PathParam("clientId") String clientId){
if (!webSocketClientMap.containsKey(clientId)){
onlineUsers.addAndGet(1);
}
this.clientId = clientId;
webSocketClientMap.put(clientId,this);
infoSession = session;
log.info("客户端:{}建立连接,当前在线人数:{}",clientId,onlineUsers.get());
/**
* 持久化
*/
baseWebSocketService.saveUserLoginEvent(clientId,(byte) 0,new Date());
/**
* 消息补偿
*/
/*
if (!CollectionUtils.isEmpty(this.ToBeSentMap.get(clientId))){
this.ToBeSentMap.get(clientId).forEach(userMessageModel->{
this.sendMessage(BaseResponseMessage.success(userMessageModel));
});
}
*/
List<ClientCompensateMsg> list = baseWebSocketService.queryClientCompensateMsg(clientId,0);
if (!CollectionUtils.isEmpty(list)){
list.forEach(userMessageModel->{
log.info("消息补偿记录,客户端:{},消息内容:{}",clientId,userMessageModel);
this.sendMessage(BaseResponseMessage.success(userMessageModel));
});
}
}
queryClientCompensateMsg
接收端id
、未补偿状态
验证:
补偿表中有一条110
ID的客户端有代发送记录,将前端的uid
参数设置由时间戳改为110
var uid = 110;
重启服务进行连接验证
日志验证:
web验证:
补偿成功后将补偿表对应数据状态进行修改
这个操作比较简单,只要触发onError
事件就将信息存储起来即可
代码如下:
@OnError
public void onError(Session session, Throwable error, @PathParam("clientId") String clientId){
log.error("连接异常:{}",error.getMessage());
baseWebSocketService.saveErrorEventMsg(clientId,error.getMessage());
}
saveErrorEventMsg
客户端id
、异常内容
验证:
模拟一个WebSocket的异常,随便抛出一个异常即可,只要触发onError
事件即可
数据库记录:
到此完成了基础的数据落地,以上代码细节没有经过推敲所以并不太完善,大概模拟了一个落地的场景,实际记录落地还要考虑各样的
数据一致性
、丢数据
、补数据
、异常状态场景
、机制重试
等等功能,所以理解这个想法即可。