前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:grpc 链接池(2)

golang源码分析:grpc 链接池(2)

作者头像
golangLeetcode
发布2023-03-01 16:18:29
5470
发布2023-03-01 16:18:29
举报

继续上一篇golang源码分析:grpc 链接池(1),我们从源码来分析,我们将从连接池的建立,请求发起的时候获取连接,以及最终关闭连接三个流程进行源码分析。

1,创建连接的过程

源码入口位于google.golang.org/grpc@v1.46.0/clientconn.go

代码语言:javascript
复制
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  return DialContext(context.Background(), target, opts...)
}
代码语言:javascript
复制
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
      cc := &ClientConn{
    target:            target,
    csMgr:             &connectivityStateManager{},
    conns:             make(map[*addrConn]struct{}),
    dopts:             defaultDialOptions(),
    blockingpicker:    newPickerWrapper(),
    czData:            new(channelzData),
    firstResolveEvent: grpcsync.NewEvent(),
  }
  resolverBuilder, err := cc.parseTargetAndFindResolver()
  cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
    DialCreds:        credsClone,
    CredsBundle:      cc.dopts.copts.CredsBundle,
    Dialer:           cc.dopts.copts.Dialer,
    Authority:        cc.authority,
    CustomUserAgent:  cc.dopts.copts.UserAgent,
    ChannelzParentID: cc.channelzID,
    Target:           cc.parsedTarget,
  })
  rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
  if cc.dopts.block {
    for {
      cc.Connect()
      }
  }

首先,获取域名解析器的构造器,比如拿到获取自定义的dns解析器;然后得到负载均衡器cc.balancerWrapper;最后得到解析器。

google.golang.org/grpc@v1.46.0/resolver_conn_wrapper.go

代码语言:javascript
复制
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error)
        ccr := &ccResolverWrapper{
    cc:   cc,
    done: grpcsync.NewEvent(),
  }
      ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)

如果我们在Dial的时候指定选项grpc.WithBlock(),就会在for循环里进行尝试连接,否则返回连接器,在请求到来的时候才进行真正的连接。接函数的定义如下,将当前连接退出idle状态:

代码语言:javascript
复制
func (cc *ClientConn) Connect() {
  cc.balancerWrapper.exitIdle()
}

连接的状态定义在google.golang.org/grpc@v1.46.0/connectivity/connectivity.go,可以看到有6个状态:

代码语言:javascript
复制
func (s State) String() string {
  switch s {
  case Idle:
    return "IDLE"
  case Connecting:
    return "CONNECTING"
  case Ready:
    return "READY"
  case TransientFailure:
    return "TRANSIENT_FAILURE"
  case Shutdown:
    return "SHUTDOWN"
  default:
    logger.Errorf("unknown connectivity state: %d", s)
    return "INVALID_STATE"
  }
}

在banlancer里会启动一个协程watch,服务端连接状态的变化

代码语言:javascript
复制
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper
      ccb := &ccBalancerWrapper{
    cc:       cc,
    updateCh: buffer.NewUnbounded(),
    resultCh: buffer.NewUnbounded(),
    closed:   grpcsync.NewEvent(),
    done:     grpcsync.NewEvent(),
  }
    go ccb.watcher()
    ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)

根据状态channel里的类型执行响应的操作分发,代码位于google.golang.org/grpc@v1.46.0/balancer_conn_wrappers.go

代码语言:javascript
复制
type ccBalancerWrapper struct {
  cc *ClientConn
  // Since these fields are accessed only from handleXxx() methods which are
  // synchronized by the watcher goroutine, we do not need a mutex to protect
  // these fields.
  balancer        *gracefulswitch.Balancer
  curBalancerName string
  
  updateCh *buffer.Unbounded // Updates written on this channel are processed by watcher().
  resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here.
  closed   *grpcsync.Event   // Indicates if close has been called.
  done     *grpcsync.Event   // Indicates if close has completed its work.
}

里面有两个channel分别是处理更新时间和执行结果

代码语言:javascript
复制
func (ccb *ccBalancerWrapper) watcher() {
          case u := <-ccb.updateCh.Get():
      ccb.updateCh.Load()
      if ccb.closed.HasFired() {
        break
      }
      switch update := u.(type) {
      case *ccStateUpdate:
        ccb.handleClientConnStateChange(update.ccs)
代码语言:javascript
复制
func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
  ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
}
代码语言:javascript
复制
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
      ccb.updateCh.Put(&scStateUpdate{

回过头来,我们就可以看到,我们发起连接的时候,exitIdle具体干了什么,就是往channel里发送更新的事件:

代码语言:javascript
复制
func (ccb *ccBalancerWrapper) exitIdle() {
  ccb.updateCh.Put(&exitIdleUpdate{})
}

对应的处理事件如下:如果状态是Idle,执行ExitIdle,退出空闲状态

代码语言:javascript
复制
func (ccb *ccBalancerWrapper) handleExitIdle() {
  if ccb.cc.GetState() != connectivity.Idle {
    return
  }
  ccb.balancer.ExitIdle()
}

channel的类型是一个无限长的队列,每次处理第一个,如果来不及处理,就放在对接末尾,类似nginx的backlog队列:

google.golang.org/grpc@v1.46.0/internal/buffer/unbounded.go

代码语言:javascript
复制
type Unbounded struct {
  c       chan interface{}
  mu      sync.Mutex
  backlog []interface{}
}
代码语言:javascript
复制
func (b *Unbounded) Put(t interface{}) {
        if len(b.backlog) == 0 {
    select {
    case b.c <- t:
      b.backlog = append(b.backlog, t)
代码语言:javascript
复制
func (b *Unbounded) Load() {
        if len(b.backlog) > 0 {
    select {
    case b.c <- b.backlog[0]:
      b.backlog[0] = nil
      b.backlog = b.backlog[1:]
代码语言:javascript
复制
func (b *Unbounded) Get() <-chan interface{} {

分析完队列后,我们看下ExitIdle的内容是什么,源码位于:

google.golang.org/grpc@v1.46.0/internal/balancer/gracefulswitch/gracefulswitch.go

代码语言:javascript
复制
func (gsb *Balancer) ExitIdle() {
        for sc := range balToUpdate.subconns {
          sc.Connect()
      }

对每个subConn都尝试着去进行连接,这里才是发起服务端连接的真正地方。subconns保存在balancerWrapper里。

代码语言:javascript
复制
type balancerWrapper struct {
  balancer.Balancer
  gsb *Balancer

  lastState balancer.State
  subconns  map[balancer.SubConn]bool // subconns created by this balancer
}

当需要更新连接状态的时候,根据传入的状态,发送对应的消息,进行连接状态的更新:

代码语言:javascript
复制
func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
      if state.ConnectivityState == connectivity.Shutdown {
        delete(bw.subconns, sc)
      bw.Balancer.UpdateSubConnState(sc, state)

真正执行状态更新的函数是:

代码语言:javascript
复制
func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
        if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
    balToUpdate = gsb.balancerCurrent
  } else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
    balToUpdate = gsb.balancerPending
  }
      balToUpdate.UpdateSubConnState(sc, state)

SubConn的定义位于google.golang.org/grpc@v1.46.0/balancer/balancer.go

代码语言:javascript
复制
type SubConn interface {
  // UpdateAddresses updates the addresses used in this SubConn.
  // gRPC checks if currently-connected address is still in the new list.
  // If it's in the list, the connection will be kept.
  // If it's not in the list, the connection will gracefully closed, and
  // a new connection will be created.
  //
  // This will trigger a state transition for the SubConn.
  //
  // Deprecated: This method is now part of the ClientConn interface and will
  // eventually be removed from here.
  UpdateAddresses([]resolver.Address)
  // Connect starts the connecting for this SubConn.
  Connect()
}

这里定义了Picker接口,用来从连接池中选择一个可用连接

代码语言:javascript
复制
type Picker interface {
  // Pick returns the connection to use for this RPC and related information.
  //
  // Pick should not block.  If the balancer needs to do I/O or any blocking
  // or time-consuming work to service this call, it should return
  // ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
  // the Picker is updated (using ClientConn.UpdateState).
  //
  // If an error is returned:
  //
  // - If the error is ErrNoSubConnAvailable, gRPC will block until a new
  //   Picker is provided by the balancer (using ClientConn.UpdateState).
  //
  // - If the error is a status error (implemented by the grpc/status
  //   package), gRPC will terminate the RPC with the code and message
  //   provided.
  //
  // - For all other errors, wait for ready RPCs will wait, but non-wait for
  //   ready RPCs will be terminated with this error's Error() string and
  //   status code Unavailable.
  Pick(info PickInfo) (PickResult, error)
}

并且定义了Banlancer的接口,用户可以自定义banlancer实现这个接口,包括UpdateClientConnState和 UpdateSubConnState

代码语言:javascript
复制
type Balancer interface {
  // UpdateClientConnState is called by gRPC when the state of the ClientConn
  // changes.  If the error returned is ErrBadResolverState, the ClientConn
  // will begin calling ResolveNow on the active name resolver with
  // exponential backoff until a subsequent call to UpdateClientConnState
  // returns a nil error.  Any other errors are currently ignored.
  UpdateClientConnState(ClientConnState) error
  // ResolverError is called by gRPC when the name resolver reports an error.
  ResolverError(error)
  // UpdateSubConnState is called by gRPC when the state of a SubConn
  // changes.
  UpdateSubConnState(SubConn, SubConnState)
  // Close closes the balancer. The balancer is not required to call
  // ClientConn.RemoveSubConn for its existing SubConns.
  Close()
}

官方包里给了很多种banlancer的实现,比如ringhash

google.golang.org/grpc@v1.46.0/xds/internal/balancer/ringhash/ringhash.go

代码语言:javascript
复制
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
  b := &ringhashBalancer{
    cc:       cc,
    subConns: make(map[resolver.Address]*subConn),
    scStates: make(map[balancer.SubConn]*subConn),
    csEvltr:  &connectivityStateEvaluator{},
  }

2,用户发起客户端请求的时候的调用过程

源码入口位于:google.golang.org/grpc@v1.46.0/call.go

代码语言:javascript
复制
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {

      return invoke(ctx, method, args, reply, cc, opts...)

它会创建一个clientStream然后发送和接收消息:

代码语言:javascript
复制
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
      cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)

google.golang.org/grpc@v1.46.0/stream.go

代码语言:javascript
复制
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
        var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
         return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
  }
代码语言:javascript
复制
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
        if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
    cs.finish(err)
    return nil, err
  }
代码语言:javascript
复制
func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
      t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)

google.golang.org/grpc@v1.46.0/clientconn.go,从banlancer中pick一个链接

代码语言:javascript
复制
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error)
        t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
    Ctx:            ctx,
    FullMethodName: method,
  })

google.golang.org/grpc@v1.46.0/picker_wrapper.go,通过wrapper在用户自定义balancer里面的picke方法,获取连接

代码语言:javascript
复制
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
      pickResult, err := p.Pick(info)
      acw, ok := pickResult.SubConn.(*acBalancerWrapper)
      if t := acw.getAddrConn().getReadyTransport(); t != nil {

3,关闭连接的过程

源码入口位于:google.golang.org/grpc@v1.46.0/clientconn.go

代码语言:javascript
复制
func (cc *ClientConn) Close() error {
      cc.csMgr.updateState(connectivity.Shutdown)
   for ac := range conns {
    ac.tearDown(ErrClientConnClosing)
  }

依次关闭连接池中的所有连接。

代码语言:javascript
复制
func (ac *addrConn) tearDown(err error) {

      tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct

// will leak. In most cases, call cc.removeAddrConn() instead.

      ac.updateConnectivityState(connectivity.Shutdown, nil)
代码语言:javascript
复制
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
      ac.state = s
      ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)

它也是通过更细状态的方式来影响连接池状态机的

代码语言:javascript
复制
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
  cc.balancerWrapper.updateSubConnState(sc, s, err)
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档