前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【BIO】通过指定消息大小实现的多人聊天室-终极版本

【BIO】通过指定消息大小实现的多人聊天室-终极版本

作者头像
喜欢天文的pony站长
发布2020-07-08 15:50:01
3920
发布2020-07-08 15:50:01
举报
文章被收录于专栏:RabbitMQ实战RabbitMQ实战
# 前情提要:
  • 上一篇文章BIO在聊天室项目中的演化中提到,告知对方消息已经发送完毕的方式有4种
    1. 关闭Socket连接
    2. 关闭输出流,socket.shutdownOutput();
    3. 使用标志符号,借助字符流,Reader.readLine(),该方法会在读取到\r,\n或者\r\n时返回所读取到的内容。
    4. 通过指定本次发送的数据的字节大小。告知对方从输入流中读取指定大小的字节。

本文使用第四种方案来实现聊天室

  • 思路为:
    • 客户端在发送消息之前,先计算出本次发送的数据量的字节大小,比如为N个字节。那么在向服务器发送数据的前,先约定好流中的前1个字节(或者前X个字节,根据自己项目的实际情况来决定)为本次发送的数据量的大小。
    • 客户端发送消息,先将计算出的字节大小N写入输出流,再将实际的内容写入输出流。
    • 服务端在获取到输入流之后,根据约定,先读取前X个字节,根据这个字节的值可以知道,本次发送的数据量的大小,那么在读取数据时,只需要读取后续的N个字节即可。
  • 温馨提示: 注意看代码注释哟~

# 代码实现

  • 客户端
代码语言:javascript
复制
/**
 * @author futao
 * @date 2020/7/6
 */
public class BioChatClient {

    private static final Logger logger = LoggerFactory.getLogger(BioChatClient.class);

    private static final ExecutorService SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor();

    /**
     * 启动客户端
     */
    public void start() {
        try {  //尝试连接到聊天服务器
            Socket socket = new Socket("localhost", Constants.SERVER_PORT);
            logger.debug("========== 成功连接到聊天服务器 ==========");

            InputStream inputStream = socket.getInputStream();
            OutputStream outputStream = socket.getOutputStream();

            //从输入流中读取数据
            SINGLE_THREAD_EXECUTOR.execute(() -> {
                try {
                    while (true) {
                        String message = IOUtils.messageReceiver(inputStream);
                        logger.info("接收到服务端消息:[{}]", message);
                    }
                } catch (IOException e) {
                    logger.error("发生异常", e);
                }
            });

            while (true) {
                //获取用户输入的数据
                String message = new Scanner(System.in).nextLine();
                if (StringUtils.isBlank(message)) {
                    break;
                }
                //将内容转换为字节数组
                byte[] contentBytes = message.getBytes(Constants.CHARSET);
                //内容字节数组的大小
                int length = contentBytes.length;
                //第一个字节写入本次传输的数据量的大小
                outputStream.write(length);
                //写入真正需要传输的内容
                outputStream.write(contentBytes);
                //刷新缓冲区
                outputStream.flush();

                if (Constants.KEY_WORD_QUIT.equals(message)) {
                    //客户端退出
                    SINGLE_THREAD_EXECUTOR.shutdownNow();
                    inputStream.close();
                    outputStream.close();
                    socket.close();
                    break;
                }
            }
        } catch (IOException e) {
            logger.error("发生异常", e);
        }
    }

    public static void main(String[] args) {
        new BioChatClient().start();
    }
}
  • 从输入流中读取指定大小的数据
代码语言:javascript
复制
    /**
     * 从输入流中读取指定大小的字节数据并转换成字符串
     *
     * @param inputStream 输入流
     * @return 读取到的字符串
     * @throws IOException
     */
    public static String messageReceiver(InputStream inputStream) throws IOException {
        //本次传输的数据量的大小
        int curMessageLength = inputStream.read();
        byte[] contentBytes = new byte[curMessageLength];
        //读取指定长度的字节
        inputStream.read(contentBytes);
        return new String(contentBytes);
    }

  • 服务端
代码语言:javascript
复制
/**
 * @author futao
 * @date 2020/7/6
 */
public class BioChatServer {

    private static final Logger logger = LoggerFactory.getLogger(BioChatServer.class);

    /**
     * 可同时接入的客户端数量
     */
    private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);


    /**
     * 当前接入的客户端
     */
    private static final Set<Socket> CLIENT_SOCKET_SET = new HashSet<Socket>() {
        @Override
        public synchronized boolean add(Socket o) {
            return super.add(o);
        }

        @Override
        public synchronized boolean remove(Object o) {
            return super.remove(o);
        }
    };

    /**
     * 启动服务端
     */
    public void start() {
        try {
            //启动服务器,监听端口
            ServerSocket serverSocket = new ServerSocket(Constants.SERVER_PORT);
            logger.debug("========== 基于BIO的聊天室在[{}]端口启动成功 ==========", Constants.SERVER_PORT);
            while (true) {
                //监听客户端接入事件
                Socket socket = serverSocket.accept();
                THREAD_POOL.execute(() -> {
                    CLIENT_SOCKET_SET.add(socket);
                    int port = socket.getPort();
                    logger.debug("客户端[{}]成功接入聊天服务器", port);
                    try {
                        InputStream inputStream = socket.getInputStream();
                        OutputStream outputStream = socket.getOutputStream();

                        while (true) {
                            //获取到客户端发送的消息
                            String message = IOUtils.messageReceiver(inputStream);
                            logger.info("接收到客户端[{}]发送的消息:[{}]", port, message);
                            //客户端是否退出
                            boolean isQuit = IOUtils.isQuit(message, socket, CLIENT_SOCKET_SET);
                            if (isQuit) {
                                socket.close();
                                break;
                            } else {
                                //消息转发
                                IOUtils.forwardMessage(port, message, CLIENT_SOCKET_SET);
                            }
                        }
                    } catch (IOException e) {
                        logger.error("发生异常", e);
                    }
                });
            }
        } catch (IOException e) {
            logger.error("发生异常", e);
        }
    }


    public static void main(String[] args) {
        new BioChatServer().start();
    }
}
  • 客户端下线与消息转发
代码语言:javascript
复制
/**
     * 判断客户端是否下线,并且将需要下线的客户端下线
     *
     * @param message         消息
     * @param socket          客户端Socket
     * @param clientSocketSet 当前接入的客户端Socket集合
     * @return 是否退出
     * @throws IOException
     */
    public static boolean isQuit(String message, Socket socket, Set<Socket> clientSocketSet) throws IOException {
        boolean isQuit = StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message);
        if (isQuit) {
            clientSocketSet.remove(socket);
            int port = socket.getPort();
            socket.close();
            logger.debug("客户端[{}]下线", port);
        }
        return isQuit;
    }

    /**
     * 转发消息
     *
     * @param curSocketPort   当前发送消息的客户端Socket的端口
     * @param message         需要转发的消息
     * @param clientSocketSet 当前接入的客户端Socket集合
     */
    public static void forwardMessage(int curSocketPort, String message, Set<Socket> clientSocketSet) {
        if (StringUtils.isBlank(message)) {
            return;
        }
        for (Socket socket : clientSocketSet) {
            if (socket.isClosed() || socket.getPort() == curSocketPort) {
                continue;
            }
            if (socket.getPort() != curSocketPort) {
                try {
                    OutputStream outputStream = socket.getOutputStream();
                    byte[] messageBytes = message.getBytes(Constants.CHARSET);
                    outputStream.write(messageBytes.length);
                    //将字符串编码之后写入客户端
                    outputStream.write(messageBytes);
                    //刷新缓冲区
                    outputStream.flush();
                } catch (IOException e) {
                    logger.error("消息转发失败", e);
                }
            }
        }
    }

# 测试一下~

  • 服务端启动,客户端接入

image.png

  • 客户端接入

image.png

  • 客户端发送消息

image.png

  • 服务端打印并转发消息

image.png

  • 聊天室内的其他小伙伴收到服务器转发的消息

image.png

  • 小马哥客户端下线

image.png

  • 服务器收到小马哥的下线通知

image.png

# 总结

  • 非常优雅~?

# 注意

  • 本文约定的是第一个字节为消息大小的标记,一个字节可以表示的最大值为255,所以一次最多传输255个字节,如果超过这个值,会造成业务错误,需要注意。
  • 所以使用几个字节来作为标识需要从业务的角度来考虑
    • 一个字节8位,可表示的最大值为 255 = 255B
    • 二个字节16位,可表示的最大值为 65535 = 64KB
    • 三个字节24位,可表示的最大值为 16777215 = 16MB
    • 四个字节32位,可表示的最大值为 4294967295 = 4GB
    • 以此类推....

# 系列文章

欢迎在评论区留下你看文章时的思考,及时说出,有助于加深记忆和理解,还能和像你一样也喜欢这个话题的读者相遇~

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 喜欢天文 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • # 前情提要:
  • # 代码实现
  • # 测试一下~
  • # 总结
  • # 注意
  • # 系列文章
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档