前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >WebSocket协议-源码分析

WebSocket协议-源码分析

作者头像
数据小冰
发布2024-07-04 16:00:28
390
发布2024-07-04 16:00:28
举报
文章被收录于专栏:数据小冰数据小冰

本文是WebSocket系列文章的第3篇,从源码角度理解WebSocket是如何实现的。分析的是gorilla websocket,即WebSocket协议-实战中服务端使用的WebSocket库。

Gorilla WebSocket是一个由Go语言实现的,经过很好测试并且广泛使用的WebSocket库,它提供了简单易用、功能强大的API接口。目前在github上已有2万+⭐️。

Part1前言

Gorilla WebSocket既然是一个库,对于使用者来说,就是调用该库的API接口完成业务功能。库封装了内部功能,屏蔽底层实现,使用者无需关心内部实现逻辑。而本文是源码分析,就是要分析提供API接口的内部实现。所以我们就从API入手,抽丝剥茧了解实现细节。

WebSocket作为一个应用层协议,与HTTP协议处于同一层级。Gorilla WebSocket库层次结构如下,位于底层TCP协议和业务模块之间。

可以看到 Gorilla WebSocket处于承上启下的关键位置。我们的业务模块数据是如何写入到下层的TCP;对端发送的TCP数据又是如何解析为业务层数据;初始阶段HTTP连接是如何提升为WebSocket。这依次对应着 Gorilla WebSocket 中的 Upgrade、Read和Write关键接口,在下面的章节依次详细分析。

Part2 Upgrade实现

WebSocket协议-概念原理中提到WebSocket握手阶段采用的是HTTP协议,该过程对应到Gorilla WebSocket库中就是 Upgrade 接口。

1原理

握手采用HTTP协议,那Upgrade接口位于HTTP层之上,此时可以将Upgrade理解为HTTP的handler函数,一个特殊的业务处理函数。

标准的HTTP handler处理函数接受一个 http.ResponseWriter 和 *http.Request参数,Upgrade既然是一个特殊的handler,那么它的入参也必有一个 http.ResponseWriter 和 *http.Request。事实确实如此,Upgrade的签名为 func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error)

Upgrade会返回一个 *Conn,这是一个WebSocket连接句柄。所以通过调用Upgrade方法,成功建立起WebSocket连接。

2静态结构

3处理流程

4关键代码分析

参数校验工具函数

tokenListContainsValue 是一个参数校验工具函数,在util.go文件中,检查 name 和 value值是否在Header中。

代码语言:javascript
复制
if !tokenListContainsValue(r.Header, "Connection", "upgrade") {
 ...
}
  
if !tokenListContainsValue(r.Header, "Upgrade", "websocket") {
 ...
}
    
if !tokenListContainsValue(r.Header, "Sec-Websocket-Version", "13") {
 ...
}

现在来看 tokenListContainsValue 实现细节,入参 header本质上是一个 map[string][]string类型,所以根据传入的name从header中取出对应的value,value是一个[]string,所以依次判断每个string是否有给定的value值。由于每个value支持逗号分隔,所以调用 nextToken提取每个token。例如,如果从header获取到的value是"other,websocket,more", tokenListContainsValue(r.Header, "Upgrade", "websocket")会返回 true。

代码语言:javascript
复制
func tokenListContainsValue(header http.Header, name string, value string) bool {
headers:
 for _, s := range header[name] {
  for {
   var t string
   t, s = nextToken(skipSpace(s))
   if t == "" {
    continue headers
   }
   s = skipSpace(s)
   if s != "" && s[0] != ',' {
    continue headers
   }
   if equalASCIIFold(t, value) {
    return true
   }
   if s == "" {
    continue headers
   }
   s = s[1:]
  }
 }
 return false
}

NOTE:📢注意上述代码使用 continue label,我们可能使用的比较少,特别是在其他语言中很少有类似的语法。其实在Go语言中,break label 和 continue label是两个非常有趣的语法。在我之前的文章 Go语言中常见100问题-#34 Ignoring how the break statement works有专门讲解break label。

continue label实现的效果是跳过 for _, s := range header[name] 当前的循环,继续剩余循环处理。因为 header[name]值是一个string切片,所以代码中 continue headers 效果是跳过当前s, 继续执行下一个字符串处理。

采用continue headers 本质是跳过内存的for循环,继续外层的for循环。但是我个人认为作者这里写复杂了,采用break也可以实现相同的效果(即将 continue headers 替换为 break),并且代码更简洁。

Hijack获取原始TCP连接

通过Upgrade操作将HTTP协议升级为WebSocket协议,一个重要的目标是获取底层TCP的控制权,拿到原始的TCP连接句柄,后续处理不在经过HTTP库,直接由WebSocket接管。

具体获取代码如下,ResponseWriter 实现了 Hijacker 接口,通过它调用 h.Hijack()方法即获得原始的netConn。默认的 ResponseWriter 对于 HTTP/1.x 连接支持 Hijacker 接口, 而HTTP2不支持,所以进行 w.(http.Hijacker) 断言处理。

代码语言:javascript
复制
h, ok := w.(http.Hijacker)
 if !ok {
  return u.returnError(w, r, http.StatusInternalServerError, "websocket: response does not implement http.Hijacker")
 }
 var brw *bufio.ReadWriter
 netConn, brw, err := h.Hijack()
 if err != nil {
  return u.returnError(w, r, http.StatusInternalServerError, err.Error())
 }

Part3 Read实现

Read即从TCP连接中读取数据,然后将读到的二进制数据解析为WebSocket协议帧结构,简单来说就是数据序列化。

1原理

根据如下WebSocket帧结构,将收到的二进制数据解析出来。

代码语言:javascript
复制
  0                   1                   2                   3
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-------+-+-------------+-------------------------------+
 |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
 |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
 |N|V|V|V|       |S|             |   (if payload len==126/127)   |
 | |1|2|3|       |K|             |                               |
 +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
 |     Extended payload length continued, if payload len == 127  |
 + - - - - - - - - - - - - - - - +-------------------------------+
 |                               |Masking-key, if MASK set to 1  |
 +-------------------------------+-------------------------------+
 | Masking-key (continued)       |          Payload Data         |
 +-------------------------------- - - - - - - - - - - - - - - - +
 :                     Payload Data continued ...                :
 + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
 |                     Payload Data continued ...                |
 +---------------------------------------------------------------+

2处理流程

3关键代码分析

读取操作核心处理都在 NextReader 函数中,该函数返回WebSocket消息数据类型以及data读取器。

关键操作在如下的for循环中,也就是说在读取数据出现异常,或者读取到的是文本数据或二进制数据才会返回。其他像读取到的是控制消息都不会返回,继续卡住。因为调用方是业务模块,所以只关心读取到业务数据才会处理,如果没有读取阻塞在NextReader上完全合情合理。

代码语言:javascript
复制
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
 ...

 for c.readErr == nil {
  frameType, err := c.advanceFrame()
  if err != nil {
   c.readErr = hideTempErr(err)
   break
  }

  if frameType == TextMessage || frameType == BinaryMessage {
   c.messageReader = &messageReader{c}
   c.reader = c.messageReader
   if c.readDecompress {
    c.reader = c.newDecompressionReader(c.reader)
   }
   return frameType, c.reader, nil
  }
 }
    ...

 return noFrame, nil, c.readErr
}

真正读取操作是在 advanceFrame 方法中。调用 c.read(2)读取前两个字节。将第一个字节即 p[0] 与 0xf 取与操作,获取到低4个bit值,对应到原理小节中的 opcode字段。

将p[1]即第二字节与 0x7f进行取与操作,得到其低7个bit,即对应原理图中的Payload len。如果值为126,读取接下的2个字节,即为实际data大小,如果值为127,则读取接下来的8个字节。

代码语言:javascript
复制
func (c *Conn) advanceFrame() (int, error) {
 ...
 var errors []string

 p, err := c.read(2)
 if err != nil {
  return noFrame, err
 }

 frameType := int(p[0] & 0xf)
 final := p[0]&finalBit != 0
 rsv1 := p[0]&rsv1Bit != 0
 rsv2 := p[0]&rsv2Bit != 0
 rsv3 := p[0]&rsv3Bit != 0
 mask := p[1]&maskBit != 0
 c.setReadRemaining(int64(p[1] & 0x7f))
    ...
 switch c.readRemaining {
 case 126:
  p, err := c.read(2)
  if err != nil {
   return noFrame, err
  }

  if err := c.setReadRemaining(int64(binary.BigEndian.Uint16(p))); err != nil {
   return noFrame, err
  }
 case 127:
  p, err := c.read(8)
  if err != nil {
   return noFrame, err
  }

  if err := c.setReadRemaining(int64(binary.BigEndian.Uint64(p))); err != nil {
   return noFrame, err
  }
 }

 ...
}

Part4 Write实现

Write即向TCP连接中写入数据,然后将业务模块发送的WebSocket帧数据翻译为二进制数据,简单来说就是数据反序列化,跟Read是逆向操作。

1原理

根据WebSocket帧结构,将业务层数据封装成二进制数据。

2处理流程

3关键代码分析

Write核心操作在于封包,下面结合代码分析具体实现细节。封包处理在 flushFrame 方法中实现。

构造WebSocket的第一个字节b0和第二个字节b1。c.writeBuf承载整个WebSocket报文数据。

代码语言:javascript
复制
func (w *messageWriter) flushFrame(final bool, extra []byte) error {
 ...

 b0 := byte(w.frameType)
 if final {
  b0 |= finalBit
 }
 if w.compress {
  b0 |= rsv1Bit
 }
 w.compress = false

 b1 := byte(0)
 if !c.isServer {
  b1 |= maskBit
 }

 ...
}

WriteBuf结构如下,开头预留了 2+8+4 个字节,这个是按header最大长度预留的。maxFrameHeaderSize后面才是实际的数据。

统一预留最大14个字节的header是方便代码处理,对于server端,是不用填充 Masking-key数据的,所以代码中直接将 framePos 设置为4,真正有效数据是 c.WriteBuf[4:],而对于client端,需要填充 Masking-key , 所以它的 framePos就是0。

代码语言:javascript
复制
func (w *messageWriter) flushFrame(final bool, extra []byte) error {
 ...
 // Assume that the frame starts at beginning of c.writeBuf.
 framePos := 0
 if c.isServer {
  // Adjust up if mask not included in the header.
  framePos = 4
 }
    ...
}

填充数据长度大小到 payload length。这一部分大小是不固定的,如果data长度大于等于65536,则表示长度这一部分占8个字节。

代码语言:javascript
复制
case length >= 65536:
  c.writeBuf[framePos] = b0
  c.writeBuf[framePos+1] = b1 | 127
  binary.BigEndian.PutUint64(c.writeBuf[framePos+2:], uint64(length))

如果data长度小于65536但大于125,则表示长度这一部分占2个字节。

代码语言:javascript
复制
case length > 125:
		framePos += 6
		c.writeBuf[framePos] = b0
		c.writeBuf[framePos+1] = b1 | 126
		binary.BigEndian.PutUint16(c.writeBuf[framePos+2:], uint16(length))

如果data长度小于等于125,这不用额外空间表示长度,payload len大小就是。

代码语言:javascript
复制
default:
  framePos += 8
  c.writeBuf[framePos] = b0
  c.writeBuf[framePos+1] = b1 | byte(length)

Part5 并发性分析

关于读写的并发性,doc.go文件中有如下说明。核心就是不支持多并发读、多并发写。一个并发读和一个并发写是支持的。

Connections support one concurrent reader and one concurrent writer.Applications are responsible for ensuring that no more than one goroutine calls the write methods (NextWriter, SetWriteDeadline, WriteMessage,WriteJSON, EnableWriteCompression, SetCompressionLevel) concurrently and that no more than one goroutine calls the read methods (NextReader,SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler)concurrently.

像下面这样开启一个读 goroutine 和 一个写 goroutine是✅正确的。

代码语言:javascript
复制
conn, err := websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
    log.Println(err)
    return
}
client := NewClient(conn, m)
m.addClient(client)
go client.readMessages()
go client.writeMessages()

下面同时开启两个写goroutine是❎错误的。

代码语言:javascript
复制
conn, err := websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
    log.Println(err)
    return
}
client := NewClient(conn, m)
m.addClient(client)
go client.writeMessages()
go client.writeMessages()

下面来看写操作为啥不支持并发,代码注释强调"best-effort”意味着系统会尽力去执行这项检测,但可能不保证在所有情况下都能检测到并发写入。

Write the buffers to the connection with best-effort detection of concurrent writes.

在进行写入前,先判断c.isWriting是否true,如果已经为true,表明有写操作正在进行,直接panic。

如果没有人写入,先将c.isWriting设置为true,然后发送数据,发送完毕之后又将c.isWriting置为false。

代码语言:javascript
复制
func (w *messageWriter) flushFrame(final bool, extra []byte) error {
 ...
 // Write the buffers to the connection with best-effort detection of
 // concurrent writes. See the concurrency section in the package
 // documentation for more info.

 if c.isWriting {
  panic("concurrent write to websocket connection")
 }
 c.isWriting = true
 // 向tcp socket中写入数据
 err := c.write(w.frameType, c.writeDeadline, c.writeBuf[framePos:w.pos], extra)

 if !c.isWriting {
  panic("concurrent write to websocket connection")
 }
 c.isWriting = false

 ...
}

写操作write通过channel进行排他,要从channel获取到数据后才能进行。通过defer保证写之后向通道发送数据,为下一次数据写入做好准备。

代码语言:javascript
复制
func (c *Conn) write(frameType int, deadline time.Time, buf0, buf1 []byte) error {
 <-c.mu
 defer func() { c.mu <- struct{}{} }()

 ...
 return nil
}

可以看到,上述write操作通过channel保证顺序执行,保证写入安全。真正不能并发的原因,是在外层函数,即下面的代码只是在尽可能检查是否并发,但是还是有可能两个goroutine逻辑同时走到 if c.isWriting,并且c.isWriting为false,都会继续后面的逻辑,引发第二次 if !c.isWriting判断时出现panic。

代码语言:javascript
复制
if c.isWriting {
    panic("concurrent write to websocket connection")
}
c.isWriting = true
err := c.write(w.frameType, c.writeDeadline, c.writeBuf[framePos:w.pos], extra)

if !c.isWriting {
    panic("concurrent write to websocket connection")
}
c.isWriting = false
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据小冰 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Part1前言
  • Part2 Upgrade实现
    • 1原理
      • 2静态结构
        • 3处理流程
          • 4关键代码分析
            • 参数校验工具函数
            • Hijack获取原始TCP连接
        • Part3 Read实现
          • 1原理
            • 2处理流程
              • 3关键代码分析
              • Part4 Write实现
                • 1原理
                  • 2处理流程
                    • 3关键代码分析
                    • Part5 并发性分析
                    相关产品与服务
                    腾讯云代码分析
                    腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档