专栏首页Java研发军团WebSocket实现Web端即时通信

WebSocket实现Web端即时通信

前言

WebSocket 是HTML5开始提供的一种在浏览器和服务器间进行全双工通信的协议。目前很多没有使用WebSocket进行客户端服务端实时通信的web应用,大多使用设置规则时间的轮询,或者使用长轮询较多来处理消息的实时推送。这样势必会较大程度浪费服务器和带宽资源,而我们现在要讲的WebSocket正是来解决该问题而出现,使得B/S架构的应用拥有C/S架构一样的实时通信能力。

HTTP和WebSocket比较

HTTP

HTTP协议是半双工协议,也就是说在同一时间点只能处理一个方向的数据传输,同时HTTP消息也是过于庞大,里面包含大量消息头数据,真正在消息处理中很多数据不是必须的,这也是对资源的浪费。

  • 定时轮询:定时轮询就是客户端定时去向服务器发送HTTP请求,看是否有数据,服务器接受到请求后,返回数据给客户端,本次连接也会随着关闭。该实现方案最简单,但是会存在消息延迟和大量浪费服务器和带宽资源。
  • 长轮询:长轮询与定时轮询一样,也是通过HTTP请求实现,但这里不是定时发送请求。客户端发送请求给服务端,这时服务端会hold住该请求,当有数据过来或者超时时返回给请求的客户端并开始下一轮的请求。

WebSocket

WebSocket在客户端和服务端只需一次请求,就会在客户端和服务端建立一条通信通道,可以实时相互传输数据,并且不会像HTTP那样携带大量请求头等信息。因为WebSocket是基于TCP双向全双工通信的协议,所以支持在同一时间点处理发送和接收消息,做到实时的消息处理。

  • 建立WebSocket连接:建立WebSocket连接,首先客户端先要向服务端发送一个特殊的HTTP请求,使用的协议不是httphttps,而是使用过 wswss(一个非安全的,一个安全的,类似前两者之间的差别),请求头里面要附加一个申请协议升级的信息 Upgrade:websocket,还有随机生成一个 Sec-WebSocket-Key的值,及版本信息 Sec-WebSocket-Version等等。服务端收到客户端的请求后,会解析该请求的信息,包括请求协议升级,版本校验,以及将 Sec-WebSocket-Key的加密后以 sec-websocket-accept的值返回给客户端,这样客户端和服务端的连接就建立了。
  • 关闭WebSocket连接:客户端和服务端都可发送一个close控制帧,另一端主动关闭连接。

HTTP轮询和WebSocket生命周期示意图

服务端

这里服务端利用Netty的WebSocket开发。这里首先实现服务端启动类,然后自定义处理器来处理WebSocket的消息。

package com.ytao.websocket;
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.stream.ChunkedWriteHandler;
/** * Created by YANGTAO on 2019/11/17 0017. */public class WebSocketServer {
    public static String HOST = "127.0.0.1";    public static int PORT = 8806;
    public static void startUp() throws Exception {        // 监听端口的线程组        EventLoopGroup bossGroup = new NioEventLoopGroup();        // 处理每一条连接的数据读写的线程组        EventLoopGroup workerGroup = new NioEventLoopGroup();        // 启动的引导类        ServerBootstrap serverBootstrap = new ServerBootstrap();        try {            serverBootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception{                            ChannelPipeline pipeline = ch.pipeline();                            pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));                            // 将请求和返回消息编码或解码成http                            pipeline.addLast("http-codec", new HttpServerCodec());                            // 使http的多个部分组合成一条完整的http                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));                            // 向客户端发送h5文件,主要是来支持websocket通信                            pipeline.addLast("http-chunked", new ChunkedWriteHandler());                            // 服务端自定义处理器                            pipeline.addLast("handler", new WebSocketServerHandler());                        }                    })                    // 开启心跳机制                    .childOption(ChannelOption.SO_KEEPALIVE, true)                    .handler(new ChannelInitializer<NioServerSocketChannel>() {                        protected void initChannel(NioServerSocketChannel ch) {                            System.out.println("WebSocket服务端启动中...");                        }                    });
            Channel ch = serverBootstrap.bind(HOST, PORT).sync().channel();            System.out.println("WebSocket host: "+ch.localAddress().toString().replace("/",""));            ch.closeFuture().sync();        }catch (Exception e){            e.printStackTrace();        }finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }
    }
    public static void main(String[] args) throws Exception {        startUp();    }}

上面启动类和HTTP协议的类似,所以较好理解。启动类启动后,我们需要处理WebSocket请求,这里自定义 WebSocketServerHandler。我们在处理中设计的业务逻辑有,如果只有一个连接来发送信息聊天,那么我们就以服务器自动回复,如果存在一个以上,我们就将信息发送给其他人。

package com.ytao.websocket;
import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelPromise;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.FullHttpRequest;import io.netty.handler.codec.http.websocketx.*;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.Date;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;
/** * Created by YANGTAO on 2019/11/17 0017. */public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private WebSocketServerHandshaker handshaker;
    private static Map<String, ChannelHandlerContext> channelHandlerContextConcurrentHashMap = new ConcurrentHashMap<>();
    private static final Map<String, String> replyMap = new ConcurrentHashMap<>();    static {        replyMap.put("博客", "https://ytao.top");        replyMap.put("公众号", "ytao公众号");        replyMap.put("在吗", "在");        replyMap.put("吃饭了吗", "吃了");        replyMap.put("你好", "你好");        replyMap.put("谁", "ytao");        replyMap.put("几点", "现在本地时间:"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")));    }
    @Override    public void messageReceived(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception{        channelHandlerContextConcurrentHashMap.put(channelHandlerContext.channel().toString(), channelHandlerContext);        // http        if (msg instanceof FullHttpRequest){            handleHttpRequest(channelHandlerContext, (FullHttpRequest) msg);        }else if (msg instanceof WebSocketFrame){ // WebSocket            handleWebSocketFrame(channelHandlerContext, (WebSocketFrame) msg);        }    }
    @Override    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception{        if (channelHandlerContextConcurrentHashMap.size() > 1){            for (String key : channelHandlerContextConcurrentHashMap.keySet()) {                ChannelHandlerContext current = channelHandlerContextConcurrentHashMap.get(key);                if (channelHandlerContext == current)                    continue;                current.flush();            }        }else {            // 单条处理            channelHandlerContext.flush();        }    }
    private void handleHttpRequest(ChannelHandlerContext channelHandlerContext, FullHttpRequest request) throws Exception{        // 验证解码是否异常        if (!"websocket".equals(request.headers().get("Upgrade")) || request.decoderResult().isFailure()){            // todo send response bad            System.err.println("解析http信息异常");            return;        }
        // 创建握手工厂类        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(          "ws:/".concat(channelHandlerContext.channel().localAddress().toString()),                null,                false        );        handshaker = factory.newHandshaker(request);
        if (handshaker == null)            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channelHandlerContext.channel());        else            // 响应握手消息给客户端            handshaker.handshake(channelHandlerContext.channel(), request);
    }
    private void handleWebSocketFrame(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame){        // 关闭链路        if (webSocketFrame instanceof CloseWebSocketFrame){            handshaker.close(channelHandlerContext.channel(), (CloseWebSocketFrame) webSocketFrame.retain());            return;        }
        // Ping消息        if (webSocketFrame instanceof PingWebSocketFrame){            channelHandlerContext.channel().write(              new PongWebSocketFrame(webSocketFrame.content().retain())            );            return;        }
        // Pong消息        if (webSocketFrame instanceof PongWebSocketFrame){            // todo Pong消息处理        }
        // 二进制消息        if (webSocketFrame instanceof BinaryWebSocketFrame){            // todo 二进制消息处理        }
        // 拆分数据        if (webSocketFrame instanceof ContinuationWebSocketFrame){            // todo 数据被拆分为多个websocketframe处理        }
        // 文本信息处理        if (webSocketFrame instanceof TextWebSocketFrame){            // 推送过来的消息            String  msg = ((TextWebSocketFrame) webSocketFrame).text();            System.out.println(String.format("%s 收到消息 : %s", new Date(), msg));
            String responseMsg = "";            if (channelHandlerContextConcurrentHashMap.size() > 1){                responseMsg = msg;                for (String key : channelHandlerContextConcurrentHashMap.keySet()) {                    ChannelHandlerContext current = channelHandlerContextConcurrentHashMap.get(key);                    if (channelHandlerContext == current)                        continue;                    Channel channel = current.channel();                    channel.write(                            new TextWebSocketFrame(responseMsg)                    );                }            }else {                // 自动回复                responseMsg = this.answer(msg);                if(responseMsg == null)                    responseMsg = "暂时无法回答你的问题 ->_->";                System.out.println("回复消息:"+responseMsg);                Channel channel = channelHandlerContext.channel();                channel.write(                        new TextWebSocketFrame("【服务端】" + responseMsg)                );            }        }
    }
    private String answer(String msg){        for (String key : replyMap.keySet()) {            if (msg.contains(key))                return replyMap.get(key);        }        return null;    }
    @Override    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable){        throwable.printStackTrace();        channelHandlerContext.close();    }
    @Override    public void close(ChannelHandlerContext channelHandlerContext, ChannelPromise promise) throws Exception {        channelHandlerContextConcurrentHashMap.remove(channelHandlerContext.channel().toString());        channelHandlerContext.close(promise);    }
}

刚建立连接时,第一次握手有HTTP协议处理,所以 WebSocketServerHandler#messageReceived会判断是HTTP还是WebSocket,如果是HTTP时,交由 WebSocketServerHandler#handleHttpRequest处理,里面会去验证请求,并且处理握手后将消息返回给客户端。如果不是HTTP协议,而是WebSocket协议时,处理交给 WebSocketServerHandler#handleWebSocketFrame处理,进入WebSocket处理后,这里面有判断消息属于哪种类型,里面包括 CloseWebSocketFramePingWebSocketFramePongWebSocketFrameBinaryWebSocketFrameContinuationWebSocketFrameTextWebSocketFrame,他们都是 WebSocketFrame的子类,并且 WebSocketFrame又继承自 DefaultByteBufHolder

channelHandlerContextConcurrentHashMap是缓存WebSocket已连接的信息,因为我们实现的需求要记录连接数量,当有连接关闭时我们要删除以缓存的连接,所以在 WebSocketServerHandler#close中要移除缓存。

最后的发送文本到客户端,根据连接数量判断。如果连接数量不大于1,那么,我们"价值一个亿的AI核心代码" WebSocketServerHandler#answer来回复客户端消息。否则除了本次接收的连接,消息会发送给其他所有连接的客户端。

客户端

客户端使用JS实现WebSocket的操作,目前主流的浏览器基本都支持WebSocket。支持情况如图:

客户端H5的代码实现:

<!DOCTYPE html><html lang="en"><head>    <meta charset="UTF-8">    <meta name="viewport" content="width=device-width, initial-scale=1" />    <title>ytao-websocket</title>    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>    <style type="text/css">        #msgContent{            line-height:200%;            width: 500px;            height: 300px;            resize: none;            border-color: #FF9900;        }        .clean{            background-color: white;        }        .send{            border-radius: 10%;            background-color: #2BD56F;        }        @media screen and (max-width: 600px) {            #msgContent{                line-height:200%;                width: 100%;                height: 300px;            }        }    </style></head><script>    var socket;    var URL = "ws://127.0.0.1:8806/ytao";
    connect();
    function connect() {        $("#status").html("<span>连接中.....</span>");        window.WebSocket = !window.WebSocket == true? window.MozWebSocket : window.WebSocket;        if(window.WebSocket){            socket = new WebSocket(URL);            socket.onmessage = function(event){                var msg = event.data + "\n";                addMsgContent(msg);            };
            socket.onopen = function(){                $("#status").html("<span style='background-color: #44b549'>WebSocket已连接</span>");            };
            socket.onclose = function(){                $("#status").html("<span style='background-color: red'>WebSocket已断开连接</span>");                setTimeout("connect()", 3000);            };        }else{            $("#status").html("<span style='background-color: red'>该浏览器不支持WebSocket协议!</span>");        }    }
    function addMsgContent(msg) {        var contet = $("#msgContent").val() + msg;        $("#msgContent").val(contet)    }
    function clean() {        $("#msgContent").val("");    }
    function getUserName() {        var n = $("input[name=userName]").val();        if (n == "")            n = "匿名";        return n;    }
    function send(){        var message = $("input[name=message]").val();        if(!window.WebSocket) return;        if ($.trim(message) == ""){            alert("不能发送空消息!");            return;        }        if(socket.readyState == WebSocket.OPEN){            var msg = "【我】" + message + "\n";            this.addMsgContent(msg);            socket.send("【"+getUserName()+"】"+message);            $("input[name=message]").val("");        }else{            alert("无法建立WebSocket连接!");        }    }
    $(document).keyup(function(){        if(event.keyCode ==13){            send()        }    });</script><body>    <div style="text-align: center;">        <div id="status">            <span>连接中.....</span>        </div>        <div>            <h2>信息面板</h2>            <textarea id="msgContent" readonly="readonly"></textarea>        </div>        <div>            <input class="clean" type="button" value="清除聊天纪录" onclick="clean()" />            <input type="text" name="userName" value="" placeholder="用户名"/>        </div>        <hr>        <div>            <form onsubmit="return false">                <input type="text" name="message" value="" placeholder="请输入消息"/>                <input class="send" type="button" name="msgBtn" value="send" onclick="send()"/>            </form>        </div>        <div>            <br><br>            <img src="http://yangtao.ytao.top/ytao%E5%85%AC%E4%BC%97%E5%8F%B7.jpg">        </div>    </div></body></html>

JS这里实现相对较简单,主要用到:

  • newWebSocket(URL)创建WebSocket对象
  • onopen()打开连接
  • onclose()关闭连接
  • onmessage接收消息
  • send()发送消息

当断开连接后,客户端这边重新发起连接,直到连接成功为止。

启动

客户端和服务端连接后,我们从日志和请求中可以看到上面所提到的验证信息。

客户端:

服务端:

启动服务端后,先实验我们"价值一个亿的AI",只有一个连接用户时,发送信息结果如图:

多个用户连接,这里使用三个连接用户群聊。

用户一:

用户二:

用户三:

到目前为止,WebSocket已帮助我们实现即时通信的需求,相信大家也基本入门了WebSocket的基本使用。

总结

通过本文了解,可以帮助大家入门WebSocket并且解决当前可能存在的一些Web端的通信问题。我曾经在两个项目中也有看到该类解决方案都是通过定时轮询去做的,也或多或少对服务器资源造成一定的浪费。因为WebSocket本身是较复杂的,它提供的API也是比较多,所以在使用过程,要去真正使用好或去优化它,并不是一件很简单的事,也是需要根据现实场景针对性的去做。

本文分享自微信公众号 - Java研发军团(ityuancheng)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-11-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • SpringBoot整合WebSocket打造在线聊天室实战!!!

    1、WebSocket是HTML5开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。在WebSocket API中,浏览器和服务器只需要做一个握手的动作...

    用户5224393
  • 初学者第70节网络编程-Socket(一)

    java.net 包中 J2SE 的 API 包含有类和接口,它们提供低层次的通信细节。你可以直接使用这些类和接口,来专注于解决问题,而不用关注通信细节。

    用户5224393
  • 简单策略设计模式详解

    在策略模式中,一个类的行为或其算法可以在运行时更改。这种类型的设计模式属于行为型模式。

    用户5224393
  • WebSocket实现Web端即时通信

    HTTP协议是半双工协议,也就是说在同一时间点只能处理一个方向的数据传输,同时HTTP消息也是过于庞大,里面包含大量消息头数据,真正在消息处理中很多数据不是必须...

    ytao
  • 字符串的学习

    1> “==”与“equals”的区别 “==”判断的是两个字符串对象在内存中的首地址,就是判断是否是同一个字符串对象; 而equals()判断的是两个字符串对...

    片刻
  • 基于云计算的 CV 移动交互应用研究(1):CV交互+云计算

    Google Translate App 以word Lens即时相机翻译黑科技与 基于云计算架构的“统计机器翻译”的强大服务后台,引爆大众关注。“CV交互+移...

    flavorfan
  • APK安装流程详解2——PackageManager简介

    俗话说的好,得中原者,得天下,那么想要了解Android的安装了流程就不得不提及一个重要的类"PackageManager"我们就先来了解这两个类

    隔壁老李头
  • winfrom 树形控件如何实现鼠标经过节点时光标颜色改变效果

    跟着阿笨一起玩NET
  • libgconf-2.so.4安装

    似水的流年
  • 远程办公经验为0,如何将日常工作平滑过度到线上?

    导语 | 受到疫情影响,很多企业开始考虑远程办公。近日,TVP群里的各位老师们对此话题展开了热烈讨论。TVP张善友老师作为一名创业者,也决定开启远程办公。本文是...

    尾尾

扫码关注云+社区

领取腾讯云代金券