前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:grpc 链接池(3)resolver 、balancer和picker

golang源码分析:grpc 链接池(3)resolver 、balancer和picker

作者头像
golangLeetcode
发布2023-03-01 16:18:49
1.1K0
发布2023-03-01 16:18:49
举报
文章被收录于专栏:golang算法架构leetcode技术php

在分析完grpc连接的创建、使用和销毁过程后golang源码分析:grpc 链接池(2),我们来分析下grpc留给我们的编程扩展接口resolver 、balancer和picker是如何嵌入grpc连接池的。

总的来说:每个 ClientConn 对应有多个 SubConn,ClientConn 会基于名字发现(resolver)得到多个 SubConn,并面向多个 SubConn 之间实现负载均衡(balancer),每次客户端请求的时候根据picker提供的Pick接口,从连接池中选择一个SubConn来完成请求。resolver 与 balancer 都是抽象的,内建的 resolver 包括 dns、manual、passthrough,内建的 balancer 包括 roundrobin、grpclb。当然也可以基于插件化的 Register 模式来在模块自身的 init() 函数中将自己注册。

1,resolver

代码语言:javascript
复制
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}

当我们调用Dial获取连接池的时候,首先是获取resolver,通过解析target,获得schema,然后通过schema在全局注册表中找到对应的resolver,需要注意的是,我们在自定义resolver的时候引用的grpc版本一定要和发起连接的时候的grpc版本一致,否则会出现resolver找不到使用默认的passthrough的情况,这是踩坑的血泪记忆。获取resolver的源码定义在google.golang.org/grpc@v1.50.1/clientconn.go

代码语言:javascript
复制
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  resolverBuilder, err := cc.parseTargetAndFindResolver()
  rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
  cc.resolverWrapper = rWrapper
代码语言:javascript
复制
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
rb = cc.getResolver(parsedTarget.Scheme)
代码语言:javascript
复制
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
return resolver.Get(scheme)

和我们注册的过程是对应的

代码语言:javascript
复制
func init() {
  resolver.Register(&mockResolverBuilder{})
 }

当然在ClientConn中使用的时候都是经过装饰器包裹了一层的google.golang.org/grpc@v1.50.1/resolver_conn_wrapper.go,它会调用建造器的Build接口:

代码语言:javascript
复制
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
  ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)

那么什么时候调用我们定义的resolver的ResolveNow 接口呢?在创建连接的时候:

代码语言:javascript
复制
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
ac.cc.resolveNow(resolver.ResolveNowOptions{})
代码语言:javascript
复制
func (ac *addrConn) resetTransport() {
  if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
  ac.cc.resolveNow(resolver.ResolveNowOptions{})

它通过一个协程调用了resolverWrapper对应的方法

代码语言:javascript
复制
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
  r := cc.resolverWrapper
  go r.resolveNow(o)

实现位于google.golang.org/grpc@v1.50.1/resolver_conn_wrapper.go

代码语言:javascript
复制
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
  ccr.resolverMu.Lock()
  if !ccr.done.HasFired() {
  ccr.resolver.ResolveNow(o)

如果状态更新,会调用UpdateState

代码语言:javascript
复制
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
return balancer.ErrBadResolverState
}

google.golang.org/grpc@v1.50.1/clientconn.go

代码语言:javascript
复制
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
  cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
  bw := cc.balancerWrapper
  uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})

2,balancer

balancer的注册过程和resolver的过程一样,只不过使用的的时候不是通过target的schema来加载的,而是通过 grpc.WithDefaultServiceConfig选项实现的

代码语言:javascript
复制
 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, mybalancer.Name)

google.golang.org/grpc@v1.50.1/balancer/balancer.go balancer的核心接口是UpdateClientConnState和UpdateSubConnState,它传入的对象是ClientConnState,里面保存了ResolverState,也就是resolver的解析结果

代码语言:javascript
复制
type Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes.  If the error returned is ErrBadResolverState, the ClientConn
// will begin calling ResolveNow on the active name resolver with
// exponential backoff until a subsequent call to UpdateClientConnState
// returns a nil error.  Any other errors are currently ignored.
UpdateClientConnState(ClientConnState) error
// ResolverError is called by gRPC when the name resolver reports an error.
ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
代码语言:javascript
复制
type ClientConnState struct {
ResolverState resolver.State
// The parsed load balancing configuration returned by the builder's
// ParseConfig method, if implemented.
BalancerConfig serviceconfig.LoadBalancingConfig
}

接口当然也是被装饰器包裹者google.golang.org/grpc@v1.50.1/balancer_conn_wrappers.go,它会启动一个监视器,当连接状态发生变化的时候,会调用对应事件处理函数处理,事件生成是时候并不是同步处理,而是先发送到channel里面

代码语言:javascript
复制
func (ccb *ccBalancerWrapper) watcher() {
  for {
  select {
    case u := <-ccb.updateCh.Get():
    switch update := u.(type) {
    case *ccStateUpdate:
       ccb.handleClientConnStateChange(update.ccs)
代码语言:javascript
复制
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
  ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
  select {
    case res = <-ccb.resultCh.Get():
    ccb.resultCh.Load()
代码语言:javascript
复制
func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
   ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))

最终事件是交给balancer的UpdateClientConnState处理了,在basebalancer里面也实现了这个接口,这里会遍历ResolverState.Addresses的地址列表,然后发起连接,也就是建立连接池的初始子连接。并且生产picker

google.golang.org/grpc@v1.50.1/balancer/base/balancer.go

代码语言:javascript
复制
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
  for _, a := range s.ResolverState.Addresses {
     addrsSet.Set(a, nil)
     if _, ok := b.subConns.Get(a); !ok {
      sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
      sc.Connect()
  b.regeneratePicker()
  b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})

basebalancer也实现了另外一个接口,思路一样,只不过处理的是连接池里的子连接:

代码语言:javascript
复制
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
  if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
  b.state == connectivity.TransientFailure {
  b.regeneratePicker()
  }
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})

他们调用的UpdateState位于google.golang.org/grpc@v1.50.1/balancer_conn_wrappers.go,会更新picker

代码语言:javascript
复制
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
  ccb.cc.blockingpicker.updatePicker(s.Picker)
  ccb.cc.csMgr.updateState(s.ConnectivityState)

3,picker

我们定义picker的时候这册思路也一样,需要实现builder

代码语言:javascript
复制
func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {

它的参数定义位于google.golang.org/grpc@v1.50.1/balancer/base/base.go

代码语言:javascript
复制
type PickerBuildInfo struct {
// ReadySCs is a map from all ready SubConns to the Addresses used to
// create them.
ReadySCs map[balancer.SubConn]SubConnInfo
}

注意这里面的map,包含了已经建立的连接,picker的实现,只需要定义自己的选择算法,从中选择合适的连接供Invoke使用。这个map是什么时候生成的呢,我们看下picker的实例化逻辑

google.golang.org/grpc@v1.50.1/balancer/base/balancer.go

代码语言:javascript
复制
func (b *baseBalancer) regeneratePicker() {
  for _, addr := range b.subConns.Keys() {
     sci, _ := b.subConns.Get(addr)
      if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
      readySCs[sc] = SubConnInfo{Address: addr}
      }
  }
  b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})

可以看到,它通过地址获取连接,选取ready的连接,放入到这个map,最后用这个map初始化了picker。而地址的来源正是resolver解析得到的,它保存在:

代码语言:javascript
复制
// ClientConnState describes the state of a ClientConn relevant to the
// balancer.
type ClientConnState struct {
ResolverState resolver.State
// The parsed load balancing configuration returned by the builder's
// ParseConfig method, if implemented.
BalancerConfig serviceconfig.LoadBalancingConfig
}

google.golang.org/grpc@v1.50.1/internal/balancer/gracefulswitch/gracefulswitch.go

代码语言:javascript
复制
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
return balToUpdate.UpdateClientConnState(state)

当Invoke的时候,会先调用装饰器的pick方法

google.golang.org/grpc@v1.50.1/clientconn.go

代码语言:javascript
复制
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx:            ctx,
FullMethodName: method,
})
}

google.golang.org/grpc@v1.50.1/picker_wrapper.go

代码语言:javascript
复制
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
   pickResult, err := p.Pick(info)
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档