前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:grpc 链接池(4)自定义resolver 、balancer和picker

golang源码分析:grpc 链接池(4)自定义resolver 、balancer和picker

作者头像
golangLeetcode
发布2023-03-01 16:19:14
8790
发布2023-03-01 16:19:14
举报
文章被收录于专栏:golang算法架构leetcode技术php

在分析完源码后golang源码分析:grpc 链接池(3)resolver 、balancer和picker,我们尝试自定义实现相应的插件。grpc 通过服务发现或者直连形式获取到 gRPC server 的实例的 endpoints,然后通知负载均衡器进行 SubConn 更新,对于新加入的 endpoint 进行实例创建,移出废弃的 endpoint, 最后通过状态更新将状态为 Idle 的 SubConn 进行管理,gRPC 在调用 Invoke时,则会通过负载均衡器中的 Picker 去按照某一个负载均衡算法选择一个 SubConn 创建链接,如果创建成功则不再进行其他 SubConn 的尝试,否则会按照一定的退避算法进行重试,直到退避失败或者创建链接成功为止。上述三个组件的功能分别如下:

  • resolver:通过直连、本地配置,或者从服务发现后台,比如k8s、nacos、etcd,consul等存储介质,获取target对应的endpoint列表。
  • balancer:管理连接池的SubConn,创建对应的picker
  • picker:从 SubConn 列表中按照负载均衡算法选择一个 SubConn 创建链接

下面我们通过这样一个实例来分别实现上述组件,并测试正确性。

后台服务有三个endpoint:127.0.0.1:9080,127.0.0.1:9081,127.0.0.1:9082客户端可以随机从三个endpoint中选择 一个来发送请求

代码语言:javascript
复制
package myresolver

import (
  "fmt"
  "strconv"

  // "github.com/golang-leetcode/grpc-go/resolver"
  //引用包不对,导致注册失败
  "google.golang.org/grpc/resolver"
)

const MockResolverScheme = "mock"

type mockResolverBuilder struct {
}

type mockResolver struct {
  target resolver.Target
  cc     resolver.ClientConn
}

func (*mockResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  r := &mockResolver{
    target: target,
    cc:     cc,
  }
  fmt.Println("target:", target, target.Endpoint, target.URL)
  switch target.Endpoint {
  case "myMock":
    fmt.Println("myMock resolver")
  }
  r.start()
  return r, nil
}

func (*mockResolverBuilder) Scheme() string {
  fmt.Println("get mock scheme")
  return MockResolverScheme
}

func (r mockResolver) start() {
  addrs := make([]resolver.Address, 0)
  for i := 0; i < 3; i++ {
    addrs = append(addrs, resolver.Address{Addr: "127.0.0.1:908" + strconv.FormatInt(int64(i), 10)})
  }
  fmt.Println("addrs", addrs)
  r.cc.UpdateState(resolver.State{Addresses: addrs})
}

func (r *mockResolver) ResolveNow(o resolver.ResolveNowOptions) {
  addrs := make([]resolver.Address, 0)
  for i := 0; i < 3; i++ {
    addrs = append(addrs, resolver.Address{
      ServerName: "resolver.mock.grpc.io" + strconv.FormatInt(int64(i), 10),
      Addr:       "127.0.0.1:908" + strconv.FormatInt(int64(i), 10)})
  }
  fmt.Println("addrs", addrs)
  //2023/02/05 16:55:53 could not greet rpc error: code = Unavailable desc = last connection error: connection error: desc = "transport: Error while dialing dial tcp: lookup tcp///default/resolver.mock.grpc.io: nodename nor servname provided, or not known"
  r.start()
}
func (*mockResolver) Close() {}

func init() {
  resolver.Register(&mockResolverBuilder{})
  fmt.Println(resolver.Get(MockResolverScheme))
}
代码语言:javascript
复制
package mybalancer

import (
  "learn/grpc/picker"

  "google.golang.org/grpc/balancer"
  "google.golang.org/grpc/balancer/base"
)

const Name = "random"

func init() {
  balancer.Register(newBuilder())
}

func newBuilder() balancer.Builder {
  return base.NewBalancerBuilder(Name, picker.NewRandomPickerBuilder(), base.Config{HealthCheck: true})
}
代码语言:javascript
复制
package picker

import (
  "fmt"
  "math/rand"
  "sync"
  "time"

  "google.golang.org/grpc/balancer"
  "google.golang.org/grpc/balancer/base"
)

func NewRandomPickerBuilder() *randomPickerBuilder {
  return &randomPickerBuilder{}
}

type randomPickerBuilder struct {
}

type Conn struct {
  SubConn     balancer.SubConn
  SubConnInfo base.SubConnInfo
}

func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
  if len(info.ReadySCs) == 0 {
    return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
  }
  readyScs := make([]Conn, 0, len(info.ReadySCs))
  for sc, info := range info.ReadySCs {
    readyScs = append(readyScs, Conn{
      SubConn:     sc,
      SubConnInfo: info,
    })
  }
  return &randomPicker{
    subConns: readyScs,
    r:        rand.New(rand.NewSource(time.Now().UnixNano())),
  }
}

type randomPicker struct {
  subConns []Conn
  mu       sync.Mutex
  r        *rand.Rand
}

func (r *randomPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
  next := r.r.Int() % len(r.subConns)
  sc := r.subConns[next]
  fmt.Printf("picked: %+v\n", sc.SubConnInfo.Address.Addr)
  return balancer.PickResult{
    SubConn: sc.SubConn,
  }, nil
}

这里实现的比较简单,比如我们可以通过target.Endpoint 来实现不同的endpoint获取策略。然后我们启动服务

代码语言:javascript
复制
package hello

import (
  context "context"

  codes "google.golang.org/grpc/codes"
  status "google.golang.org/grpc/status"
)

type ImplementedGreeterServer struct {
  Port string
}

func (i *ImplementedGreeterServer) SayHello(context.Context, *HelloRequest) (*HelloReply, error) {
  return nil, status.Errorf(codes.Unimplemented, "method SayHello  implemented"+i.Port)
}
func (i *ImplementedGreeterServer) SayHello1(context.Context, *HelloRequest) (*HelloReply, error) {
  return nil, status.Errorf(codes.Unimplemented, "method SayHello1  implemented"+i.Port)
}

func (i *ImplementedGreeterServer) mustEmbedUnimplementedGreeterServer() {}
代码语言:javascript
复制
package main

import (
  helloworld "learn/grpc/conn/hello"
  "log"
  "net"
  "strconv"

  "google.golang.org/grpc"
)

func main() {
  for i := 0; i < 3; i++ {
    go func(i int) {
      addr := ":908" + strconv.FormatInt(int64(i), 10)

      srv := grpc.NewServer()
      helloworld.RegisterGreeterServer(srv, &helloworld.ImplementedGreeterServer{
        Port: addr,
      })

      listener, err := net.Listen("tcp", addr)
      if err != nil {
        log.Fatalf("failed to listen: %v", err)
      }

      err = srv.Serve(listener)
      if err != nil {
        log.Fatalf("failed to serve: %v", err)
      }
    }(i)
  }
  select {}
}

然后通过客户端发起请求

代码语言:javascript
复制
package main

import (
  "context"
  "fmt"
  "log"
  "time"

  helloworld "learn/grpc/conn/hello"
  "learn/grpc/mybalancer"
  "learn/grpc/myresolver"

  "google.golang.org/grpc"
)

func main() {
  conn, err := grpc.Dial(myresolver.MockResolverScheme+":///myMock", grpc.WithInsecure(), grpc.WithTimeout(10*time.Second), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, mybalancer.Name)), grpc.WithBlock()) // grpc.WithBalancerName(builder.Name) //, grpc.WithBlock(),grpc.WithResolvers(r),
  defer conn.Close()
  if err != nil {
    log.Fatalf("did not1 connect: %v", err)
  }
  client := helloworld.NewGreeterClient(conn)
  for i := 0; i < 10; i++ {
    resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
    fmt.Println(i+1, "个", resp, err)
    resp, err = client.SayHello1(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
    fmt.Println(i+1, "个", resp, err)
    if err != nil {
      log.Println("could not greet", err)
    }
  }
}

这里需要注意的是,我们在创建连接的是通过schema来选择我们自定义的mock resolver,指定balancer的时候,使用的是 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, mybalancer.Name))来指定的,对于picker来说,它是balancer里面直接引用的,所以不需要在连接的时候指定。

代码语言:javascript
复制
1 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9082
picked: 127.0.0.1:9081
1 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9081
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9081
picked: 127.0.0.1:9081
2 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9081
picked: 127.0.0.1:9080
2 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9080
3 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9080
picked: 127.0.0.1:9080
3 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9080
4 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9080
picked: 127.0.0.1:9080
4 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9081
5 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9081
picked: 127.0.0.1:9082
5 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9082
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9082
picked: 127.0.0.1:9080
6 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9080
picked: 127.0.0.1:9080
6 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9081
7 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9081
picked: 127.0.0.1:9080
7 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9080
8 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9080
picked: 127.0.0.1:9081
8 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9081
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9081
picked: 127.0.0.1:9081
9 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9081
picked: 127.0.0.1:9080
9 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9080
10 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9080
picked: 127.0.0.1:9082
10 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9082
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9082

看下结果,我们的连接端口是随机的,符合我们的预期。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的四七层流量分发服务,访问流量经由 CLB 可以自动分配到多台后端服务器上,扩展系统的服务能力并消除单点故障。轻松应对大流量访问场景。 网关负载均衡(Gateway Load Balancer,GWLB)是运行在网络层的负载均衡。通过 GWLB 可以帮助客户部署、扩展和管理第三方虚拟设备,操作简单,安全性强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档