KCP-GO源码解析

原文作者:张伯雨 golang技术社区

概念

ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一. RTO:Retransmission TimeOut FEC:Forward Error Correction

kcp简介

kcp是一个基于udp实现快速、可靠、向前纠错的的协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟降低30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发。查看官方文档kcp

kcp-go是用go实现了kcp协议的一个库,其实kcp类似tcp,协议的实现也很多参考tcp协议的实现,滑动窗口,快速重传,选择性重传,慢启动等。 kcp和tcp一样,也分客户端和监听端。

1+-+-+-+-+-+            +-+-+-+-+-+
2    |  Client |            |  Server |
3    +-+-+-+-+-+            +-+-+-+-+-+
4        |------ kcp data ------>|     
5        |<----- kcp data -------|    

kcp协议

layer model

 1+----------------------+
 2|      Session         |
 3+----------------------+
 4|      KCP(ARQ)        |
 5+----------------------+
 6|      FEC(OPTIONAL)   |
 7+----------------------+
 8|      CRYPTO(OPTIONAL)|
 9+----------------------+
10|      UDP(Packet)     |
11+----------------------+ 

KCP header

KCP Header Format

 1    4           1   1     2 (Byte)
 2 +---+---+---+---+---+---+---+---+
 3 |     conv      |cmd|frg|  wnd  |
 4 +---+---+---+---+---+---+---+---+
 5 |     ts        |     sn        |
 6 +---+---+---+---+---+---+---+---+
 7 |     una       |     len       |
 8 +---+---+---+---+---+---+---+---+
 9 |                               |
10 +             DATA              +
11 |                               |
12 +---+---+---+---+---+---+---+---+
13

代码结构

 1src/vendor/github.com/xtaci/kcp-go/
 2├── LICENSE
 3├── README.md
 4├── crypt.go    加解密实现
 5├── crypt_test.go
 6├── donate.png
 7├── fec.go      向前纠错实现
 8├── frame.png
 9├── kcp-go.png
10├── kcp.go      kcp协议实现
11├── kcp_test.go
12├── sess.go     会话管理实现
13├── sess_test.go
14├── snmp.go     数据统计实现
15├── updater.go  任务调度实现
16├── xor.go      xor封装
17└── xor_test.go

着重研究两个文件kcp.gosess.go

kcp浅析

kcp是基于udp实现的,所有udp的实现这里不做介绍,kcp做的事情就是怎么封装udp的数据和怎么解析udp的数据,再加各种处理机制,为了重传,拥塞控制,纠错等。下面介绍kcp客户端和服务端整体实现的流程,只是大概介绍一下函数流,不做详细解析,详细解析看后面数据流的解析。

kcp client整体函数流

和tcp一样,kcp要连接服务端需要先拨号,但是和tcp有个很大的不同是,即使服务端没有启动,客户端一样可以拨号成功,因为实际上这里的拨号没有发送任何信息,而tcp在这里需要三次握手。

 1DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
 2    V
 3net.DialUDP("udp", nil, udpaddr)
 4    V
 5NewConn()
 6    V
 7newUDPSession() {初始化UDPSession}
 8    V
 9NewKCP() {初始化kcp}
10    V
11updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
12    V
13go sess.readLoop()
14    V
15go s.receiver(chPacket)
16    V
17s.kcpInput(data)
18    V
19s.fecDecoder.decodeBytes(data)
20    V
21s.kcp.Input(data, true, s.ackNoDelay)
22    V
23kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
24    V
25notifyReadEvent()
26

客户端大体的流程如上面所示,先Dial,建立udp连接,将这个连接封装成一个会话,然后启动一个go程,接收udp的消息。

kcp server整体函数流

 1ListenWithOptions() 
 2    V
 3net.ListenUDP()
 4    V
 5ServerConn()
 6    V
 7newFECDecoder()
 8    V
 9go l.monitor() {从chPacket接收udp数据,写入kcp}
10    V
11go l.receiver(chPacket) {从upd接收数据,并入队列}
12    V
13newUDPSession()
14    V
15updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
16    V
17s.kcpInput(data)`
18    V
19s.fecDecoder.decodeBytes(data)
20    V
21s.kcp.Input(data, true, s.ackNoDelay)
22    V
23kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
24    V
25notifyReadEvent()

服务端的大体流程如上图所示,先Listen,启动udp监听,接着用一个go程监控udp的数据包,负责将不同session的数据写入不同的udp连接,然后解析封装将数据交给上层。

kcp 数据流详细解析

不管是kcp的客户端还是服务端,他们都有io行为,就是读与写,我们只分析一个就好了,因为它们读写的实现是一样的,这里分析客户端的读与写。

kcp client 发送消息

 1s.Write(b []byte) 
 2    V
 3s.kcp.WaitSnd() {}
 4    V
 5s.kcp.Send(b) {将数据根据mss分段,并存在kcp.snd_queue}
 6     V
 7s.kcp.flush(false) [flush data to output] {
 8    if writeDelay==true {
 9        flush
10    }else{
11        每隔`interval`时间flush一次
12    }
13}
14     V
15kcp.output(buffer, size) 
16     V
17s.output(buf)
18     V
19s.conn.WriteTo(ext, s.remote)
20     V
21s.conn..Conn.WriteTo(buf)

读写都是在sess.go文件中实现的,Write方法:

 1// Write implements net.Conn
 2func (s *UDPSession) Write(b []byte) (n int, err error) {
 3    for {
 4        ...
 5        // api flow control
 6        if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
 7            n = len(b)
 8            for {
 9                if len(b) <= int(s.kcp.mss) {
10                    s.kcp.Send(b)
11                    break
12                } else {
13                    s.kcp.Send(b[:s.kcp.mss])
14                    b = b[s.kcp.mss:]
15                }
16            }
17            if !s.writeDelay {
18                s.kcp.flush(false)
19            }
20            s.mu.Unlock()
21            atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
22            return n, nil
23        }
24        ...
25        // wait for write event or timeout
26        select {
27        case <-s.chWriteEvent:
28        case <-c:
29        case <-s.die:
30        }
31        if timeout != nil {
32            timeout.Stop()
33        }
34    }
35}

假设发送一个hello消息,Write方法会先判断发送窗口是否已满,满的话该函数阻塞,不满则kcp.Send(“hello”),而Send函数实现根据mss的值对数据分段,当然这里的发送的hello,长度太短,只分了一个段,并把它们插入发送的队列里。

 1func (kcp *KCP) Send(buffer []byte) int {
 2    ...
 3    for i := 0; i < count; i++ {
 4        var size int
 5        if len(buffer) > int(kcp.mss) {
 6            size = int(kcp.mss)
 7        } else {
 8            size = len(buffer)
 9        }
10        seg := kcp.newSegment(size)
11        copy(seg.data, buffer[:size])
12        if kcp.stream == 0 { // message mode
13            seg.frg = uint8(count - i - 1)
14        } else { // stream mode
15            seg.frg = 0
16        }
17        kcp.snd_queue = append(kcp.snd_queue, seg)
18        buffer = buffer[size:]
19    }
20    return 0
21}

接着判断参数writeDelay,如果参数设置为false,则立马发送消息,否则需要任务调度后才会触发发送,发送消息是由flush函数实现的。

  1// flush pending data
  2func (kcp *KCP) flush(ackOnly bool) {
  3    var seg Segment
  4    seg.conv = kcp.conv
  5    seg.cmd = IKCP_CMD_ACK
  6    seg.wnd = kcp.wnd_unused()
  7    seg.una = kcp.rcv_nxt
  8    buffer := kcp.buffer
  9    // flush acknowledges
 10    ptr := buffer
 11    for i, ack := range kcp.acklist {
 12        size := len(buffer) - len(ptr)
 13        if size+IKCP_OVERHEAD > int(kcp.mtu) {
 14            kcp.output(buffer, size)
 15            ptr = buffer
 16        }
 17        // filter jitters caused by bufferbloat
 18        if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
 19            seg.sn, seg.ts = ack.sn, ack.ts
 20            ptr = seg.encode(ptr)
 21        }
 22    }
 23    kcp.acklist = kcp.acklist[0:0]
 24    if ackOnly { // flash remain ack segments
 25        size := len(buffer) - len(ptr)
 26        if size > 0 {
 27            kcp.output(buffer, size)
 28        }
 29        return
 30    }
 31    // probe window size (if remote window size equals zero)
 32    if kcp.rmt_wnd == 0 {
 33        current := currentMs()
 34        if kcp.probe_wait == 0 {
 35            kcp.probe_wait = IKCP_PROBE_INIT
 36            kcp.ts_probe = current + kcp.probe_wait
 37        } else {
 38            if _itimediff(current, kcp.ts_probe) >= 0 {
 39                if kcp.probe_wait < IKCP_PROBE_INIT {
 40                    kcp.probe_wait = IKCP_PROBE_INIT
 41                }
 42                kcp.probe_wait += kcp.probe_wait / 2
 43                if kcp.probe_wait > IKCP_PROBE_LIMIT {
 44                    kcp.probe_wait = IKCP_PROBE_LIMIT
 45                }
 46                kcp.ts_probe = current + kcp.probe_wait
 47                kcp.probe |= IKCP_ASK_SEND
 48            }
 49        }
 50    } else {
 51        kcp.ts_probe = 0
 52        kcp.probe_wait = 0
 53    }
 54    // flush window probing commands
 55    if (kcp.probe & IKCP_ASK_SEND) != 0 {
 56        seg.cmd = IKCP_CMD_WASK
 57        size := len(buffer) - len(ptr)
 58        if size+IKCP_OVERHEAD > int(kcp.mtu) {
 59            kcp.output(buffer, size)
 60            ptr = buffer
 61        }
 62        ptr = seg.encode(ptr)
 63    }
 64    // flush window probing commands
 65    if (kcp.probe & IKCP_ASK_TELL) != 0 {
 66        seg.cmd = IKCP_CMD_WINS
 67        size := len(buffer) - len(ptr)
 68        if size+IKCP_OVERHEAD > int(kcp.mtu) {
 69            kcp.output(buffer, size)
 70            ptr = buffer
 71        }
 72        ptr = seg.encode(ptr)
 73    }
 74    kcp.probe = 0
 75    // calculate window size
 76    cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
 77    if kcp.nocwnd == 0 {
 78        cwnd = _imin_(kcp.cwnd, cwnd)
 79    }
 80    // sliding window, controlled by snd_nxt && sna_una+cwnd
 81    newSegsCount := 0
 82    for k := range kcp.snd_queue {
 83        if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
 84            break
 85        }
 86        newseg := kcp.snd_queue[k]
 87        newseg.conv = kcp.conv
 88        newseg.cmd = IKCP_CMD_PUSH
 89        newseg.sn = kcp.snd_nxt
 90        kcp.snd_buf = append(kcp.snd_buf, newseg)
 91        kcp.snd_nxt++
 92        newSegsCount++
 93        kcp.snd_queue[k].data = nil
 94    }
 95    if newSegsCount > 0 {
 96        kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
 97    }
 98    // calculate resent
 99    resent := uint32(kcp.fastresend)
100    if kcp.fastresend <= 0 {
101        resent = 0xffffffff
102    }
103    // check for retransmissions
104    current := currentMs()
105    var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
106    for k := range kcp.snd_buf {
107        segment := &kcp.snd_buf[k]
108        needsend := false
109        if segment.xmit == 0 { // initial transmit
110            needsend = true
111            segment.rto = kcp.rx_rto
112            segment.resendts = current + segment.rto
113        } else if _itimediff(current, segment.resendts) >= 0 { // RTO
114            needsend = true
115            if kcp.nodelay == 0 {
116                segment.rto += kcp.rx_rto
117            } else {
118                segment.rto += kcp.rx_rto / 2
119            }
120            segment.resendts = current + segment.rto
121            lost++
122            lostSegs++
123        } else if segment.fastack >= resent { // fast retransmit
124            needsend = true
125            segment.fastack = 0
126            segment.rto = kcp.rx_rto
127            segment.resendts = current + segment.rto
128            change++
129            fastRetransSegs++
130        } else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
131            needsend = true
132            segment.fastack = 0
133            segment.rto = kcp.rx_rto
134            segment.resendts = current + segment.rto
135            change++
136            earlyRetransSegs++
137        }
138        if needsend {
139            segment.xmit++
140            segment.ts = current
141            segment.wnd = seg.wnd
142            segment.una = seg.una
143            size := len(buffer) - len(ptr)
144            need := IKCP_OVERHEAD + len(segment.data)
145            if size+need > int(kcp.mtu) {
146                kcp.output(buffer, size)
147                current = currentMs() // time update for a blocking call
148                ptr = buffer
149            }
150            ptr = segment.encode(ptr)
151            copy(ptr, segment.data)
152            ptr = ptr[len(segment.data):]
153            if segment.xmit >= kcp.dead_link {
154                kcp.state = 0xFFFFFFFF
155            }
156        }
157    }
158    // flash remain segments
159    size := len(buffer) - len(ptr)
160    if size > 0 {
161        kcp.output(buffer, size)
162    }
163    // counter updates
164    sum := lostSegs
165    if lostSegs > 0 {
166        atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
167    }
168    if fastRetransSegs > 0 {
169        atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
170        sum += fastRetransSegs
171    }
172    if earlyRetransSegs > 0 {
173        atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
174        sum += earlyRetransSegs
175    }
176    if sum > 0 {
177        atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
178    }
179    // update ssthresh
180    // rate halving, https://tools.ietf.org/html/rfc6937
181    if change > 0 {
182        inflight := kcp.snd_nxt - kcp.snd_una
183        kcp.ssthresh = inflight / 2
184        if kcp.ssthresh < IKCP_THRESH_MIN {
185            kcp.ssthresh = IKCP_THRESH_MIN
186        }
187        kcp.cwnd = kcp.ssthresh + resent
188        kcp.incr = kcp.cwnd * kcp.mss
189    }
190    // congestion control, https://tools.ietf.org/html/rfc5681
191    if lost > 0 {
192        kcp.ssthresh = cwnd / 2
193        if kcp.ssthresh < IKCP_THRESH_MIN {
194            kcp.ssthresh = IKCP_THRESH_MIN
195        }
196        kcp.cwnd = 1
197        kcp.incr = kcp.mss
198    }
199    if kcp.cwnd < 1 {
200        kcp.cwnd = 1
201        kcp.incr = kcp.mss
202    }
203  }

flush函数非常的重要,kcp的重要参数都是在调节这个函数的行为,这个函数只有一个参数ackOnly,意思就是只发送ack,如果ackOnly为true的话,该函数只遍历ack列表,然后发送,就完事了。 如果不是,也会发送真实数据。 在发送数据前先进行windSize探测,如果开启了拥塞控制nc=0,则每次发送前检测服务端的winsize,如果服务端的winsize变小了,自身的winsize也要更着变小,来避免拥塞。如果没有开启拥塞控制,就按设置的winsize进行数据发送。

接着循环每个段数据,并判断每个段数据的是否该重发,还有什么时候重发:

1. 如果这个段数据首次发送,则直接发送数据。

2. 如果这个段数据的当前时间大于它自身重发的时间,也就是RTO,则重传消息。

3. 如果这个段数据的ack丢失累计超过resent次数,则重传,也就是快速重传机制。这个resent参数由resend参数决定。

4. 如果这个段数据的ack有丢失且没有新的数据段,则触发ER,ER相关信息ER

最后通过kcp.output发送消息hello,output是个回调函数,函数的实体是sess.go的:

 1func (s *UDPSession) output(buf []byte) {
 2    var ecc [][]byte
 3    // extend buf's header space
 4    ext := buf
 5    if s.headerSize > 0 {
 6        ext = s.ext[:s.headerSize+len(buf)]
 7        copy(ext[s.headerSize:], buf)
 8    }
 9    // FEC stage
10    if s.fecEncoder != nil {
11        ecc = s.fecEncoder.Encode(ext)
12    }
13    // encryption stage
14    if s.block != nil {
15        io.ReadFull(rand.Reader, ext[:nonceSize])
16        checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
17        binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
18        s.block.Encrypt(ext, ext)
19        if ecc != nil {
20            for k := range ecc {
21                io.ReadFull(rand.Reader, ecc[k][:nonceSize])
22                checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
23                binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
24                s.block.Encrypt(ecc[k], ecc[k])
25            }
26        }
27    }
28    // WriteTo kernel
29    nbytes := 0
30    npkts := 0
31    // if mrand.Intn(100) < 50 {
32    for i := 0; i < s.dup+1; i++ {
33        if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
34            nbytes += n
35            npkts++
36        }
37    }
38    // }
39    if ecc != nil {
40        for k := range ecc {
41            if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
42                nbytes += n
43                npkts++
44            }
45        }
46    }
47    atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
48    atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
49}

output函数才是真正的将数据写入内核中,在写入之前先进行了fec编码,fec编码器的实现是用了一个开源库github.com/klauspost/reedsolomon,编码以后的hello就不是和原来的hello一样了,至少多了几个字节。 fec编码器有两个重要的参数reedsolomon.New(dataShards,parityShards,reedsolomon.WithMaxGoroutines(1)),dataShards和parityShards,这两个参数决定了fec的冗余度,冗余度越大抗丢包性就越强。

kcp的任务调度器

其实这里任务调度器是一个很简单的实现,用一个全局变量updater来管理session,代码文件为updater.go。其中最主要的函数

 1 func (h *updateHeap) updateTask() {
 2    var timer <-chan time.Time
 3    for {
 4        select {
 5        case <-timer:       
 6        case <-h.chWakeUp:
 7        }
 8        h.mu.Lock()
 9        hlen := h.Len()
10        now := time.Now()
11        if hlen > 0 && now.After(h.entries[0].ts) {
12            for i := 0; i < hlen; i++ {
13                entry := heap.Pop(h).(entry)
14                if now.After(entry.ts) {
15                    entry.ts = now.Add(entry.s.update())
16                    heap.Push(h, entry)
17                } else {
18                    heap.Push(h, entry)
19                    break
20                }
21            }
22        }
23        if hlen > 0 {
24            timer = time.After(h.entries[0].ts.Sub(now))
25        }
26        h.mu.Unlock()
27    }
28}

任务调度器实现了一个堆结构,每当有新的连接,session都会插入到这个堆里,接着for循环每隔interval时间,遍历这个堆,得到entry然后执行entry.s.update()。而entry.s.update()会执行s.kcp.flush(false)来发送数据。

总结

这里简单介绍了kcp的整体流程,详细介绍了发送数据的流程,但未介绍kcp接收数据的流程,其实在客户端发送数据后,服务端是需要返回ack的,而客户端也需要根据返回的ack来判断数据段是否需要重传还是在队列里清除该数据段。处理返回来的ack是在函数kcp.Input()函数实现的。具体详细流程下次再介绍。

github:https://github.com/Golangltd/kcp-go

Google资深工程师深度讲解 Go语言基础到实战系列 (www.Golang.Ltd论坛提供下载链接)

版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。

原文发布于微信公众号 - Golang语言社区(Golangweb)

原文发表时间:2018-06-12

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏JAVA烂猪皮

金三银四跳槽季,上周刚面试回来后的面试总结

上周陪同之前一起工作的同事去面试(乔治,小袁,鹏飞(面试人)),第一站是去深圳,第二站上海,第三站杭州。面试什么公司我在这里就不多说了,你们知道是一线公司就行。...

1192
来自专栏大数据钻研

金三银四跳槽季,上周刚面试回来后的面试总结,想进BAT必看

上周陪同之前一起工作的同事去面试(乔治,小袁,鹏飞(面试人)),第一站是去深圳,第二站上海,第三站杭州。面试什么公司我在这里就不多说了,你们知道是一线公司就行。...

3727
来自专栏Crossin的编程教室

Python 爬虫爬取美剧网站

一直有爱看美剧的习惯,一方面锻炼一下英语听力,一方面打发一下时间。之前是能在视频网站上面在线看的,可是自从广电总局的限制令之后,进口的美剧英剧等貌似就不在像以前...

3927
来自专栏Spark学习技巧

从零讲Java,给你一条清晰地学习道路!该学什么就学什么!

1195
来自专栏黑泽君的专栏

从零讲JAVA ,给你一条清晰地学习道路!该学什么就学什么!!

 原文链接:https://zhuanlan.zhihu.com/p/25296859

932
来自专栏游戏杂谈

emoji表情引发的JNI崩溃

今天突然接到客服那边的反馈说,有玩家反馈进游戏后不久就崩溃了,我先是怀疑网络问题,因为一连接聊天成功后就挂了。之后用logcat抓日志,发现挂在jni那里了

1523
来自专栏HaHack

Speed Up the Rendering Process of hexo 3

1093
来自专栏运维技术迷

openwrt将LAN口改为WAN方法

牢骚 折腾了好几个好几个小时,终于搞好了。原因就是因为固件里面的端口序号和实际路由器后面的序号不一致,导致我的设置和物理连接对不上,这是个巨坑。 折腾需求 PS...

6155
来自专栏FreeBuf

浅谈拒绝服务攻击的原理与防御(2) :反射型DDOS

0×01 前言 前几天提交了一篇关于DDOS攻击的文章到今天下午才审核通过发表出来,所以晚上闲来无事在接着写下面的内容,今天我就不多说废话了直接来干货。 目前来...

2896
来自专栏杨建荣的学习笔记

system表空间不足的问题分析(r6笔记第66天)

很多事情见多了也就有了麻木的感觉,报警短信就是如此,每天总能收到不少的报警短信,可能很多时候就扫一眼,如果没有严重的问题自己是不会情愿打开电脑处理的。 对于此,...

2684

扫码关注云+社区