前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[Go语言]一种用于网游服务器的支持多路复用的网络协议处理框架

[Go语言]一种用于网游服务器的支持多路复用的网络协议处理框架

作者头像
李海彬
发布2018-03-22 14:46:45
8430
发布2018-03-22 14:46:45
举报
文章被收录于专栏:Golang语言社区Golang语言社区

简介:

本文描述了使用Go语言实现的、适应于Go语言并发模型的一种支持多路复用的网络协议处理框架,并提供了框架的代码实现。作者将这种框架用于网络游戏服务器中的协议处理,但也可用于其他领域。

应用背景:

在网络游戏服务器设计中,一般都会遇到协议多路复用的场景。比如登录服务器和玩家客户端之间有1:N的多个TCP连接;登录服务器和游戏服务器之间是1:1的TCP连接。玩家登录游戏的大致流程是这样的:

  1. 玩家连接登录服务器
  2. 登录服务器向数据库请求玩家数据
  3. 登录服务器获取到玩家数据,把玩家数据转发给游戏服务器进行加载包括创建玩家对象等
  4. 登录服务器获取到加载成功回应后,通知玩家客户端可以进入游戏世界

在3和4中,因为登录服务器和游戏服务器通常只有一个TCP连接,所有玩家数据都是通过这个连接进行传输,所以需要从协议包中区分出是哪个玩家的数据。通常这个区分的依据可以是玩家的角色名,但是也可以更通用一些,用一个数字ID来区分,这样就把协议包的分发处理和协议包中与游戏逻辑有关的内容分离开来。

协议说明:

通常网游的网络协议都是报文的形式,即使底层是使用TCP,也会用一些方法把数据拆分成一个个的报文(本文中称为协议包)。因此,本文也基于这一假设,但是对于具体的协议包格式,本文没有特别限制,只是要求协议包中能够容纳一个32字节的ID。

协议包的处理大概可以分为以下两种类型。其他更复杂的会话可以由以下两种类型组合而成。

  1. 发送一个数据包并等待回应。比如登录服务器等待游戏服务器加载玩家数据的结果通知。
  2. 发送一个数据包,不需要回应。比如游戏服务器加载玩家数据后,给登录服务器发送结果通知。

框架说明:

Go语言是一种支持高并发的编程语言,它支持高并发的方式是大量轻量级的goroutine并发执行。在每个goroutine中的操作基本上都是同步阻塞的,这样可以极大地简化程序逻辑,使得代码清晰易读,容易维护。基于这点,本文实现的框架的调用接口也是使用同步方式的。

  1. 如果一个协议包需要等待回应,就在调用函数上阻塞等待。这个调用的签名为: func (p *Connection) Query(data []byte) ([]byte, error) 注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。
  2. 如果发送一个协议包是对于接收到的某个协议包的回应,则调用: func (p *Connection) Reply(query, answer []byte) error 注意:answer的控制权会转交给框架,因此函数调用后不能修改answer的内容。
  3. 如果一个协议包不需要回应,就直接调用发送函数: func (p *Connection) Write(data []byte) error 注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。
  4. 调用者需要实现的接口:
  • Socket。用于协议包的收发。基本上是net.TCPConn的简单封装,在头部加上一个协议包的长度。
  • DataHandler。用于协议处理,即没有通过Query返回的协议包会分发给此接口处理。
  • ErrorHandler。用于错误处理。当断线时,会调用此接口。
  • IdentityHandler。用于读取和设置会话ID。

5. 关于goroutine安全的说明:

ErrorHandler和DataHandler的函数实现中不能直接调用(*Connection).Close,否则会导致死锁。

代码语言:javascript
复制
导出类型、函数和接口:
代码语言:javascript
复制
type Connection

func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, ehErrorHandler) *Connection
func (p *Connection) Start()
func (p *Connection) Close()
func (p *Connection) Query(data []byte) (res []byte, err error)
func (p *Connection) Reply(query, answer []byte) error
func (p *Connection) Write(data []byte) error
type Socket interface {

Read() ([]byte, error)
Write([]byte) error
Close()
}
type DataHandler interface {

Process([]byte)
}
type ErrorHandler interface {

OnError(error)
}
type IdentityHandler interface {

GetIdentity([]byte) uint32
SetIdentity([]byte, uint32)
}

完整的代码实现:
package multiplexer

import (

"errors"
"sync"
"sync/atomic"
)
var (

ERR_EXIT = errors.New("exit")
)
type Socket interface {

Read() ([]byte, error)
Write([]byte) error
Close()
}
type DataHandler interface {

Process([]byte)
}
type ErrorHandler interface {

OnError(error)
}
type IdentityHandler interface {

GetIdentity([]byte) uint32
SetIdentity([]byte, uint32)
}
type Connection struct {


conn Socket

wg sync.WaitGroup

mutex sync.Mutex

applicants map[uint32]chan []byte

chexit chan bool

chsend chan []byte

chch chan chan []byte

dh DataHandler

ih IdentityHandler

eh ErrorHandler

identity uint32

}
func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, eh ErrorHandler)*Connection {


count := maxcount

if count < 1024 {



count = 1024


}

chch := make(chan chan []byte, count)

for i := 0; i < count; i++ {



chch <- make(chan []byte, 1)


}

return &Connection{



conn: conn,


applicants: make(map[uint32]chan []byte, count),


chsend: make(chan []byte, count),


chexit: make(chan bool),


chch: chch,


dh: dh,


ih: ih,


eh: eh,


}

}

func (p *Connection) Start() {


p.wg.Add(2)

go func() {



defer p.wg.Done()



p.recv()


}()

go func() {



defer p.wg.Done()



p.send()


}()

}

func (p *Connection) Close() {


close(p.chexit)

p.conn.Close()

p.wg.Wait()

}

func (p *Connection) Query(data []byte) (res []byte, err error) {


var ch chan []byte

select {

case <-p.chexit:



return nil, ERR_EXIT


case ch = <-p.chch:



defer func() {




p.chch <- ch



}()


}

id := p.newIdentity()

p.ih.SetIdentity(data, id)

p.addApplicant(id, ch)

defer func() {



if err != nil {




p.popApplicant(id)



}


}()

if err := p.Write(data); err != nil {



return nil, err


}

select {

case <-p.chexit:



return nil, ERR_EXIT


case res = <-ch:



break


}

return res, nil

}
func (p *Connection) Reply(query, answer []byte) error {


// put back the identity attached to the query

id := p.ih.GetIdentity(query)

p.ih.SetIdentity(answer, id)

return p.Write(answer)

}
func (p *Connection) Write(data []byte) error {


select {

case <-p.chexit:



return ERR_EXIT


case p.chsend <- data:



break


}

return nil

}
func (p *Connection) send() {


for {



select {


case <-p.chexit:



return


case data := <-p.chsend:



if p.conn.Write(data) != nil {





return




}



}


}

}
func (p *Connection) recv() (err error) {


defer func() {



if err != nil {




select {



case <-p.chexit:





err = nil




default:





p.eh.OnError(err)




}



}


}()

for {



select {


case <-p.chexit:




return nil



default:




break



}


data, err := p.conn.Read()


if err != nil {




return err



}


if id := p.ih.GetIdentity(data); id > 0 {




ch, ok := p.popApplicant(id)



if ok {





ch <- data





continue




}



}


p.dh.Process(data)


}

return nil

}
func (p *Connection) newIdentity() uint32 {


return atomic.AddUint32(&p.identity, 1)

}
func (p *Connection) addApplicant(identity uint32, ch chan []byte) {


p.mutex.Lock()

defer p.mutex.Unlock()

p.applicants[identity] = ch

}
func (p *Connection) popApplicant(identity uint32) (chan []byte, bool) {


p.mutex.Lock()

defer p.mutex.Unlock()

ch, ok := p.applicants[identity]

if !ok {



return nil, false


}

delete(p.applicants, identity)

return ch, true

}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-08-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Golang语言社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档