[Go语言]一种用于网游服务器的支持多路复用的网络协议处理框架

简介:

本文描述了使用Go语言实现的、适应于Go语言并发模型的一种支持多路复用的网络协议处理框架,并提供了框架的代码实现。作者将这种框架用于网络游戏服务器中的协议处理,但也可用于其他领域。

应用背景:

在网络游戏服务器设计中,一般都会遇到协议多路复用的场景。比如登录服务器和玩家客户端之间有1:N的多个TCP连接;登录服务器和游戏服务器之间是1:1的TCP连接。玩家登录游戏的大致流程是这样的:

  1. 玩家连接登录服务器
  2. 登录服务器向数据库请求玩家数据
  3. 登录服务器获取到玩家数据,把玩家数据转发给游戏服务器进行加载包括创建玩家对象等
  4. 登录服务器获取到加载成功回应后,通知玩家客户端可以进入游戏世界

在3和4中,因为登录服务器和游戏服务器通常只有一个TCP连接,所有玩家数据都是通过这个连接进行传输,所以需要从协议包中区分出是哪个玩家的数据。通常这个区分的依据可以是玩家的角色名,但是也可以更通用一些,用一个数字ID来区分,这样就把协议包的分发处理和协议包中与游戏逻辑有关的内容分离开来。

协议说明:

通常网游的网络协议都是报文的形式,即使底层是使用TCP,也会用一些方法把数据拆分成一个个的报文(本文中称为协议包)。因此,本文也基于这一假设,但是对于具体的协议包格式,本文没有特别限制,只是要求协议包中能够容纳一个32字节的ID。

协议包的处理大概可以分为以下两种类型。其他更复杂的会话可以由以下两种类型组合而成。

  1. 发送一个数据包并等待回应。比如登录服务器等待游戏服务器加载玩家数据的结果通知。
  2. 发送一个数据包,不需要回应。比如游戏服务器加载玩家数据后,给登录服务器发送结果通知。

框架说明:

Go语言是一种支持高并发的编程语言,它支持高并发的方式是大量轻量级的goroutine并发执行。在每个goroutine中的操作基本上都是同步阻塞的,这样可以极大地简化程序逻辑,使得代码清晰易读,容易维护。基于这点,本文实现的框架的调用接口也是使用同步方式的。

  1. 如果一个协议包需要等待回应,就在调用函数上阻塞等待。这个调用的签名为: func (p *Connection) Query(data []byte) ([]byte, error) 注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。
  2. 如果发送一个协议包是对于接收到的某个协议包的回应,则调用: func (p *Connection) Reply(query, answer []byte) error 注意:answer的控制权会转交给框架,因此函数调用后不能修改answer的内容。
  3. 如果一个协议包不需要回应,就直接调用发送函数: func (p *Connection) Write(data []byte) error 注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。
  4. 调用者需要实现的接口:
  • Socket。用于协议包的收发。基本上是net.TCPConn的简单封装,在头部加上一个协议包的长度。
  • DataHandler。用于协议处理,即没有通过Query返回的协议包会分发给此接口处理。
  • ErrorHandler。用于错误处理。当断线时,会调用此接口。
  • IdentityHandler。用于读取和设置会话ID。

5. 关于goroutine安全的说明:

ErrorHandler和DataHandler的函数实现中不能直接调用(*Connection).Close,否则会导致死锁。

导出类型、函数和接口:
type Connection

func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, ehErrorHandler) *Connection
func (p *Connection) Start()
func (p *Connection) Close()
func (p *Connection) Query(data []byte) (res []byte, err error)
func (p *Connection) Reply(query, answer []byte) error
func (p *Connection) Write(data []byte) error
type Socket interface {

Read() ([]byte, error)
Write([]byte) error
Close()
}
type DataHandler interface {

Process([]byte)
}
type ErrorHandler interface {

OnError(error)
}
type IdentityHandler interface {

GetIdentity([]byte) uint32
SetIdentity([]byte, uint32)
}

完整的代码实现:
package multiplexer

import (

"errors"
"sync"
"sync/atomic"
)
var (

ERR_EXIT = errors.New("exit")
)
type Socket interface {

Read() ([]byte, error)
Write([]byte) error
Close()
}
type DataHandler interface {

Process([]byte)
}
type ErrorHandler interface {

OnError(error)
}
type IdentityHandler interface {

GetIdentity([]byte) uint32
SetIdentity([]byte, uint32)
}
type Connection struct {


conn Socket

wg sync.WaitGroup

mutex sync.Mutex

applicants map[uint32]chan []byte

chexit chan bool

chsend chan []byte

chch chan chan []byte

dh DataHandler

ih IdentityHandler

eh ErrorHandler

identity uint32

}
func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, eh ErrorHandler)*Connection {


count := maxcount

if count < 1024 {



count = 1024


}

chch := make(chan chan []byte, count)

for i := 0; i < count; i++ {



chch <- make(chan []byte, 1)


}

return &Connection{



conn: conn,


applicants: make(map[uint32]chan []byte, count),


chsend: make(chan []byte, count),


chexit: make(chan bool),


chch: chch,


dh: dh,


ih: ih,


eh: eh,


}

}

func (p *Connection) Start() {


p.wg.Add(2)

go func() {



defer p.wg.Done()



p.recv()


}()

go func() {



defer p.wg.Done()



p.send()


}()

}

func (p *Connection) Close() {


close(p.chexit)

p.conn.Close()

p.wg.Wait()

}

func (p *Connection) Query(data []byte) (res []byte, err error) {


var ch chan []byte

select {

case <-p.chexit:



return nil, ERR_EXIT


case ch = <-p.chch:



defer func() {




p.chch <- ch



}()


}

id := p.newIdentity()

p.ih.SetIdentity(data, id)

p.addApplicant(id, ch)

defer func() {



if err != nil {




p.popApplicant(id)



}


}()

if err := p.Write(data); err != nil {



return nil, err


}

select {

case <-p.chexit:



return nil, ERR_EXIT


case res = <-ch:



break


}

return res, nil

}
func (p *Connection) Reply(query, answer []byte) error {


// put back the identity attached to the query

id := p.ih.GetIdentity(query)

p.ih.SetIdentity(answer, id)

return p.Write(answer)

}
func (p *Connection) Write(data []byte) error {


select {

case <-p.chexit:



return ERR_EXIT


case p.chsend <- data:



break


}

return nil

}
func (p *Connection) send() {


for {



select {


case <-p.chexit:



return


case data := <-p.chsend:



if p.conn.Write(data) != nil {





return




}



}


}

}
func (p *Connection) recv() (err error) {


defer func() {



if err != nil {




select {



case <-p.chexit:





err = nil




default:





p.eh.OnError(err)




}



}


}()

for {



select {


case <-p.chexit:




return nil



default:




break



}


data, err := p.conn.Read()


if err != nil {




return err



}


if id := p.ih.GetIdentity(data); id > 0 {




ch, ok := p.popApplicant(id)



if ok {





ch <- data





continue




}



}


p.dh.Process(data)


}

return nil

}
func (p *Connection) newIdentity() uint32 {


return atomic.AddUint32(&p.identity, 1)

}
func (p *Connection) addApplicant(identity uint32, ch chan []byte) {


p.mutex.Lock()

defer p.mutex.Unlock()

p.applicants[identity] = ch

}
func (p *Connection) popApplicant(identity uint32) (chan []byte, bool) {


p.mutex.Lock()

defer p.mutex.Unlock()

ch, ok := p.applicants[identity]

if !ok {



return nil, false


}

delete(p.applicants, identity)

return ch, true

}

本文分享自微信公众号 - Golang语言社区(Golangweb)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2016-08-12

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java架构沉思录

单线程的Redis为什么这么快?

https://blog.csdn.net/xlgen157387/article/details/79470556

29930
来自专栏Golang语言社区

[Go语言]一种用于网游服务器的支持多路复用的网络协议处理框架

简介: 本文描述了使用Go语言实现的、适应于Go语言并发模型的一种支持多路复用的网络协议处理框架,并提供了框架的代码实现。作者将这种框架用于网络游戏服务器中的协...

398100
来自专栏木东居士的专栏

从0写一个爬虫,爬取500w好友关系数据

74860
来自专栏安富莱嵌入式技术分享

【RL-TCPnet网络教程】第18章 BSD Sockets基础知识

本章节为大家讲解BSD Sockets,需要大家对BSD Sockets有个基础的认识,方便后面章节Socket实战操作。

13030
来自专栏linux驱动个人学习

CPUFreq驱动

CPUFreq子系统位于 drivers/cpufreq目录下,负责进行运行过程中CPU频率和电压的动态调整,即DvFS( Dynamic Voltage Fr...

17330
来自专栏F-Stack的专栏

F-Stack Q&A 第三期

Q1:如果在一个阻塞型的socket上执行recv,会不会把相应的线程卡死,调用recv时该socket中没有数据包,导致sleep,sleep导致该线程没办法...

79490
来自专栏Ceph对象存储方案

RGW Bucket Shard设计与优化-上

1 bucket index背景简介 bucket index是整个RGW里面一个非常关键的数据结构,用于存储bucket的索引数据,默认情况下单个bucke...

1.6K50
来自专栏杨建荣的学习笔记

通过shell脚本添加备库日志 (r9笔记第94天)

今天下午的时候,准备顺手写一个简单的脚本,但是发现很多事情较真起来真是寸步难行。在写脚本的过程中碰到了太多的问题,很多时候感觉像要实现的功能更通用,就得做更多的...

39460
来自专栏Java后端技术栈

为什么说Redis是单线程的?

近乎所有与Java相关的面试都会问到缓存的问题,基础一点的会问到什么是“二八定律”、什么是“热数据和冷数据” ,复杂一点的会问到缓存雪崩、缓存穿透、缓存预热、缓...

18720
来自专栏韩伟的专栏

集群开源软件赏:JGroups

目前我在腾讯主要负责一个服务器端软件的相关开源项目,所以接下来几天的开源内容是最近工作上积累的一些经验和想法,下图中的内容就是我目前主要的工作内容和一些小小的成...

41440

扫码关注云+社区

领取腾讯云代金券