前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:grpc 链接池(5)自定义组件和框架交互流程

golang源码分析:grpc 链接池(5)自定义组件和框架交互流程

作者头像
golangLeetcode
发布2023-03-01 16:19:43
4460
发布2023-03-01 16:19:43
举报

自定义grpc组件是如何与框架交互的呢?

1,resolver

首先我们看下resolvergolang源码分析:grpc 链接池(4)自定义resolver 、balancer和picker相关的最核心接口,在生成resolver前我们先定义对应的builder,它对应函数Build的参数是ccresolver.ClientConn,它调用服务发现组件获取服务对应地址后,就是通过cc的UpdateState方法,把地址存入连接池中,供后面的balancer来使用的。

代码语言:javascript
复制
func (*mockResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
      r.start()
      r.cc.UpdateState(resolver.State{Addresses: addrs})

那Build方法是何时调用的呢,当然是Dial初始化连接池时调用的newCCResolverWrapper源码位于:google.golang.org/grpc@v1.46.0/resolver_conn_wrapper.go

代码语言:javascript
复制
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
      rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
      if cc.dopts.block {
              cc.Connect()
      s := cc.GetState()
      if s == connectivity.Ready {
      cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
      rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
      cc.resolverWrapper = rWrapper
代码语言:javascript
复制
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
    ccr := &ccResolverWrapper{
    cc:   cc,
    done: grpcsync.NewEvent(),
  }
  ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
代码语言:javascript
复制
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
      if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {

对应的update方法会将解析结果发送到解析事件通知channel里来进行分发,在这里就和balancer关联上了,将resolve的结果放在参数里传递下去:

代码语言:javascript
复制
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
      bw := cc.balancerWrapper
      uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})

代码位于:google.golang.org/grpc@v1.46.0/balancer_conn_wrappers.go

代码语言:javascript
复制
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
      ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
代码语言:javascript
复制
type ccStateUpdate struct {
  ccs *balancer.ClientConnState
}

对应的有个后台协程一直在监听更新事件,做分发处理

代码语言:javascript
复制
func (ccb *ccBalancerWrapper) watcher() {
      case u := <-ccb.updateCh.Get():
              case *ccStateUpdate:
        ccb.handleClientConnStateChange(update.ccs)
      case *scStateUpdate:
        ccb.handleSubConnStateChange(update)
      case *exitIdleUpdate:
        ccb.handleExitIdle()
      case *resolverErrorUpdate:
        ccb.handleResolverError(update.err)
      case *switchToUpdate:
        ccb.handleSwitchTo(update.name)
      case *subConnUpdate:
        ccb.handleRemoveSubConn(update.acbw)

类似的命令事件还用很多,比如我们发起连接的时候

代码语言:javascript
复制
    func (cc *ClientConn) Connect() {
      cc.balancerWrapper.exitIdle()
代码语言:javascript
复制
    func (ccb *ccBalancerWrapper) exitIdle() {
      ccb.updateCh.Put(&exitIdleUpdate{})
代码语言:javascript
复制
func (ccb *ccBalancerWrapper) switchTo(name string) {
      ccb.updateCh.Put(&switchToUpdate{name: name})
func (ccb *ccBalancerWrapper) resolverError(err error) {
  ccb.updateCh.Put(&resolverErrorUpdate{err: err})
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
    ccb.updateCh.Put(&scStateUpdate{
    sc:    sc,
    state: s,
    err:   err,
  }

分发后由对应事件处理函数处理

代码语言:javascript
复制
func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
      ccs.ResolverState.Addresses = addrs
      ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
代码语言:javascript
复制
func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
      ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
代码语言:javascript
复制
func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {
      builder := balancer.Get(name)
      if err := ccb.balancer.SwitchTo(builder); err != nil {
代码语言:javascript
复制
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
      ac, err := ccb.cc.newAddrConn(addrs, opts)
      acbw := &acBalancerWrapper{ac: ac}
      ac.acbw = acbw

2,balancer

在注册balancer的时候,其实是服用了base的建造者:

代码语言:javascript
复制
balancer.Register( base.NewBalancerBuilder
代码语言:javascript
复制
return base.NewBalancerBuilder(Name, picker.NewRandomPickerBuilder(), base.Config{HealthCheck: true})

google.golang.org/grpc@v1.50.1/balancer/base/base.go

代码语言:javascript
复制
func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder {

注意到,参数pb PickerBuilder是picker的建造器,也是通过它关联的resolver和picker。google.golang.org/grpc@v1.50.1/balancer/base/balancer.go

代码语言:javascript
复制
type baseBuilder struct {
  name          string
  pickerBuilder PickerBuilder
  config        Config
}

在Build方法里初始化baseBalancer

代码语言:javascript
复制
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
    bal := &baseBalancer{
    cc:            cc,
    pickerBuilder: bb.pickerBuilder,


    subConns: resolver.NewAddressMap(),
    scStates: make(map[balancer.SubConn]connectivity.State),
    csEvltr:  &balancer.ConnectivityStateEvaluator{},
    config:   bb.config,
    state:    connectivity.Connecting,
  }

当子连接状态变化的时候,如果状态是空闲,就会发起连接,是关闭的话,把子连接删除,也就是在这里做了连接状态更新事件的分发:

代码语言:javascript
复制
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
  case connectivity.Idle:
    sc.Connect()
  case connectivity.Shutdown:
    // When an address was removed by resolver, b called RemoveSubConn but
    // kept the sc's state in scStates. Remove state for this sc here.
    delete(b.scStates, sc)
  case connectivity.TransientFailure:
代码语言:javascript
复制
 func (gsb *Balancer) ExitIdle() {
    for sc := range balToUpdate.subconns {
       sc.Connect()

连接创建在下面这个函数里:

代码语言:javascript
复制
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
      for _, a := range s.ResolverState.Addresses {
        if _, ok := b.subConns.Get(a); !ok {
          sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
          sc.Connect()

切换balancer定义在ClientConn里,这个时候会调用build方法

代码语言:javascript
复制
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
      cc.balancerWrapper.switchTo(newBalancerName)
代码语言:javascript
复制
func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
    bw := &balancerWrapper{
    gsb: gsb,

    lastState: balancer.State{
      ConnectivityState: connectivity.Connecting,
      Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
    },
    subconns: make(map[balancer.SubConn]bool),
  }
      newBalancer := builder.Build(bw, gsb.bOpts)
      bw.Balancer = newBalancer

3,picker

picker的核心方法是Pick

代码语言:javascript
复制
func (r *randomPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
代码语言:javascript
复制
func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {

Pick方法是在Invoke的时候调用的,它的链路如下:

代码语言:javascript
复制
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
      if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
代码语言:javascript
复制
func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
      t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
代码语言:javascript
复制
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
    t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
    Ctx:            ctx,
    FullMethodName: method,
  })

最终返回一个可用连接:google.golang.org/grpc@v1.46.0/picker_wrapper.go

代码语言:javascript
复制
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
      p := pw.picker
      pickResult, err := p.Pick(info)
      acw, ok := pickResult.SubConn.(*acBalancerWrapper)
      if t := acw.getAddrConn().getReadyTransport(); t != nil {
func (ac *addrConn) getReadyTransport() transport.ClientTransport {
      if ac.state == connectivity.Ready {
    return ac.transport

4,subConn

为了将实现和抽象分离,每个可以供我们自定义的编程接口都有对应的wraper,子连接也不例外,它会调用addConn的wraper

代码语言:javascript
复制
 func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
      return acbw.ac
代码语言:javascript
复制
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
代码语言:javascript
复制
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
      ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)

切换balancer会重新连接新balancer

代码语言:javascript
复制
func (ac *addrConn) connect() error {
      ac.updateConnectivityState(connectivity.Connecting, nil)
      ac.resetTransport()
代码语言:javascript
复制
func (ac *addrConn) resetTransport() {
      if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {

尝试连接所有endpoints

代码语言:javascript
复制
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
      for _, addr := range addrs {
        err := ac.createTransport(addr, copts, connectDeadline)
代码语言:javascript
复制
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
        newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)

其中NewClientTransport,是可以自定义的,所以也有对应wraper

google.golang.org/grpc@v1.46.0/balancer_conn_wrappers.go

代码语言:javascript
复制
func (acbw *acBalancerWrapper) Connect() {
      go acbw.ac.connect()
代码语言:javascript
复制
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
        newAC, err := cc.newAddrConn(addrs, opts)
        acbw.ac = newAC
          if acState != connectivity.Idle {
      go newAC.connect()
    }

连接关闭的时候会关闭所有子连接:

google.golang.org/grpc@v1.46.0/internal/balancer/gracefulswitch/gracefulswitch.go

代码语言:javascript
复制
func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
    if state.ConnectivityState == connectivity.Shutdown {
    bw.gsb.mu.Lock()
    delete(bw.subconns, sc)
     balToUpdate.UpdateSubConnState(sc, state)
代码语言:javascript
复制
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
      balToUpdate := gsb.latestBalancer()
      return balToUpdate.UpdateClientConnState(state)
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档