
Pitaya是一款由国外游戏公司topfreegames使用golang进行编写,易于使用,快速且轻量级的开源分布式游戏服务器框架 Pitaya使用etcd作为默认的服务发现组件,提供使用nats和grpc进行远程调用(server to server)的可选配置,并提供在docker中运行以上组件(etcd、nats)的docker-compose配置
    type PlayerConn interface {
        GetNextMessage() (b []byte, err error)
        net.Conn
    }
    type Acceptor interface {
        ListenAndServe()
        Stop()
        GetAddr() string
        GetConnChan() chan PlayerConn
    }
    type Wrapper interface {
        Wrap(acceptor.Acceptor) acceptor.Acceptor
    }
    type (
        // Agent corresponds to a user and is used for storing raw Conn information
        Agent struct {
            Session            *session.Session  // session
            appDieChan         chan bool         // app die channel
            chDie              chan struct{}     // wait for close
            chSend             chan pendingWrite // push message queue
            chStopHeartbeat    chan struct{}     // stop heartbeats
            chStopWrite        chan struct{}     // stop writing messages
            closeMutex         sync.Mutex
            conn               net.Conn            // low-level conn fd
            decoder            codec.PacketDecoder // binary decoder
            encoder            codec.PacketEncoder // binary encoder
            heartbeatTimeout   time.Duration
            lastAt             int64 // last heartbeat unix time stamp
            messageEncoder     message.Encoder
            ... ...
            state              int32                // current agent state
        }
        pendingWrite struct {
            ctx  context.Context
            data []byte
            err  error
        }
    )
    type Component interface {
        Init()
        AfterInit()
        BeforeShutdown()
        Shutdown()
    }
    type (
        //Handler represents a message.Message's handler's meta information.
        Handler struct {
            Receiver    reflect.Value  // receiver of method
            Method      reflect.Method // method stub
            Type        reflect.Type   // low-level type of method
            IsRawArg    bool           // whether the data need to serialize
            MessageType message.Type   // handler allowed message type (either request or notify)
        }
        //Remote represents remote's meta information.
        Remote struct {
            Receiver reflect.Value  // receiver of method
            Method   reflect.Method // method stub
            HasArgs  bool           // if remote has no args we won't try to serialize received data into arguments
            Type     reflect.Type   // low-level type of method
        }
        // Service implements a specific service, some of it's methods will be
        // called when the correspond events is occurred.
        Service struct {
            Name     string              // name of service
            Type     reflect.Type        // type of the receiver
            Receiver reflect.Value       // receiver of methods for the service
            Handlers map[string]*Handler // registered methods
            Remotes  map[string]*Remote  // registered remote methods
            Options  options             // options
        }
    )
    type Base struct{}
    func (c *Base) Init() error {
        return nil
    }
    func (c *Base) AfterInit() {}
    func (c *Base) BeforeShutdown() {}
    func (c *Base) Shutdown() error {
        return nil
    }
集中管理的对象容器在外部module.go中定义
    var (
        modulesMap = make(map[string]interfaces.Module)
        modulesArr = []moduleWrapper{}
    )
    type moduleWrapper struct {
        module interfaces.Module
        name   string
    }
    type (
        HandlerService struct {
            appDieChan         chan bool             // die channel app
            chLocalProcess     chan unhandledMessage // channel of messages that will be processed locally
            chRemoteProcess    chan unhandledMessage // channel of messages that will be processed remotely
            decoder            codec.PacketDecoder   // binary decoder
            encoder            codec.PacketEncoder   // binary encoder
            heartbeatTimeout   time.Duration
            messagesBufferSize int
            remoteService      *RemoteService
            serializer         serialize.Serializer          // message serializer
            server             *cluster.Server               // server obj
            services           map[string]*component.Service // all registered service
            messageEncoder     message.Encoder
            metricsReporters   []metrics.Reporter
        }
        unhandledMessage struct {
            ctx   context.Context
            agent *agent.Agent
            route *route.Route
            msg   *message.Message
        }
    )
    type RemoteService struct {
        rpcServer              cluster.RPCServer
        serviceDiscovery       cluster.ServiceDiscovery
        serializer             serialize.Serializer
        encoder                codec.PacketEncoder
        rpcClient              cluster.RPCClient
        services               map[string]*component.Service // all registered service
        router                 *router.Router
        messageEncoder         message.Encoder
        server                 *cluster.Server // server obj
        remoteBindingListeners []cluster.RemoteBindingListener
    }
    var (
        Manager = &struct {
            incrementID    int64      
            timers         sync.Map   
            ChClosingTimer chan int64 
            ChCreatedTimer chan *Timer
        }{}
        Precision = time.Second
        GlobalTicker *time.Ticker
    )
    var (
        BeforeHandler = &pipelineChannel{}
        AfterHandler = &pipelineAfterChannel{}
    )
    type (
        HandlerTempl func(ctx context.Context, in interface{}) (out interface{}, err error)
        AfterHandlerTempl func(ctx context.Context, out interface{}, err error) (interface{}, error)
        pipelineChannel struct {
            Handlers []HandlerTempl
        }
        pipelineAfterChannel struct {
            Handlers []AfterHandlerTempl
        }
    )
app.go是系统启动的入口 创建HandlerService 并根据启动模式如果是集群模式创建RemoteService 开启服务端事件监听 开启监听服务器关闭信号的Chan
    var (
        app = &App{
            ... ..
        }
        remoteService  *service.RemoteService
        handlerService *service.HandlerService
    )
    func Start() {
        ... ..
        if app.serverMode == Cluster {
            ... ..
            app.router.SetServiceDiscovery(app.serviceDiscovery)
            remoteService = service.NewRemoteService(
                app.rpcClient,
                app.rpcServer,
                app.serviceDiscovery,
                app.router,
                ... ..
            )
            app.rpcServer.SetPitayaServer(remoteService)
            initSysRemotes()
        }
        handlerService = service.NewHandlerService(
            app.dieChan,
            app.heartbeat,
            app.server,
            remoteService,
            ... ..
        )
        ... ..
        listen()
        ... ..
        // stop server
        select {
        case <-app.dieChan:
            logger.Log.Warn("the app will shutdown in a few seconds")
        case s := <-sg:
            logger.Log.Warn("got signal: ", s, ", shutting down...")
            close(app.dieChan)
        }
        ... ..
    }
listen方法也就是开启服务,具体包括以下步骤: 1.注册Component 2.注册定时任务的GlobalTicker 3.开启Dispatch处理业务和定时任务(ticket)的goroutine 4.开启acceptor处理连接的goroutine 5.开启主逻辑的goroutine 6.注册Modules
    func listen() {
        startupComponents()
        timer.GlobalTicker = time.NewTicker(timer.Precision)
        logger.Log.Infof("starting server %s:%s", app.server.Type, app.server.ID)
        for i := 0; i < app.config.GetInt("pitaya.concurrency.handler.dispatch"); i++ {
            go handlerService.Dispatch(i)
        }
        for _, acc := range app.acceptors {
            a := acc
            go func() {
                for conn := range a.GetConnChan() {
                    go handlerService.Handle(conn)
                }
            }()
            go func() {
                a.ListenAndServe()
            }()
            logger.Log.Infof("listening with acceptor %s on addr %s", reflect.TypeOf(a), a.GetAddr())
        }
        ... ..
        startModules()
        logger.Log.Info("all modules started!")
        app.running = true
    }
startupComponents对Component进行初始化 然后把Component注册到handlerService和remoteService上
    func startupComponents() {
        // component initialize hooks
        for _, c := range handlerComp {
            c.comp.Init()
        }
        // component after initialize hooks
        for _, c := range handlerComp {
            c.comp.AfterInit()
        }
        // register all components
        for _, c := range handlerComp {
            if err := handlerService.Register(c.comp, c.opts); err != nil {
                logger.Log.Errorf("Failed to register handler: %s", err.Error())
            }
        }
        // register all remote components
        for _, c := range remoteComp {
            if remoteService == nil {
                logger.Log.Warn("registered a remote component but remoteService is not running! skipping...")
            } else {
                if err := remoteService.Register(c.comp, c.opts); err != nil {
                    logger.Log.Errorf("Failed to register remote: %s", err.Error())
                }
            }
        }
        ... ..
    }
比如HandlerService的注册,反射得到component类型的全部方法,判断isHandlerMethod就加入services里面 并聚合Component对象的反射Value对象为全部Handler的Method Receiver,减少了对象引用
    func NewService(comp Component, opts []Option) *Service {
        s := &Service{
            Type:     reflect.TypeOf(comp),
            Receiver: reflect.ValueOf(comp),
        }
        ... ..
        return s
    }
    func (h *HandlerService) Register(comp component.Component, opts []component.Option) error {
        s := component.NewService(comp, opts)
        ... ..
        if err := s.ExtractHandler(); err != nil {
            return err
        }
        h.services[s.Name] = s
        for name, handler := range s.Handlers {
            handlers[fmt.Sprintf("%s.%s", s.Name, name)] = handler
        }
        return nil
    }
    func (s *Service) ExtractHandler() error {
        typeName := reflect.Indirect(s.Receiver).Type().Name()
        ... ..
        s.Handlers = suitableHandlerMethods(s.Type, s.Options.nameFunc)
        ... ..
        for i := range s.Handlers {
            s.Handlers[i].Receiver = s.Receiver
        }
        return nil
    }
    func suitableHandlerMethods(typ reflect.Type, nameFunc func(string) string) map[string]*Handler {
        methods := make(map[string]*Handler)
        for m := 0; m < typ.NumMethod(); m++ {
            method := typ.Method(m)
            mt := method.Type
            mn := method.Name
            if isHandlerMethod(method) {
                ... ..
                handler := &Handler{
                    Method:      method,
                    IsRawArg:    raw,
                    MessageType: msgType,
                }
                ... ..
                methods[mn] = handler
            }
        }
        return methods
    }
handlerService.Dispatch方法负责各种业务的处理,包括: 1.处理chLocalProcess中的本地Message 2.使用remoteService处理chRemoteProcess中的远程Message 3.在定时ticket到达时调用timer.Cron执行定时任务 4.管理定时任务的创建 5.管理定时任务的删除
    func (h *HandlerService) Dispatch(thread int) {
        defer timer.GlobalTicker.Stop()
        for {
            select {
            case lm := <-h.chLocalProcess:
                metrics.ReportMessageProcessDelayFromCtx(lm.ctx, h.metricsReporters, "local")
                h.localProcess(lm.ctx, lm.agent, lm.route, lm.msg)
            case rm := <-h.chRemoteProcess:
                metrics.ReportMessageProcessDelayFromCtx(rm.ctx, h.metricsReporters, "remote")
                h.remoteService.remoteProcess(rm.ctx, nil, rm.agent, rm.route, rm.msg)
            case <-timer.GlobalTicker.C: // execute cron task
                timer.Cron()
            case t := <-timer.Manager.ChCreatedTimer: // new Timers
                timer.AddTimer(t)
            case id := <-timer.Manager.ChClosingTimer: // closing Timers
                timer.RemoveTimer(id)
            }
        }
    }
接下来看看Acceptor的工作,以下为Tcp实现,就是负责接收连接,流入acceptor的Chan
    func (a *TCPAcceptor) ListenAndServe() {
        if a.hasTLSCertificates() {
            a.ListenAndServeTLS(a.certFile, a.keyFile)
            return
        }
        listener, err := net.Listen("tcp", a.addr)
        if err != nil {
            logger.Log.Fatalf("Failed to listen: %s", err.Error())
        }
        a.listener = listener
        a.running = true
        a.serve()
    }
    func (a *TCPAcceptor) serve() {
        defer a.Stop()
        for a.running {
            conn, err := a.listener.Accept()
            if err != nil {
                logger.Log.Errorf("Failed to accept TCP connection: %s", err.Error())
                continue
            }
            a.connChan <- &tcpPlayerConn{
                Conn: conn,
            }
        }
    }
前面讲过对于每个Acceptor开启了一个goroutine去处理连接,也就是下面代码
    for conn := range a.GetConnChan() {
        go handlerService.Handle(conn)
    }
所以流入Chan的连接就会被实时的开启一个goroutine去处理,处理过程就是先创建一个Agent对象 并开启一个goroutine给Agent负责维护连接的心跳 然后开启死循环,读取连接的数据processPacket
    func (h *HandlerService) Handle(conn acceptor.PlayerConn) {
        // create a client agent and startup write goroutine
        a := agent.NewAgent(conn, h.decoder, h.encoder, h.serializer, h.heartbeatTimeout, h.messagesBufferSize, h.appDieChan, h.messageEncoder, h.metricsReporters)
        // startup agent goroutine
        go a.Handle()
        ... ..
        for {
            msg, err := conn.GetNextMessage()
            if err != nil {
                logger.Log.Errorf("Error reading next available message: %s", err.Error())
                return
            }
            packets, err := h.decoder.Decode(msg)
            if err != nil {
                logger.Log.Errorf("Failed to decode message: %s", err.Error())
                return
            }
            if len(packets) < 1 {
                logger.Log.Warnf("Read no packets, data: %v", msg)
                continue
            }
            // process all packet
            for i := range packets {
                if err := h.processPacket(a, packets[i]); err != nil {
                    logger.Log.Errorf("Failed to process packet: %s", err.Error())
                    return
                }
            }
        }
    }
这时如果使用了pitaya提供的漏桶算法实现的限流wrap来包装acceptor,则会对客户端发送的消息进行限流限速 这里也是灵活利用for循环遍历chan的特性,所以也是实时地对连接进行包装
    func (b *BaseWrapper) ListenAndServe() {
        go b.pipe()
        b.Acceptor.ListenAndServe()
    }
    // GetConnChan returns the wrapper conn chan
    func (b *BaseWrapper) GetConnChan() chan acceptor.PlayerConn {
        return b.connChan
    }
    func (b *BaseWrapper) pipe() {
        for conn := range b.Acceptor.GetConnChan() {
            b.connChan <- b.wrapConn(conn)
        }
    }
    type RateLimitingWrapper struct {
        BaseWrapper
    }
    func NewRateLimitingWrapper(c *config.Config) *RateLimitingWrapper {
        r := &RateLimitingWrapper{}
        r.BaseWrapper = NewBaseWrapper(func(conn acceptor.PlayerConn) acceptor.PlayerConn {
            ... ..
            return NewRateLimiter(conn, limit, interval, forceDisable)
        })
        return r
    }
    func (r *RateLimitingWrapper) Wrap(a acceptor.Acceptor) acceptor.Acceptor {
        r.Acceptor = a
        return r
    }
    func (r *RateLimiter) GetNextMessage() (msg []byte, err error) {
        if r.forceDisable {
            return r.PlayerConn.GetNextMessage()
        }
        for {
            msg, err := r.PlayerConn.GetNextMessage()
            if err != nil {
                return nil, err
            }
            now := time.Now()
            if r.shouldRateLimit(now) {
                logger.Log.Errorf("Data=%s, Error=%s", msg, constants.ErrRateLimitExceeded)
                metrics.ReportExceededRateLimiting(pitaya.GetMetricsReporters())
                continue
            }
            return msg, err
        }
    }
processPacket对数据包解包后,执行processMessage
    func (h *HandlerService) processPacket(a *agent.Agent, p *packet.Packet) error {
        switch p.Type {
        case packet.Handshake:
            ... ..
        case packet.HandshakeAck:
            ... ..
        case packet.Data:
            if a.GetStatus() < constants.StatusWorking {
                return fmt.Errorf("receive data on socket which is not yet ACK, session will be closed immediately, remote=%s",
                    a.RemoteAddr().String())
            }
            msg, err := message.Decode(p.Data)
            if err != nil {
                return err
            }
            h.processMessage(a, msg)
        case packet.Heartbeat:
            // expected
        }
        a.SetLastAt()
        return nil
    }
processMessage中包装数据包为unHandledMessage 根据消息类型,流入chLocalProcess 或者chRemoteProcess 也就转交给上面提到的负责Dispatch的goroutine去处理了
    func (h *HandlerService) processMessage(a *agent.Agent, msg *message.Message) {
        requestID := uuid.New()
        ctx := pcontext.AddToPropagateCtx(context.Background(), constants.StartTimeKey, time.Now().UnixNano())
        ctx = pcontext.AddToPropagateCtx(ctx, constants.RouteKey, msg.Route)
        ctx = pcontext.AddToPropagateCtx(ctx, constants.RequestIDKey, requestID.String())
        tags := opentracing.Tags{
            "local.id":   h.server.ID,
            "span.kind":  "server",
            "msg.type":   strings.ToLower(msg.Type.String()),
            "user.id":    a.Session.UID(),
            "request.id": requestID.String(),
        }
        ctx = tracing.StartSpan(ctx, msg.Route, tags)
        ctx = context.WithValue(ctx, constants.SessionCtxKey, a.Session)
        r, err := route.Decode(msg.Route)
        ... ..
        message := unhandledMessage{
            ctx:   ctx,
            agent: a,
            route: r,
            msg:   msg,
        }
        if r.SvType == h.server.Type {
            h.chLocalProcess <- message
        } else {
            if h.remoteService != nil {
                h.chRemoteProcess <- message
            } else {
                logger.Log.Warnf("request made to another server type but no remoteService running")
            }
        }
    }
服务器进程启动的最后一步是对全局模块启动
在外部的module.go文件中,提供了对module的全局注册方法、全部顺序启动方法、全部顺序关闭方法
    func RegisterModule(module interfaces.Module, name string) error {
        ... ..
    }
    func startModules() {
        for _, modWrapper := range modulesArr {
            modWrapper.module.Init()
        }
        for _, modWrapper := range modulesArr {
            modWrapper.module.AfterInit()
        }
    }
    func shutdownModules() {
        for i := len(modulesArr) - 1; i >= 0; i-- {
            modulesArr[i].module.BeforeShutdown()
        }
        for i := len(modulesArr) - 1; i >= 0; i-- {
            mod := modulesArr[i].module
            mod.Shutdown()
        }
    }
    func (h *HandlerService) localProcess(ctx context.Context, a *agent.Agent, route *route.Route, msg *message.Message) {
        var mid uint
        switch msg.Type {
        case message.Request:
            mid = msg.ID
        case message.Notify:
            mid = 0
        }
        ret, err := processHandlerMessage(ctx, route, h.serializer, a.Session, msg.Data, msg.Type, false)
        if msg.Type != message.Notify {
            ... ..
            err := a.Session.ResponseMID(ctx, mid, ret)
            ... ..
        } else {
            metrics.ReportTimingFromCtx(ctx, h.metricsReporters, handlerType, nil)
            tracing.FinishSpan(ctx, err)
        }
    }
    func processHandlerMessage(
        ctx context.Context,
        rt *route.Route,
        serializer serialize.Serializer,
        session *session.Session,
        data []byte,
        msgTypeIface interface{},
        remote bool,
    ) ([]byte, error) {
        if ctx == nil {
            ctx = context.Background()
        }
        ctx = context.WithValue(ctx, constants.SessionCtxKey, session)
        ctx = util.CtxWithDefaultLogger(ctx, rt.String(), session.UID())
        h, err := getHandler(rt)
        ... ..
        msgType, err := getMsgType(msgTypeIface)
        ... ..
        logger := ctx.Value(constants.LoggerCtxKey).(logger.Logger)
        exit, err := h.ValidateMessageType(msgType)
        ... ..
        arg, err := unmarshalHandlerArg(h, serializer, data)
        ... ..
        if arg, err = executeBeforePipeline(ctx, arg); err != nil {
            return nil, err
        }
        ... ..
        args := []reflect.Value{h.Receiver, reflect.ValueOf(ctx)}
        if arg != nil {
            args = append(args, reflect.ValueOf(arg))
        }
        resp, err := util.Pcall(h.Method, args)
        if remote && msgType == message.Notify {
            resp = []byte("ack")
        }
        resp, err = executeAfterPipeline(ctx, resp, err)
        ... ..
        ret, err := serializeReturn(serializer, resp)
        ... ..
        return ret, nil
    }
    func executeBeforePipeline(ctx context.Context, data interface{}) (interface{}, error) {
        var err error
        res := data
        if len(pipeline.BeforeHandler.Handlers) > 0 {
            for _, h := range pipeline.BeforeHandler.Handlers {
                res, err = h(ctx, res)
                if err != nil {
                    logger.Log.Debugf("pitaya/handler: broken pipeline: %s", err.Error())
                    return res, err
                }
            }
        }
        return res, nil
    }
    func executeAfterPipeline(ctx context.Context, res interface{}, err error) (interface{}, error) {
        ret := res
        if len(pipeline.AfterHandler.Handlers) > 0 {
            for _, h := range pipeline.AfterHandler.Handlers {
                ret, err = h(ctx, ret, err)
            }
        }
        return ret, err
    }
util.pcall里展示了golang反射的一种高级用法 method.Func.Call,第一个参数是Receiver,也就是调用对象方法的实例 这种设计对比直接保存Value对象的method,反射时直接call,拥有的额外好处就是降低了对象引用,方法不和实例绑定
    func Pcall(method reflect.Method, args []reflect.Value) (rets interface{}, err error) {
        ... ..
        r := method.Func.Call(args)
        if len(r) == 2 {
            if v := r[1].Interface(); v != nil {
                err = v.(error)
            } else if !r[0].IsNil() {
                rets = r[0].Interface()
            } else {
                err = constants.ErrReplyShouldBeNotNull
            }
        }
        return
    }