前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >tRPC-Go 链路透传消息的源码级解读

tRPC-Go 链路透传消息的源码级解读

原创
作者头像
Martin Hong
发布2024-05-14 21:13:31
760
发布2024-05-14 21:13:31

概述

在分布式链路追踪等场景下,会使用到微服务调用链路上的透传能力,tRPC-Go 基于 tRPC 协议的头部设计实现了对链路透传的支持,这篇文章从源码角度分析链路透传的设计实现,文章中会涉及 tRPC-go 里不同场景中如何正确使用链路透传功能。

说明

  1. 本文基于以下源码以及版本 trpc-go: v0.9.4 (截止本文编写时的最新发布版本)
  2. 所有 tRPC-Go 源码均在文章中提供了链接,可以点击链接直达工蜂仓库。

源码走读

先从应用层调用讲起

一般而言,一个典型的 tRPC-go 的 RPC 调用起点类似如下代码:

代码语言:go
复制
proxy := pb.NewOrderClientProxy()
rsp, err := proxy.PlaceOrder(ctx, req)

这里的工厂函数和方法都是我们用 trpc create 命令生成的桩代码里提供的实现,下一小节我们会看到桩代码里的实现。

另一方面,按照官方文档的说法,从主调向被调传递透传信息的话,需要使用 client.WithMetaData 传递选项,假如我们需要向 PlaceOrder 传递一个 requestID,值为 123456

代码语言:go
复制
options := []client.Option{
    client.WithMetaData("requestID", []byte("123456")),
}
rsp, err := proxy.PlaceOrder(ctx, req, options...)

而假设被调的 PlaceOrder 方法会反过来透传一个流控信息,ExceedRateLimit"true",则主调还需要显式设定一个协议头对象指针:

代码语言:go
复制
head := &trpc.ResponseProtocol{}
options := []client.Option{
    client.WithMetaData("requestID", []byte("123456")),
	client.WithRspHead(head),
}
rsp, err := proxy.PlaceOrder(ctx, req, options...)
if err == nil {
    fmt.Println(head.TransInfo) // 一个包含 key 为 ExceedRateLimit 的字典,类型为 map[string][]byte
}

记住这段代码,这是我们后面揭秘的基础,我们会研究这一段代码背后工作的原理。

从 tRPC-go 桩代码着手

我们重点看看方法桩里边的代码:

代码语言:go
复制
func (c *OrderClientProxyImpl) PlaceOrder(ctx context.Context, req *EmptyMessage, opts ...client.Option) (rsp *EmptyMessage, err error) {
   // 这里通过 ctx 派生出新的 ctx 和消息,后面会有源码解读
   ctx, msg := codec.WithCloneMessage(ctx)
   // 这里会对派生出的消息进行被调信息设置,上面的 `WithCloneMessage` 会负责设置消息里的主调信息
   msg.WithClientRPCName("/trpc.example.orders_svr.Order/PlaceOrder")
   msg.WithCalleeServiceName(OrderServer_ServiceDesc.ServiceName)
   msg.WithCalleeApp("example")
   msg.WithCalleeServer("orders_svr")
   msg.WithCalleeService("Order")
   msg.WithCalleeMethod("PlaceOrder")
   msg.WithSerializationType(codec.SerializationTypePB)

   rsp = &EmptyMessage{}

   err = c.client.Invoke(ctx, req, rsp, callopts...)
   // 一些收尾工作,不相关,跳过
}

可以看到,每个 RPC 方法桩的实现里,其实都是标准的过程:

  1. 通过 codec.WithCloneMessage(ctx) 派生一组新的 context 和 message 实例;
  2. 设置 message 的被调信息,被调信息用于指明服务发现的目标以及 RPC 被调的服务名和方法名;
  3. 合并 opts 选项信息;
  4. 通过 c.client.Invoke 方法发起实际的远程调用。

我们只想关注链路透传过程,所以我们后面重点关注 codec.WithCloneMessage 函数和 c.clientInvoke 方法的逻辑。

codec.WithCloneMessage 的作用

我们进一步检查 codec.WithCloneMessage 的源码:

代码语言:go
复制
func WithCloneMessage(ctx context.Context) (context.Context, Msg) {
    newMsg := msgPool.Get().(*msg)    // 从 sync.Pool 实例 msgPool 获取一个池化指针,这种写法是为了减少对象创建和销毁的开销
    val := ctx.Value(ContextKeyMessage)  // 从 ctx 对象中获取其绑定的 *codec.Message 指针,一般是当前服务收到的一个请求的消息的指针
    m, ok := val.(*msg)
    if !ok { // 下面 3 行,是在传入的 ctx 不是一个合法的 tRPC context 的情况下,直接使用全新的 *codec.Message
        ctx = context.WithValue(ctx, ContextKeyMessage, newMsg)
        newMsg.context = ctx
        return ctx, newMsg
    }
	// 以下两行,将新的 ctx 和新的消息指针 newMsg 相互绑定,后续业务逻辑里,使用 ctx 便可以找到绑定的消息指针
    ctx = context.WithValue(ctx, ContextKeyMessage, newMsg)
    newMsg.context = ctx
	// 以下两行,从原有的消息指针指向的对象上拷贝部分值到新的消息指针指向的内存上
    copyCommonMessage(m, newMsg)
    copyServerToClientMessage(m, newMsg)
    return ctx, newMsg
}

如代码注释说明,WithCloneMessage 实现一组新的 ctxmsg 的派生,消息派生过程中,具体拷贝了哪些信息呢?我们接着看 copyCommonMessagecopyServerToClientMessage

先看 copyCommonMessage

代码语言:go
复制
func copyCommonMessage(m *msg, newMsg *msg) {
    newMsg.frameHead = m.frameHead
    newMsg.requestTimeout = m.requestTimeout
    newMsg.serializationType = m.serializationType
    newMsg.serverRPCName = m.serverRPCName
    newMsg.clientRPCName = m.clientRPCName
    newMsg.serverReqHead = m.serverReqHead
    newMsg.serverRspHead = m.serverRspHead
    newMsg.dyeing = m.dyeing
    newMsg.dyeingKey = m.dyeingKey
    newMsg.serverMetaData = m.serverMetaData
    newMsg.logger = m.logger
    newMsg.namespace = m.namespace
    newMsg.envName = m.envName
    newMsg.setName = m.setName
    newMsg.envTransfer = m.envTransfer
    newMsg.commonMeta = m.commonMeta.Clone()
}

可以看到,copyCommonMessage 主要是拷贝了这些信息:

  • 请求超时时间
  • RPC 请求和响应各自的名称和头部
  • serverMetaData:这个是当前服务收到的请求中由主调方传递到被调服务方的链路透传信息
  • 其他一些框架定义的上下文信息,比如 namespaceenvName 等。

接着再看 copyServerToClientMessage

代码语言:go
复制
func copyServerToClientMessage(m *msg, newMsg *msg) {
	// 将原消息的服务端收到的链路透传消息复制给新消息的客户端链路透传
	// 也就是说,将当前服务收到的链路透传进一步传递给下一个服务
    newMsg.clientMetaData = m.serverMetaData.Clone()

	// 一般需要拷贝消息的场景都是因为当前服务也需要 RPC 调用,所以从当前服务收到的消息派生
	// RPC 调用消息的话,需要更正主调信息为自己的服务信息
    newMsg.callerServiceName = m.calleeServiceName
    newMsg.callerApp = m.calleeApp
    newMsg.callerServer = m.calleeServer
    newMsg.callerService = m.calleeService
    newMsg.callerMethod = m.calleeMethod
}

如上,到这里,一个派生的消息就从一个现有消息中拷贝了框架定义的一些上下文信息了,并且也设置了主调信息为自身服务,但是,被调信息呢?你回到开头看 tRPC 的桩代码就会知道了,桩代码里会负责进一步重写被调信息。

抬头看看 c.client.Invoke 方法

了解完消息派生,再来看看 RPC 调用请求的核心过程,c.client 是一个 client.Client 接口类型的对象,考虑到默认的 tRPC 协议请求的话,使用的是框架默认的 client 实现,我们看看它的 Invoke 方法:

代码语言:go
复制
func (c *client) Invoke(ctx context.Context, reqbody interface{}, rspbody interface{}, opt ...Option) error {
    // 此处跳过一些不相关的逻辑

    // start filter chain processing
    return c.getFilters(opts).Filter(contextWithOptions(ctx, opts), reqbody, rspbody, callFunc)
}

Invoke 方法主要初始化请求的 opt 合并以及超时控制,核心是通过过滤器链的 Filter 方法开始逐层处理逻辑,而真正的请求处理逻辑则定义在 callFunc 中:

代码语言:go
复制
func callFunc(ctx context.Context, reqbody interface{}, rspbody interface{}) error {
    msg := codec.Message(ctx)
    // 这里省略一些无关逻辑

    // 这里开始消息编码
    reqbuf, err := prepareRequestBuf(msg, reqbody, opts)
    if err != nil {
        return err
    }

	// 这里开始底层的连接和数据传输
    rspbuf, err := opts.Transport.RoundTrip(ctx, reqbuf, opts.CallOptions...)
    
    var rspbodybuf []byte
    if opts.EnableMultiplexed {
        rspbodybuf = rspbuf
    } else {
		// 消息解码,将收到的二进制解码到标准消息结构
        rspbodybuf, err = opts.Codec.Decode(msg, rspbuf)
        if err != nil {
            return errs.NewFrameError(errs.RetClientDecodeFail, "client codec Decode: "+err.Error())
        }
    }
	// 进一步做反序列化等操作
    return processResponseBuf(msg, rspbody, rspbodybuf, opts)
}

这里涉及几个关键操作,我们下来逐个展开。

prepareRequestBuf 的逻辑

以下是 prepareRequestBuf 的源码:

代码语言:go
复制
func prepareRequestBuf(msg codec.Msg, reqbody interface{}, opts *Options) ([]byte, error) {
	// 序列化和压缩
    reqbodybuf, err := serializeAndCompress(msg, reqbody, opts)
    if err != nil {
        return nil, err
    }
    // 将序列化和压缩后的数据进一步按照传输协议编码
    reqbuf, err := opts.Codec.Encode(msg, reqbodybuf)
    if err != nil {
        return nil, errs.NewFrameError(errs.RetClientEncodeFail, "client codec Encode: "+err.Error())
    }
    return reqbuf, nil
}

序列化和压缩的逻辑在此不展开,因为不涉及我们要讨论的链路透传中的头部的处理,我们看 opts.Codec.Encode 的逻辑,它实现了对消息的编码。一般情况下,我们使用标准的 tRPC 编解码协议的话,使用的是框架默认的编解码器,我们看它的 Encode 方法:

代码语言:go
复制
func (c *ClientCodec) Encode(msg codec.Msg, reqbody []byte) (reqbuf []byte, err error) {
    frameHead := getFrameHead(msg)  // 从消息元数据中获取帧信息,这个取决于具体协议,这里因为是默认实现,所以这里的帧头信息是 tRPC 的默认实现
    if frameHead.FrameType != uint8(TrpcDataFrameType_TRPC_UNARY_FRAME) {
        return c.streamCodec.Encode(msg, reqbody)
    }

	// 检查客户端应用层调用 RPC 时是否显式设置了请求头信息,有则使用其指定的,没有则使用默认头:
	// 	 Version:  uint32(TrpcProtoVersion_TRPC_PROTO_V1)
	//   CallType: uint32(TrpcCallType_TRPC_UNARY_CALL),
    req, err := getRequestHead(msg)
    if err != nil {
        return nil, err
    }
	// 从 msg 中提取请求所需的主被调信息、链路透传信息等
    c.updateReqHead(req, msg)

    // 协议升级到具体通信协议
    upgradeProtocol(frameHead, msg, req.RequestId)

    // 使用 protobuf 对请求头进行编码
    reqhead, err := proto.Marshal(req)
    if err != nil {
        return nil, err
    }
	// 将请求协议头与正文进行组装,形成完整的一次请求的编码
    return frameHead.construct(reqhead, reqbody)
}

这里的编码过程也比较清晰了,我们重点看看 c.updateReqHead(req, msg) 就好:

代码语言:go
复制
func (c *ClientCodec) updateReqHead(req *RequestProtocol, msg codec.Msg) {
    // 此处省略一堆与本文无关的源码
    req.TransInfo = setClientTransInfo(msg, req.TransInfo)
}

这里就是请求的链路透传数据了,setClientTransInfo 负责将 msg 的链路透传信息复制到 req.TransInfo 中:

代码语言:go
复制
func setClientTransInfo(msg codec.Msg, trans map[string][]byte) map[string][]byte {
    // set MetaData
    if len(msg.ClientMetaData()) > 0 {
        if trans == nil {
            trans = make(map[string][]byte)
        }
        for k, v := range msg.ClientMetaData() {
            trans[k] = v
        }
    }
    // 此处省略后续的一些额外的 transInfo 操作
    return trans
}

那么 msgClientMetaData 是怎么来的?还记得开头示例代码里的 client.WithMetaData("requestID", []byte("123456")) 吗?知道了吧,这个问题留给你自己探索一下。

opts.Codec.Decode 的逻辑

opts.Codec.Decode 方法调用发生在 RoundTrip 之后,也就是网络往返完成之后,此时客户端已经收到服务器端返回的完整响应了,我们看看它需要做什么事情:

代码语言:go
复制
func (c *ClientCodec) Decode(msg codec.Msg, rspbuf []byte) (rspbody []byte, err error) {
    // 此处省略一些无关代码

    // 获取响应头信息
    rsp, err := c.getResponseHead(msg)
    if err != nil {
        return nil, err
    }

    // 此处省略一些代码:主要完成对头部信息二进制片段的截取
	
	// 将头部信息进行反序列化
    if err := proto.Unmarshal(rspbuf[begin:end], rsp); err != nil {
        return nil, err
    }

	// 使用响应头信息更
    if err := updateMsg(msg, frameHead, rsp); err != nil {
        return nil, err
    }

    // body decoded
    return rspbuf[end:], nil
}

先看 c.getResponseHead 的代码,它会在应用层有显式设定的情况下返回应用层设定的头部信息的指针:

代码语言:go
复制
func (c *ClientCodec) getResponseHead(msg codec.Msg) (*ResponseProtocol, error) {
    if msg.ClientRspHead() != nil {
        // 这里不为空的话,意味着应用层执行了 client.WithRspHead(head) 设置了响应头,这种情况下就用
		// 应用层指定的指针反序列化头部,这样应用层就可以获取到响应头部了,应用层可以进一步使用 head.TransInfo 获取链路透传信息
        rsp, ok := msg.ClientRspHead().(*ResponseProtocol)
        if !ok {
            return nil, errors.New("client decode rsp head type invalid, must be trpc response protocol head")
        }
        return rsp, nil
    }
    
	// 没有指定的情况下,用默认的新的头部信息指针
    rsp := &ResponseProtocol{}
    msg.WithClientRspHead(rsp)
    return rsp, nil
}

所以,我们在文章开头示例代码中通过 client.WithRspHead(head) 设定的 head 会在此时被 getResponseHead 返回

接下来看 updateMsg(msg, frameHead, rsp)

代码语言:go
复制
func updateMsg(msg codec.Msg, frameHead *FrameHead, rsp *ResponseProtocol) error {
    // 此处省略少量无关代码

    // 这里将被调服务端透传回来的信息合并到 ClientMetaData 中
    if len(rsp.TransInfo) > 0 {
        md := msg.ClientMetaData()
        if len(md) == 0 {
            md = codec.MetaData{}
        }
        for k, v := range rsp.TransInfo {
            md[k] = v
        }
        msg.WithClientMetaData(md)
    }

    // 此处省略一些无关代码
}

可以看到,这里其中的一个有意思的事情是会将服务端链路透传信息合并到请求时的消息的 ClientMetaData 中,如果你能访问到这个消息的指针,也就能够有第二种方式获取到服务端返回的链路透传信息了。但是显然你在应用层是不能的,为什么呢?因为请求使用的 msg 是在桩代码中派生的,不是你应用层提供的。所以,应用层想要获取链路回传信息,只有官方文档上的那一条路。但是为什么我要提有第二种可能性呢?那是因为你可以在 filter 里访问到这个 message,怎么访问?想想 trpc.Message(ctx)

总结

到这里,涉及链路往返透传的相关源码就剖析完整了,用一个流程图结束本文:

链路透传整体过程
链路透传整体过程

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
    • 说明
    • 源码走读
      • 先从应用层调用讲起
        • 从 tRPC-go 桩代码着手
          • codec.WithCloneMessage 的作用
            • 抬头看看 c.client.Invoke 方法
              • prepareRequestBuf 的逻辑
              • opts.Codec.Decode 的逻辑
          • 总结
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档