前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang的http与twirp源码分析

golang的http与twirp源码分析

作者头像
歪歪梯
发布2020-06-19 16:16:53
6180
发布2020-06-19 16:16:53
举报
文章被收录于专栏:歪歪梯Club歪歪梯Club

http

启动httpHandle

使用http包的ListenAndServe方法,需要提供一个Handler对象

代码语言:javascript
复制
func ListenAndServe(addr string, handler Handler) error {
    server := &Server{Addr: addr, Handler: handler}
    return server.ListenAndServe()
}
func (srv *Server) ListenAndServe() error {
    if srv.shuttingDown() {
        return ErrServerClosed
    }
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return srv.Serve(ln)
}

Handler对象中提供处理请求的方法,ServerHttp

代码语言:javascript
复制
type Handler interface {
    ServeHTTP(ResponseWriter, *Request)
}

Server循环accept请求

在ListenAndServe方法中,使用Handler构建一个Server对象,最终调用其Server方法

代码语言:javascript
复制
func (srv *Server) Serve(l net.Listener) error {
    .....
    //包装listener
    origListener := l
    l = &onceCloseListener{Listener: l}
    defer l.Close()
    //启动失败直接结束
    if err := srv.setupHTTP2_Serve(); err != nil {
        return err
    }

    if !srv.trackListener(&l, true) {
        return ErrServerClosed
    }
    defer srv.trackListener(&l, false)

    //绑定上下文等信息
    baseCtx := context.Background()
    if srv.BaseContext != nil {
        baseCtx = srv.BaseContext(origListener)
        if baseCtx == nil {
            panic("BaseContext returned a nil context")
        }
    }
    .....

    ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    for {
        //循环accept请求
        rw, err := l.Accept()
        if err != nil {
            select {
            //如果服务器已经关闭,直接结束
            case <-srv.getDoneChan():
                return ErrServerClosed
            default:
            }
            if ne, ok := err.(net.Error); ok && ne.Temporary() {
            //如果accept失败,等待一个时间周期后continue
            .....
            return err
        }
        connCtx := ctx
        if cc := srv.ConnContext; cc != nil {
            connCtx = cc(connCtx, rw)
            if connCtx == nil {
                panic("ConnContext returned nil")
            }
        }
        tempDelay = 0
        //初始化一个连接对象
        c := srv.newConn(rw)
        //初始化连接状态
        c.setState(c.rwc, StateNew) // before Serve can return
        //开启一个新的goroutine去执行请求
        go c.serve(connCtx)
    }
}

处理connection

代码语言:javascript
复制
func (c *conn) serve(ctx context.Context) {
    //包装缓存区,上下文信息等
    .....
    for {
        //w是解析request数据得到的一个包装对象,包含请求的详细信息
        w, err := c.readRequest(ctx)
        if c.r.remain != c.server.initialReadLimitSize() {
            //如果这个连接已经被读取过(长连接),更新状态
            c.setState(c.rwc, StateActive)
        }
        if err != nil {
            //如果报文解析出错
            const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"

            switch {
            //根据错误类型,返回不同http status
            case .....
            default:
                publicErr := "400 Bad Request"
                if v, ok := err.(badRequestError); ok {
                    publicErr = publicErr + ": " + string(v)
                }
                fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
                return
            }
        }

        //进一步解析内容
        req := w.req
        if req.expectsContinue() {
            if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
                req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
            }
        } else if req.Header.get("Expect") != "" {
            w.sendExpectationFailed()
            return
        }
        c.curReq.Store(w)
        //如果body部分的内容还没读取完毕,等待本次io读取完毕后
        //修改管道的io读取为后台读取方式(开启另一个gorountine区完成数据读取)
        if requestBodyRemains(req.Body) {
            registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
        } else {
            w.conn.r.startBackgroundRead()
        }
        //调用ServeHTTP方法处理请求
        serverHandler{c.server}.ServeHTTP(w, w.req)
        w.cancelCtx()
        if c.hijacked() {
            return
        }
        //完成请求
        w.finishRequest()
        if !w.shouldReuseConnection() {
            //如果连接不允许重复使用,结束
            if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
                c.closeWriteAndWait()
            }
            return
        }
        c.setState(c.rwc, StateIdle)
        c.curReq.Store((*response)(nil))
        //如果不是长连接,就结束了
        if !w.conn.server.doKeepAlives() {
            return
        }

        if d := c.server.idleTimeout(); d != 0 {
            //如果设置了超时时间
            c.rwc.SetReadDeadline(time.Now().Add(d))
            if _, err := c.bufr.Peek(4); err != nil {
                //如果连接超时了,结束
                return
            }
        }
        c.rwc.SetReadDeadline(time.Time{})
    }
}

twirp

twirp是一个rpc框架,具体可以看看我的其他博客——go使用twirp开发rpc

TwirpServer

我们利用http启动twirp服务时传递的是TwirpServer

代码语言:javascript
复制
type TwirpServer interface {
    http.Handler
    ServiceDescriptor() ([]byte, int)
    ProtocGenTwirpVersion() string
    PathPrefix() string
}

proto文件

这是之前的proto文件,利用他我们编译生成了twirp的go代码

代码语言:javascript
复制
syntax = "proto3";
package main;

message A{}
message B {
    string result = 1;
}
service Test{
    rpc hello(A) returns(B);
}

Twirp的ServeHttp实现

可以查看twirp编译后生成代码提供的server实现方法ServeHttp

代码语言:javascript
复制
const TestPathPrefix = "/twirp/main.Test/"

func (s *testServer) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
    ctx := req.Context()
    ctx = ctxsetters.WithPackageName(ctx, "main")
    ctx = ctxsetters.WithServiceName(ctx, "Test")
    ctx = ctxsetters.WithResponseWriter(ctx, resp)

    var err error
    ctx, err = callRequestReceived(ctx, s.hooks)
    if err != nil {
        s.writeError(ctx, resp, err)
        return
    }

    if req.Method != "POST" {
        msg := fmt.Sprintf("unsupported method %q (only POST is allowed)", req.Method)
        err = badRouteError(msg, req.Method, req.URL.Path)
        s.writeError(ctx, resp, err)
        return
    }

    switch req.URL.Path {
    case "/twirp/main.Test/Hello":
        s.serveHello(ctx, resp, req)
        return
    default:
        msg := fmt.Sprintf("no handler for path %q", req.URL.Path)
        err = badRouteError(msg, req.Method, req.URL.Path)
        s.writeError(ctx, resp, err)
        return
    }
}

可以看到只有post请求被允许,并且请求的url格式为

代码语言:javascript
复制
/twirp/包名.服务名/方法名

serve请求

最终再进去serveXxx的逻辑中去处理请求,根据是json格式数据还是protobuf格式数据,执行不同方法

代码语言:javascript
复制
func (s *testServer) serveHello(ctx context.Context, resp http.ResponseWriter, req *http.Request) {
    header := req.Header.Get("Content-Type")
    i := strings.Index(header, ";")
    if i == -1 {
        i = len(header)
    }
    switch strings.TrimSpace(strings.ToLower(header[:i])) {
    case "application/json":
        s.serveHelloJSON(ctx, resp, req)
    case "application/protobuf":
        s.serveHelloProtobuf(ctx, resp, req)
    default:
        msg := fmt.Sprintf("unexpected Content-Type: %q", req.Header.Get("Content-Type"))
        twerr := badRouteError(msg, req.Method, req.URL.Path)
        s.writeError(ctx, resp, twerr)
    }
}

比如进入protobuf

代码语言:javascript
复制
func (s *testServer) serveHelloProtobuf(ctx context.Context, resp http.ResponseWriter, req *http.Request) {
    var err error
    ctx = ctxsetters.WithMethodName(ctx, "Hello")
    //如果在hook里面编写了路由,此处执行
    ctx, err = callRequestRouted(ctx, s.hooks)
    if err != nil {
        s.writeError(ctx, resp, err)
        return
    }
    //读取请求内容
    buf, err := ioutil.ReadAll(req.Body)
    if err != nil {
        s.writeError(ctx, resp, wrapInternal(err, "failed to read request body"))
        return
    }
    reqContent := new(A)
    //Unmarshal请求内容
    if err = proto.Unmarshal(buf, reqContent); err != nil {
        s.writeError(ctx, resp, malformedRequestError("the protobuf request could not be decoded"))
        return
    }

    // Call service method
    //执行对应的服务方法
    var respContent *B
    func() {
        defer ensurePanicResponses(ctx, resp, s.hooks)
        //Test对象就是嵌套在server对象中的真正的服务对象
        respContent, err = s.Test.Hello(ctx, reqContent)
    }()

    if err != nil {
        s.writeError(ctx, resp, err)
        return
    }
    if respContent == nil {
        s.writeError(ctx, resp, twirp.InternalError("received a nil *B and nil error while calling Hello. nil responses are not supported"))
        return
    }
    //执行完服务,执行返回前类型的hooks
    ctx = callResponsePrepared(ctx, s.hooks)
    //将要返回的数据Marshal为字节
    respBytes, err := proto.Marshal(respContent)
    if err != nil {
        s.writeError(ctx, resp, wrapInternal(err, "failed to marshal proto response"))
        return
    }
    //状态码处理等
    ctx = ctxsetters.WithStatusCode(ctx, http.StatusOK)
    resp.Header().Set("Content-Type", "application/protobuf")
    resp.Header().Set("Content-Length", strconv.Itoa(len(respBytes)))
    resp.WriteHeader(http.StatusOK)
    //将数据写回请求客户端
    if n, err := resp.Write(respBytes); err != nil {
        msg := fmt.Sprintf("failed to write response, %d of %d bytes written: %s", n, len(respBytes), err.Error())
        twerr := twirp.NewError(twirp.Unknown, msg)
        callError(ctx, s.hooks, twerr)
    }
    //执行返回后类型的hooks
    callResponseSent(ctx, s.hooks)
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 歪歪梯Club 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • http
    • 启动httpHandle
      • Server循环accept请求
        • 处理connection
        • twirp
          • TwirpServer
            • proto文件
              • Twirp的ServeHttp实现
                • serve请求
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档