前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >实战etcd的服务发现

实战etcd的服务发现

作者头像
LA0WAN9
发布2021-12-14 09:03:40
1.4K1
发布2021-12-14 09:03:40
举报
文章被收录于专栏:火丁笔记

在云原生的时代,服务发现已经是必不可少的功能,我借着最近迁移 gRPC 服务的机会尝试了一下如何用 etcd 实现服务发现,期间遇到诸多问题,本文逐一记之。

虽然 gRPC 并没有内置 etcd 的服务发现功能,但是它提供了相关接口让我们扩展:

代码语言:javascript
复制
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOptions)
	// Close closes the resolver.
	Close()
}

在实际动手之前,有必要了解一下「gRPC Name Resolution」,它定义了 gRPC 的 URI 格式,举个例子:「dns://1.1.1.1/huoding.com」,其中:

  • Scheme:dns
  • Authority:1.1.1.1
  • Endpoint:huoding.com

表示通过 dns 服务器 1.1.1.1 查询 huoding.com 有哪些节点。

既然我们要支持 etcd,那么我们首先要想好 etcd 对应的 URI 应该是什么样的,尤其是 Authority 填什么好呢?按上面例子的意思,填 etcd 服务器的地址似乎就可以,不过实际情况中,一般会有多台 etcd 服务器,还牵扯到用户名密码,虽然我们可以构造一个复杂的 DSN 字符串全写到 Authority 里,但是那样的话显得太臃肿了,还不如直接从配置文件里获取来的实在,所以建议 Authority 留空。假设我们要通过 etcd 查询一个名为 foo 的服务对应的节点,那么 URI 可以定义为:「etcd:///foo」。

了解了基础知识之后,在编码前让我们在头脑里过一遍 gRPC 的服务流程:

  • 服务端启动,在 etcd 里通过租约注册键为「/foo/<ip>:<port>」并且值为「<ip>:<port>」的数据,同时定期发送心跳包,一旦节点退出会注销相关数据。
  • 客户端启动,gRPC 从 etcd:///foo 解析出 Scheme、Authority、Endpoint,并根据 Scheme 找到对应 Builder,调用其 Build 方法,返回对应的 Resolver,在 etcd 中查询前缀是「/foo/」的数据,就是目前可用的节点。
  • 最后,负载均衡会挑选出一个节点来提供服务。

etcd

下面可以粘代码了,我主要是参考 gRPC 内置的 dns_resolver.go 来实现的。

先是 builder.go,实现了 Builder 接口:

代码语言:javascript
复制
package etcd

import (
	"fmt"

	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc/resolver"
)

type Builder struct {
	Client *clientv3.Client
}

func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	prefix := fmt.Sprintf("/%s/", target.Endpoint)

	r := &Resolver{
		Client: b.Client,
		cc:     cc,
		prefix: prefix,
	}

	go r.watcher()
	r.ResolveNow(resolver.ResolveNowOptions{})
	return r, nil
}

func (b *Builder) Scheme() string {
	return "etcd"
}

再是 resolver.go,实现了 Resolver 接口:

代码语言:javascript
复制
package etcd

import (
	"context"
	"sync"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"google.golang.org/grpc/resolver"
)

type Resolver struct {
	sync.RWMutex
	Client    *clientv3.Client
	cc        resolver.ClientConn
	prefix    string
	addresses map[string]resolver.Address
}

func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
	// todo
}

func (r *Resolver) Close() {
	// todo
}

func (r *Resolver) watcher() {
	r.addresses = make(map[string]resolver.Address)

	response, err := r.Client.Get(context.Background(), r.prefix, clientv3.WithPrefix())

	if err == nil {
		for _, kv := range response.Kvs {
			r.setAddress(string(kv.Key), string(kv.Value))
		}

		r.cc.UpdateState(resolver.State{
			Addresses: r.getAddresses(),
		})
	}

	watch := r.Client.Watch(context.Background(), r.prefix, clientv3.WithPrefix())

	for response := range watch {
		for _, event := range response.Events {
			switch event.Type {
			case mvccpb.PUT:
				r.setAddress(string(event.Kv.Key), string(event.Kv.Value))
			case mvccpb.DELETE:
				r.delAddress(string(event.Kv.Key))
			}
		}

		r.cc.UpdateState(resolver.State{
			Addresses: r.getAddresses(),
		})
	}
}

func (r *Resolver) setAddress(key, address string) {
	r.Lock()
	defer r.Unlock()
	r.addresses[key] = resolver.Address{Addr: string(address)}
}

func (r *Resolver) delAddress(key string) {
	r.Lock()
	defer r.Unlock()
	delete(r.addresses, key)
}

func (r *Resolver) getAddresses() []resolver.Address {
	addresses := make([]resolver.Address, 0, len(r.addresses))

	for _, address := range r.addresses {
		addresses = append(addresses, address)
	}

	return addresses
}

接着是服务端代码:

代码语言:javascript
复制
func main() {
	host := viper.GetString("server.host")
	port := viper.GetString("server.port")
	listener, err := net.Listen("tcp", net.JoinHostPort(host, port))

	if err != nil {
		log.Fatalln(err)
	}

	server := grpc.NewServer()
	reflection.Register(server)
	pb.RegisterFooServer(server, &foo.Server{})
	close, err := register("foo", 10)

	if err != nil {
		log.Fatalln(err)
	}

	go func() {
		if err := server.Serve(listener); err != nil {
			log.Fatalln(err)
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	close()
}

func register(service string, ttl int64) (func(), error) {
	port := viper.GetString("server.port")
	client, err := etcdClient()

	if err != nil {
		return nil, err
	}

	ctx := context.Background()
	lease, err := client.Grant(ctx, ttl)

	if err != nil {
		return nil, err
	}

	ip, err := localIP()

	if err != nil {
		return nil, err
	}

	key := fmt.Sprintf("/%s/%s:%s", service, ip, port)
	value := fmt.Sprintf("%s:%s", ip, port)
	_, err = client.Put(ctx, key, value, clientv3.WithLease(lease.ID))

	if err != nil {
		return nil, err
	}

	keepAlive, err := client.KeepAlive(ctx, lease.ID)

	if err != nil {
		return nil, err
	}

	go func() {
		for {
			<-keepAlive
		}
	}()

	close := func() {
		_, _ = client.Revoke(ctx, lease.ID)
	}

	return close, nil
}

func localIP() (string, error) {
	addrs, err := net.InterfaceAddrs()

	if err != nil {
		return "", err
	}

	for _, addr := range addrs {
		if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
			if ipnet.IP.To4() != nil {
				return ipnet.IP.String(), nil
			}
		}
	}

	return "", errors.New("unable to determine local ip")
}

最后是客户端代码:

代码语言:javascript
复制
func main() {
	client, err := etcdClient()

	if err != nil {
		log.Fatalln(err)
	}

	builder := &etcd.Builder{
		Client: client,
	}

	resolver.Register(builder)
	ctx := context.Background()
	target := "etcd:///foo"

	cc, err := grpc.DialContext(
		ctx,
		target,
		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
		grpc.WithInsecure(),
	)

	if err != nil {
		log.Fatalln(err)
	}

	defer cc.Close()

	// pb.NewFooClient(cc) ...
}

说明:在获取 ip 的时候,没有考虑内外网 ip 的情况,需要的可以参考相关资料

说明:etcd 3.5 以下版本 和 gRPC 的最新版本不兼容,需要在 go.mod 里指定版本:

代码语言:javascript
复制
replace (
	github.com/golang/protobuf => github.com/golang/protobuf v1.3.2
	go.etcd.io/etcd => go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
	google.golang.org/grpc => google.golang.org/grpc v1.26.0
)

结尾我要说的是 Go Kit 是个好东西啊, 内置支持各种服务发现,包括 etcd

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-11-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档