前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:grpc context

golang源码分析:grpc context

作者头像
golangLeetcode
发布2022-08-03 13:53:19
8970
发布2022-08-03 13:53:19
举报
文章被收录于专栏:golang算法架构leetcode技术php

gRPC 是基于 HTTP/2 协议的。进程间传输定义了一个 metadata 对象,该对象放在 Request-Headers 内,所以通过 metadata 我们可以将上一个进程中的全局对象透传到下一个被调用的进程。

代码语言:javascript
复制
type MD map[string][]string

进程内部我们通过context来传输上下文数据,进程间传递MD的时候,我们也可以从ctx,取出来,进行传递

代码语言:javascript
复制
//set 数据到 metadata
md := metadata.Pairs("key", "val")
// 新建一个有 metadata 的 context
ctx := metadata.NewOutgoingContext(context.Background(), md)

为什么不直接把context里面的数据全取出来,传递给下游呢?这是出于可维护性和安全性两方面的考虑,如果将ctx所有信息都传递下去,很有可能将一些内部信息泄漏,另一方面,下游在取ctx的时候,不知道到底传了哪些数据。所以grpc定义了两个context:

代码语言:javascript
复制
OutgoingContext
IncomingContext

OutgoingContext用于发送请求一方,包装下游依赖的数据,传递出去。IncomingContext用于服务端接受,客户端传递来的context信息。context中间通过序列化成http2 header的方式进行传输。metadata/metadata.go,我们可以看到这两个context虽然也是通过context.WithValue 设置数据,通过context.Value来读取数据。

代码语言:javascript
复制
type mdIncomingKey struct{}
type mdOutgoingKey struct{}

// NewIncomingContext creates a new context with incoming md attached.
func NewIncomingContext(ctx context.Context, md MD) context.Context {
  return context.WithValue(ctx, mdIncomingKey{}, md)
}

// NewOutgoingContext creates a new context with outgoing md attached. If used
// in conjunction with AppendToOutgoingContext, NewOutgoingContext will
// overwrite any previously-appended metadata.
func NewOutgoingContext(ctx context.Context, md MD) context.Context {
  return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}
代码语言:javascript
复制
func FromIncomingContext(ctx context.Context) (MD, bool) {
  md, ok := ctx.Value(mdIncomingKey{}).(MD)
  if !ok {
    return nil, false
  }
  out := MD{}
  for k, v := range md {
    // We need to manually convert all keys to lower case, because MD is a
    // map, and there's no guarantee that the MD attached to the context is
    // created using our helper functions.
    key := strings.ToLower(k)
    out[key] = v
  }
  return out, true
}
func FromOutgoingContext(ctx context.Context) (MD, bool) {
  raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD)
  if !ok {
    return nil, false
  }

  out := MD{}
  for k, v := range raw.md {
    // We need to manually convert all keys to lower case, because MD is a
    // map, and there's no guarantee that the MD attached to the context is
    // created using our helper functions.
    key := strings.ToLower(k)
    out[key] = v
  }
  for _, added := range raw.added {
    if len(added)%2 == 1 {
      panic(fmt.Sprintf("metadata: FromOutgoingContext got an odd number of input pairs for metadata: %d", len(added)))
    }

    for i := 0; i < len(added); i += 2 {
      key := strings.ToLower(added[i])
      out[key] = append(out[key], added[i+1])
    }
  }
  return out, ok
}

但是,和普通context也是有差别的,MD的存储的时候,key 是string,value是[]string,context为了尽可能地防止覆盖,key 、value都是interface类型的,并且通过lint等方式,尽可能做到不让修改,也就是说用户自己存入的数据的key尽量要是新定义的类型,类型别名也不可以。

直观理解,客户端在发送请求的时候,会初始化一个OutgoingContext,服务端在取的时候,用的是IncomingContext,中间必然存在一个从OutgoingContext 取数据,方让http2 header,从http2 header 取数据存入IncomingContext 的过程。我们通过源码来分析下:

1,server端构造IncomingContext 的过程:

代码语言:javascript
复制
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) 
        s.serveStreams(st)

我们从server.go文件ServeHTTP 函数开始:

代码语言:javascript
复制
func (s *Server) serveStreams(st transport.ServerTransport) 
        st.HandleStreams(func(stream *transport.Stream) 

它调用了internal/transport/http2_server.go里面的函数

代码语言:javascript
复制
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) 
     case *http2.MetaHeadersFrame:
       if t.operateHeaders(frame, handle, traceCtx) {
代码语言:javascript
复制
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool)
   ctx = metadata.NewIncomingContext(ctx, ht.headerMD)

可以看到,通过http2的header构造了我们的IncomingContext

2,client从 OutgoingContext取数据的过程

客户端的请求调用是从call.go的Invoke函数开始的

代码语言:javascript
复制
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error  
代码语言:javascript
复制
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error 
        cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
         err := cs.SendMsg(req); err != nil 
代码语言:javascript
复制
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption)
        return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
          op := func(a *csAttempt) error { return a.newStream() }
            s, err := a.t.NewStream(cs.ctx, cs.callHdr)

最终调用啦a.t.NewStream

实现在internal/transport/http2_client.go

代码语言:javascript
复制
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error)
     headerFields, err := t.createHeaderFields(ctx, callHdr) 
代码语言:javascript
复制
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error)
     md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok 

至此,完成了,数据的转换。那么问题来了,对于一个处于中游的grpc服务,每个请求,我都去先获取IncomingContext然后设置OutgoingContext是不是很麻烦我们有没有相关的简单方案呢?答案是middleware

3,客户端middleware

在客户端发起的请求连接的时候,我们可以在options里面添加拦截器unaryClientInterceptors

代码语言:javascript
复制
    conn, err := grpc.Dial(target, dialOptions...)
    dialOptions := append([]grpc.DialOption{
      grpc.WithUnaryInterceptor(grpcMiddleware.ChainUnaryClient(unaryClientInterceptors...)),

客户端的拦截器有很多,比如:

代码语言:javascript
复制
clientinterceptors.UnaryTracingInterceptor,
clientinterceptors.DurationInterceptor,
clientinterceptors.PrometheusInterceptor,
clientinterceptors.BreakerInterceptor,
clientinterceptors.TimeoutInterceptor(cliOpts.Timeout),

一个常见的客户端拦截器可以这么写,拦截器的入参有我们需要的一切:

代码语言:javascript
复制
func DurationInterceptor(ctx context.Context, method string, req, reply interface{},
  cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
   //do some thing
  err := invoker(ctx, method, req, reply, cc, opts...)

4,服务端middleware

我们在注册服务的时候,可以注册unaryServerInterceptors

代码语言:javascript
复制
server = grpc.NewServer(dialOptions...)
    dialOptions := []grpc.ServerOption{
      grpc_middleware.WithUnaryServerChain(unaryServerInterceptors...),

常见的服务端拦截器长这样:

代码语言:javascript
复制
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
  o := evaluateOptions(opts)
  return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, err error) {
  //do some thing
  }
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-11-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档