[WebSocket]第二章:WebSocket集群分布式改造——实现多人在线聊天室

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

本文链接:https://blog.csdn.net/qqxx6661/article/details/100064741

正文

WebSocket集群/分布式改造:实现多人在线聊天室

为何要改造为分布式集群

分布式就是为了解决单点故障问题,想象一下,如果一个服务器承载了1000个大佬同时聊天,服务器突然挂了,1000个大佬瞬间全部掉线,大概明天你就被大佬们吊起来打了。 当聊天室改为集群后,就算服务器A挂了,服务器B上聊天的大佬们还可以愉快的聊天,并且在前端还能通过代码,让连接A的大佬们快速重连至存活的服务器B,继续和大家愉快的聊天,岂不美哉!

总结一下:实现了分布式WebSocket后,我们可以将流量负载均衡到不同的服务器上并提供一种通信机制让各个服务器能进行消息同步(不然用户A连上服务器A,用户B脸上服务器B,它们发消息的时候对方都没法收到)。

如何改造为分布式集群

当我们要实现分布式的时候,我们则需要在各个机器上共享这些信息,所以我们需要一个Publish/Subscribe的中间件。我们现在使用Redis作为我们的解决方案。

1. 用户在聊天室集群如何发消息

假设我们的聊天室集群有服务器A和B,用户Alice连接在A上,Bob连接在B上、

Alice向聊天室的服务器A发送消息,A服务器必须要将收到的消息转发到Redis,才能保证聊天室集群的所有服务器(也就是A和B)能够拿到消息。否则,只有Alice在的服务器A能够读到消息,用户Bob在的服务器B并不能收到消息,A和B也就无法聊天了。

2. 用户在聊天室集群如何接收消息

说完了发送消息,那么如何保证Alice发的消息,其他所有人都能收到呢,前面我们知道了Alice发送的消息已经被传到了Redis的频道,那么所有服务器都必须订阅这个Redis频道,然后把这个频道的消息转发到自己的用户那里,这样自己服务器所管辖的用户就能收到消息。

补充知识点:STOMP 简介

上期我们搭建了个websocket聊天室demo,并且使用了STOMP协议,但是我并没有介绍到底什么是STOMP协议,同学们会有疑惑,这里对于STOMP有很好地总结:

当直接使用WebSocket时(或SockJS)就很类似于使用TCP套接字来编写Web应用。因为没有高层级的线路协议(wire protocol),因此就需要我们定义应用之间所发送消息的语义,还需要确保连接的两端都能遵循这些语义。 就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。 与HTTP请求和响应类似,STOMP帧由命令、一个或多个头信息以及负载所组成。例如,如下就是发送数据的一个STOMP帧:

>>> SEND
transaction:tx-0
destination:/app/marco
content-length:20

{"message":"Marco!"}

好了,介绍完了概念,让我们开始动手改造!

功能一:向聊天室集群中的全体用户发消息——Redis的订阅/发布

如果你不熟悉Redis的sub/pub(订阅/发布)功能,请看这里进行简单了解它的用法,很简单:

https://redisbook.readthedocs.io/en/latest/feature/pubsub.html

在我们上篇文章的Demo基础上,我们进行集群改造。上一篇文章的源码见下方:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/%E5%8D%95%E6%9C%BA%E7%89%88

1. 添加Redis依赖pom

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2. application.properties新增redis配置

当然首先要确保你安装了Redis,windows下安装redis比较麻烦,你可以搜索redis-for-windows下载安装。

# redis 连接配置
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.ssl=false
# 空闲连接最大数
spring.redis.jedis.pool.max-idle=10
# 获取连接最大等待时间(s)
spring.redis.jedis.pool.max-wait=60000

3. 在application.properties添加频道名定义

# Redis定义
redis.channel.msgToAll = websocket.msgToAll

4. 新建redis/RedisListenerBean

package cn.monitor4all.springbootwebsocketdemo.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

import java.net.Inet4Address;
import java.net.InetAddress;

/**
 * Redis订阅频道属性类
 * @author yangzhendong01
 */
@Component
public class RedisListenerBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerBean.class);

    @Value("${server.port}")
    private String serverPort;

    @Value("${redis.channel.msgToAll}")
    private String msgToAll;

    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 监听msgToAll
        container.addMessageListener(listenerAdapter, new PatternTopic(msgToAll));
        LOGGER.info("Subscribed Redis channel: " + msgToAll);
        return container;
    }
}

可以看到,我们在代码里监听了redis频道msgToAll,这个是在application.properties定义的,当然如果你懒得定义,这里可以写死。

5. 聊天室集群:发消息改造

我们单机聊天室的发送消息Controller是这样的:

@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
    public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
        return chatMessage;

我们前端发给我们消息后,直接给/topic/public转发这个消息,让其他用户收到。

在集群中,我们需要把消息转发给Redis,并且不转发给前端,而是让服务端监听Redis消息,在进行消息发送。

将Controller改为:

@Value("${redis.channel.msgToAll}")
private String msgToAll;

@Autowired
private RedisTemplate<String, String> redisTemplate;
    
@MessageMapping("/chat.sendMessage")
    public void sendMessage(@Payload ChatMessage chatMessage) {
        try {
            redisTemplate.convertAndSend(msgToAll, JsonUtil.parseObjToJson(chatMessage));
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

你会发现我们在代码中使用了JsonUtil将实体类ChatMessage转为了Json发送给了Redis,这个Json工具类需要使用到FaskJson依赖:

  1. pom添加FastJson依赖
<!-- json -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.58</version>
</dependency>
  1. 添加Json解析工具类JsonUtil,提供对象转Json,Json转对象的能力
package cn.monitor4all.springbootwebsocketdemo.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JSON 转换
 */
public final class JsonUtil {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonUtil.class);

    /**
     * 把Java对象转换成json字符串
     *
     * @param object 待转化为JSON字符串的Java对象
     * @return json 串 or null
     */
    public static String parseObjToJson(Object object) {
        String string = null;
        try {
            string = JSONObject.toJSONString(object);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        return string;
    }

    /**
     * 将Json字符串信息转换成对应的Java对象
     *
     * @param json json字符串对象
     * @param c    对应的类型
     */
    public static <T> T parseJsonToObj(String json, Class<T> c) {
        try {
            JSONObject jsonObject = JSON.parseObject(json);
            return JSON.toJavaObject(jsonObject, c);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        return null;
    }
}

这样,我们接收到用户发送消息的请求时,就将消息转发给了redis的频道websocket.msgToAll

6. 聊天室集群:接收消息改造

单机的聊天室,我们接收消息是通过Controller直接把消息转发到所有人的频道上,这样就能在所有人的聊天框显示。

在集群中,我们需要服务器把消息从Redis中拿出来,并且推送到自己管的用户那边,我们在Service层实现消息的推送。

  • 在处理消息之后发送消息: 正如前面看到的那样,使用 @MessageMapping 或者 @SubscribeMapping 注解可以处理客户端发送过来的消息,并选择方法是否有返回值。 如果 @MessageMapping注解的控制器方法有返回值的话,返回值会被发送到消息代理,只不过会添加上"/topic"前缀。可以使用@SendTo 重写消息目的地; 如果 @SubscribeMapping注解的控制器方法有返回值的话,返回值会直接发送到客户端,不经过代理。如果加上@SendTo 注解的话,则要经过消息代理。
  • 在应用的任意地方发送消息: spring-websocket 定义了一个 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),可以实现自由的向任意目的地发送消息,并且订阅此目的地的所有用户都能收到消息。

我们在service实现发送,需要使用上述第二种方法。

新建类service/ChatService:

package cn.monitor4all.springbootwebsocketdemo.service;

import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;

@Service
public class ChatService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChatService.class);

    @Autowired
    private SimpMessageSendingOperations simpMessageSendingOperations;

    public void sendMsg(@Payload ChatMessage chatMessage) {
        LOGGER.info("Send msg by simpMessageSendingOperations:" + chatMessage.toString());
        simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
    }

}

我们在哪里调用这个service呢,我们需要在监听到消息后调用,所以我们就要有下面的Redis监听消息处理专用类

新建类redis/RedisListenerHandle:

package cn.monitor4all.springbootwebsocketdemo.redis;

import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import cn.monitor4all.springbootwebsocketdemo.service.ChatService;
import cn.monitor4all.springbootwebsocketdemo.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

/**
 * Redis订阅频道处理类
 * @author yangzhendong01
 */
@Component
public class RedisListenerHandle extends MessageListenerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerHandle.class);

    @Value("${redis.channel.msgToAll}")
    private String msgToAll;

    @Value("${server.port}")
    private String serverPort;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private ChatService chatService;

    /**
     * 收到监听消息
     * @param message
     * @param bytes
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String rawMsg;
        String topic;
        try {
            rawMsg = redisTemplate.getStringSerializer().deserialize(body);
            topic = redisTemplate.getStringSerializer().deserialize(channel);
            LOGGER.info("Received raw message from topic:" + topic + ", raw message content:" + rawMsg);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            return;
        }


        if (msgToAll.equals(topic)) {
            LOGGER.info("Send message to all users:" + rawMsg);
            ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
            // 发送消息给所有在线Cid
            chatService.sendMsg(chatMessage);
        } else {
            LOGGER.warn("No further operation with this topic!");
        }
    }
}

7. 看看效果

这样,我们的改造就基本完成了!我们看一下效果

我们将服务器运行在8080上,然后打开localhost:8080,起名Alice进入聊天室

随后,我们在application.properties中将端口server.port=8081

再次运行程序(别忘了开启IDEA的“允许启动多个并行服务”设置,不然会覆盖掉你的8080服务,如下图),在8081启动一个聊天室,起名Bob进入聊天室。

如下两图,我们已经可以在不同端口的两个聊天室,互相聊天了!(注意看url)

在互相发送消息是,我们还可以使用命令行监听下Redis的频道websocket.msgToAll,可以看到双方传送的消息。如下图:

我们还可以打开Chrome的F12控制台,查看前端的控制台发送消息的log,如下图:

大功告成了吗?

功能实现了,但是并不完美!你会发现,Bob的加入并没有提醒Bob进入了聊天室(在单机版是有的),这是因为我们在“加入聊天室”的代码还没有修改,在加入时,只有Bob的服务器B里的其他用户知道Bob加入了聊天室。我们还能再进一步!

功能二/功能三:集群用户上下线通知,集群用户信息存储

我们需要弥补上面的不足,将用户上线下线的广播发送到所有服务器上。

此外,我还希望以后能够查询集群中所有的在线用户,我们在redis中添加一个set,来保存用户名,这样就可以随时得到在线用户的数量和名称。

1. 在application.properties添加频道名定义

# Redis定义
redis.channel.userStatus = websocket.userStatus
redis.set.onlineUsers = websocket.onlineUsers

我们增加两个定义

  • 第一个是新增redis频道websocket.userStatus用来广播用户上下线消息
  • 第二个是redis的set,用来保存在线用户信息

2. 在RedisListenerBean添加新频道监听

container.addMessageListener(listenerAdapter, new PatternTopic(userStatus));

3. 在ChatService中添加

public void alertUserStatus(@Payload ChatMessage chatMessage) {
        LOGGER.info("Alert user online by simpMessageSendingOperations:" + chatMessage.toString());
        simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
    }

在service中我们向本服务器的用户广播消息,用户上线或者下线的消息都通过这里传达。

4. 修改ChatController中的addUser方法

@MessageMapping("/chat.addUser")
    public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {

        LOGGER.info("User added in Chatroom:" + chatMessage.getSender());
        try {
            headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
            redisTemplate.opsForSet().add(onlineUsers, chatMessage.getSender());
            redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

我们修改了addUser方法,在这里往redis中广播用户上线的消息,并把用户名username写入redis的set中(websocket.onlineUsers)

5. 修改WebSocketEventListener中的handleWebSocketDisconnectListener方法

@EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {

        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        String username = (String) headerAccessor.getSessionAttributes().get("username");

        if(username != null) {
            LOGGER.info("User Disconnected : " + username);
            ChatMessage chatMessage = new ChatMessage();
            chatMessage.setType(ChatMessage.MessageType.LEAVE);
            chatMessage.setSender(username);
            try {
                redisTemplate.opsForSet().remove(onlineUsers, username);
                redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }

        }
    }

在用户关闭网页时,websocket会调用该方法,我们在这里需要把用户从redis的在线用户set里删除,并且向集群发送广播,说明该用户退出聊天室。

6. 修改Redis监听类RedisListenerHandle

 else if (userStatus.equals(topic)) {
            ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
            if (chatMessage != null) {
                chatService.alertUserStatus(chatMessage);
            }

在监听类中我们接受了来自userStatus频道的消息,并调用service

7. 看看效果

此外,我们还可以在Reids中查询到用户信息:

WebSocket集群还有哪些可能性

有了这两篇文章的基础, 我们当然还能实现以下的功能:

  • 某用户A单独私信给某用户B,或者私信给某用户群(用户B和C)
  • 系统提供外部调用接口,给指定用户/用户群发送消息,实现消息推送
  • 系统提供外部接口,实时获取用户数据(人数/用户信息)

感兴趣的同学可以自己试试看。

参考文献

深入浅出Websocket(二)分布式Websocket集群

https://juejin.im/post/5ab84609f265da237d02fcb6

Spring消息之STOMP:

https://www.cnblogs.com/jmcui/p/8999998.html

总结

我们在本文中把单机版的聊天室改为了分布式聊天室,大大提高了聊天室可用性。

本文工程源代码:

单机版:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/%E5%8D%95%E6%9C%BA%E7%89%88

集群版:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/%E9%9B%86%E7%BE%A4%E7%89%88

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券