本文是WebSocket系列文章的第3篇,从源码角度理解WebSocket是如何实现的。分析的是gorilla websocket,即WebSocket协议-实战中服务端使用的WebSocket库。
Gorilla WebSocket是一个由Go语言实现的,经过很好测试并且广泛使用的WebSocket库,它提供了简单易用、功能强大的API接口。目前在github上已有2万+⭐️。
Gorilla WebSocket既然是一个库,对于使用者来说,就是调用该库的API接口完成业务功能。库封装了内部功能,屏蔽底层实现,使用者无需关心内部实现逻辑。而本文是源码分析,就是要分析提供API接口的内部实现。所以我们就从API入手,抽丝剥茧了解实现细节。
WebSocket作为一个应用层协议,与HTTP协议处于同一层级。Gorilla WebSocket库层次结构如下,位于底层TCP协议和业务模块之间。
可以看到 Gorilla WebSocket处于承上启下的关键位置。我们的业务模块数据是如何写入到下层的TCP;对端发送的TCP数据又是如何解析为业务层数据;初始阶段HTTP连接是如何提升为WebSocket。这依次对应着 Gorilla WebSocket 中的 Upgrade、Read和Write关键接口,在下面的章节依次详细分析。
在WebSocket协议-概念原理中提到WebSocket握手阶段采用的是HTTP协议,该过程对应到Gorilla WebSocket库中就是 Upgrade 接口。
握手采用HTTP协议,那Upgrade接口位于HTTP层之上,此时可以将Upgrade理解为HTTP的handler函数,一个特殊的业务处理函数。
标准的HTTP handler处理函数接受一个 http.ResponseWriter 和 *http.Request参数,Upgrade既然是一个特殊的handler,那么它的入参也必有一个 http.ResponseWriter 和 *http.Request。事实确实如此,Upgrade的签名为 func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error)
。
Upgrade会返回一个 *Conn
,这是一个WebSocket连接句柄。所以通过调用Upgrade方法,成功建立起WebSocket连接。
tokenListContainsValue 是一个参数校验工具函数,在util.go文件中,检查 name 和 value值是否在Header中。
if !tokenListContainsValue(r.Header, "Connection", "upgrade") {
...
}
if !tokenListContainsValue(r.Header, "Upgrade", "websocket") {
...
}
if !tokenListContainsValue(r.Header, "Sec-Websocket-Version", "13") {
...
}
现在来看 tokenListContainsValue 实现细节,入参 header本质上是一个 map[string][]string
类型,所以根据传入的name从header中取出对应的value,value是一个[]string,所以依次判断每个string是否有给定的value值。由于每个value支持逗号分隔,所以调用 nextToken
提取每个token。例如,如果从header获取到的value是"other,websocket,more", tokenListContainsValue(r.Header, "Upgrade", "websocket")
会返回 true。
func tokenListContainsValue(header http.Header, name string, value string) bool {
headers:
for _, s := range header[name] {
for {
var t string
t, s = nextToken(skipSpace(s))
if t == "" {
continue headers
}
s = skipSpace(s)
if s != "" && s[0] != ',' {
continue headers
}
if equalASCIIFold(t, value) {
return true
}
if s == "" {
continue headers
}
s = s[1:]
}
}
return false
}
NOTE:📢注意上述代码使用 continue label,我们可能使用的比较少,特别是在其他语言中很少有类似的语法。其实在Go语言中,break label 和 continue label是两个非常有趣的语法。在我之前的文章 Go语言中常见100问题-#34 Ignoring how the break statement works有专门讲解break label。
continue label实现的效果是跳过 for _, s := range header[name]
当前的循环,继续剩余循环处理。因为 header[name]值是一个string切片,所以代码中 continue headers 效果是跳过当前s, 继续执行下一个字符串处理。
采用continue headers 本质是跳过内存的for循环,继续外层的for循环。但是我个人认为作者这里写复杂了,采用break也可以实现相同的效果(即将 continue headers 替换为 break),并且代码更简洁。
通过Upgrade操作将HTTP协议升级为WebSocket协议,一个重要的目标是获取底层TCP的控制权,拿到原始的TCP连接句柄,后续处理不在经过HTTP库,直接由WebSocket接管。
具体获取代码如下,ResponseWriter 实现了 Hijacker 接口,通过它调用 h.Hijack()方法即获得原始的netConn。默认的 ResponseWriter 对于 HTTP/1.x 连接支持 Hijacker 接口, 而HTTP2不支持,所以进行 w.(http.Hijacker) 断言处理。
h, ok := w.(http.Hijacker)
if !ok {
return u.returnError(w, r, http.StatusInternalServerError, "websocket: response does not implement http.Hijacker")
}
var brw *bufio.ReadWriter
netConn, brw, err := h.Hijack()
if err != nil {
return u.returnError(w, r, http.StatusInternalServerError, err.Error())
}
Read即从TCP连接中读取数据,然后将读到的二进制数据解析为WebSocket协议帧结构,简单来说就是数据序列化。
根据如下WebSocket帧结构,将收到的二进制数据解析出来。
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
读取操作核心处理都在 NextReader 函数中,该函数返回WebSocket消息数据类型以及data读取器。
关键操作在如下的for循环中,也就是说在读取数据出现异常,或者读取到的是文本数据或二进制数据才会返回。其他像读取到的是控制消息都不会返回,继续卡住。因为调用方是业务模块,所以只关心读取到业务数据才会处理,如果没有读取阻塞在NextReader上完全合情合理。
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
...
for c.readErr == nil {
frameType, err := c.advanceFrame()
if err != nil {
c.readErr = hideTempErr(err)
break
}
if frameType == TextMessage || frameType == BinaryMessage {
c.messageReader = &messageReader{c}
c.reader = c.messageReader
if c.readDecompress {
c.reader = c.newDecompressionReader(c.reader)
}
return frameType, c.reader, nil
}
}
...
return noFrame, nil, c.readErr
}
真正读取操作是在 advanceFrame 方法中。调用 c.read(2)读取前两个字节。将第一个字节即 p[0] 与 0xf 取与操作,获取到低4个bit值,对应到原理小节中的 opcode字段。
将p[1]即第二字节与 0x7f进行取与操作,得到其低7个bit,即对应原理图中的Payload len。如果值为126,读取接下的2个字节,即为实际data大小,如果值为127,则读取接下来的8个字节。
func (c *Conn) advanceFrame() (int, error) {
...
var errors []string
p, err := c.read(2)
if err != nil {
return noFrame, err
}
frameType := int(p[0] & 0xf)
final := p[0]&finalBit != 0
rsv1 := p[0]&rsv1Bit != 0
rsv2 := p[0]&rsv2Bit != 0
rsv3 := p[0]&rsv3Bit != 0
mask := p[1]&maskBit != 0
c.setReadRemaining(int64(p[1] & 0x7f))
...
switch c.readRemaining {
case 126:
p, err := c.read(2)
if err != nil {
return noFrame, err
}
if err := c.setReadRemaining(int64(binary.BigEndian.Uint16(p))); err != nil {
return noFrame, err
}
case 127:
p, err := c.read(8)
if err != nil {
return noFrame, err
}
if err := c.setReadRemaining(int64(binary.BigEndian.Uint64(p))); err != nil {
return noFrame, err
}
}
...
}
Write即向TCP连接中写入数据,然后将业务模块发送的WebSocket帧数据翻译为二进制数据,简单来说就是数据反序列化,跟Read是逆向操作。
根据WebSocket帧结构,将业务层数据封装成二进制数据。
Write核心操作在于封包,下面结合代码分析具体实现细节。封包处理在 flushFrame 方法中实现。
构造WebSocket的第一个字节b0和第二个字节b1。c.writeBuf承载整个WebSocket报文数据。
func (w *messageWriter) flushFrame(final bool, extra []byte) error {
...
b0 := byte(w.frameType)
if final {
b0 |= finalBit
}
if w.compress {
b0 |= rsv1Bit
}
w.compress = false
b1 := byte(0)
if !c.isServer {
b1 |= maskBit
}
...
}
WriteBuf结构如下,开头预留了 2+8+4 个字节,这个是按header最大长度预留的。maxFrameHeaderSize后面才是实际的数据。
统一预留最大14个字节的header是方便代码处理,对于server端,是不用填充 Masking-key数据的,所以代码中直接将 framePos 设置为4,真正有效数据是 c.WriteBuf[4:],而对于client端,需要填充 Masking-key , 所以它的 framePos就是0。
func (w *messageWriter) flushFrame(final bool, extra []byte) error {
...
// Assume that the frame starts at beginning of c.writeBuf.
framePos := 0
if c.isServer {
// Adjust up if mask not included in the header.
framePos = 4
}
...
}
填充数据长度大小到 payload length。这一部分大小是不固定的,如果data长度大于等于65536,则表示长度这一部分占8个字节。
case length >= 65536:
c.writeBuf[framePos] = b0
c.writeBuf[framePos+1] = b1 | 127
binary.BigEndian.PutUint64(c.writeBuf[framePos+2:], uint64(length))
如果data长度小于65536但大于125,则表示长度这一部分占2个字节。
case length > 125:
framePos += 6
c.writeBuf[framePos] = b0
c.writeBuf[framePos+1] = b1 | 126
binary.BigEndian.PutUint16(c.writeBuf[framePos+2:], uint16(length))
如果data长度小于等于125,这不用额外空间表示长度,payload len大小就是。
default:
framePos += 8
c.writeBuf[framePos] = b0
c.writeBuf[framePos+1] = b1 | byte(length)
关于读写的并发性,doc.go文件中有如下说明。核心就是不支持多并发读、多并发写。一个并发读和一个并发写是支持的。
Connections support one concurrent reader and one concurrent writer.Applications are responsible for ensuring that no more than one goroutine calls the write methods (NextWriter, SetWriteDeadline, WriteMessage,WriteJSON, EnableWriteCompression, SetCompressionLevel) concurrently and that no more than one goroutine calls the read methods (NextReader,SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler)concurrently.
像下面这样开启一个读 goroutine 和 一个写 goroutine是✅正确的。
conn, err := websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := NewClient(conn, m)
m.addClient(client)
go client.readMessages()
go client.writeMessages()
下面同时开启两个写goroutine是❎错误的。
conn, err := websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := NewClient(conn, m)
m.addClient(client)
go client.writeMessages()
go client.writeMessages()
下面来看写操作为啥不支持并发,代码注释强调"best-effort”意味着系统会尽力去执行这项检测,但可能不保证在所有情况下都能检测到并发写入。
Write the buffers to the connection with best-effort detection of concurrent writes.
在进行写入前,先判断c.isWriting是否true,如果已经为true,表明有写操作正在进行,直接panic。
如果没有人写入,先将c.isWriting设置为true,然后发送数据,发送完毕之后又将c.isWriting置为false。
func (w *messageWriter) flushFrame(final bool, extra []byte) error {
...
// Write the buffers to the connection with best-effort detection of
// concurrent writes. See the concurrency section in the package
// documentation for more info.
if c.isWriting {
panic("concurrent write to websocket connection")
}
c.isWriting = true
// 向tcp socket中写入数据
err := c.write(w.frameType, c.writeDeadline, c.writeBuf[framePos:w.pos], extra)
if !c.isWriting {
panic("concurrent write to websocket connection")
}
c.isWriting = false
...
}
写操作write通过channel进行排他,要从channel获取到数据后才能进行。通过defer保证写之后向通道发送数据,为下一次数据写入做好准备。
func (c *Conn) write(frameType int, deadline time.Time, buf0, buf1 []byte) error {
<-c.mu
defer func() { c.mu <- struct{}{} }()
...
return nil
}
可以看到,上述write操作通过channel保证顺序执行,保证写入安全。真正不能并发的原因,是在外层函数,即下面的代码只是在尽可能检查是否并发,但是还是有可能两个goroutine逻辑同时走到 if c.isWriting
,并且c.isWriting为false,都会继续后面的逻辑,引发第二次 if !c.isWriting
判断时出现panic。
if c.isWriting {
panic("concurrent write to websocket connection")
}
c.isWriting = true
err := c.write(w.frameType, c.writeDeadline, c.writeBuf[framePos:w.pos], extra)
if !c.isWriting {
panic("concurrent write to websocket connection")
}
c.isWriting = false