关于 viper,无须多言,它是 Golang 社区里最流行的配置文件工具,除了常见功能之外,它还支持很多高级功能,比如可以加载远程配置,正好我最近在研究 etcd,于是我打算把二者结合起来,没想到就此开启了填坑之旅。
按照文档上的介绍,只需启动一个 goroutine 执行 WatchRemoteConfig 即可:
Watching Changes in etcd
可惜当我套用如上代码时,却发现在我的环境里根本没法用。究其原因,是因为 viper 依赖 crypt,而 crypt 截至目前还不支持新版 etcd 的 api。
viper 使用 crypt 做什么呢?在文档中可以找到如下描述:
Viper uses crypt to retrieve configuration from the K/V store, which means that you can store your configuration values encrypted and have them automatically decrypted if you have the correct gpg keyring. Encryption is optional.
也就是说,KV 中可以存储加密的数据,viper 在获取的时候通过 crypt 自动解密。虽然此功能的出发点很好,但是在绝大多数场景下,etcd 中的数据通过 acl + tls 来保护就足够了,并不需要存储加密数据。此外,别的不说,如果真的都存储加密数据,那么至少我们想通过 etcdkeeper 之类的工具修改 etcd 数据就不容易了。
于是我琢磨着有没有变通的方法,当我在查阅原始代码的时候,发现一个奇怪的问题:
remote
如上所示:Get 和 Watch 两个操作几乎一摸一样,内部都是调用后端的 Get 方法来获取数据。此时回头看看开头关于 WatchRemoteConfig 的例子的话,你会发现虽然语意上是想通过 Watch 监控,但是后端实际上执行的却是 Get,所谓的 Watch 监控其实是通过 Get 轮询实现的!官方代码为什么要这么干呢?当我 blame 后终于发现了原因:
原本 Get 和 Watch 两个操作是各自独立的,当在 for 循环里调用 WatchRemoteConfig 的时候,实际上就相当于在 for 循环里调用后端 Watch,偏偏 crypt 的 Watch 实现在每次调用的时候都会生成一个新的 goroutine,并且无法退出,于是乎 goroutine 的数量就失控了,因为问题出在 crypt 身上,viper 无法根治,为了掩盖此问题,不得不把 Watch 改成了 Get,并引入一个 WatchRemoteConfigOnChannel 方法来实现更完善的监控。
了解了前因后果之后,我决定跳过 crypt,自己实现加载远程配置的功能,其实就是实现 viper 中的 remoteConfigFactory 接口:
type remoteConfigFactory interface {
Get(rp RemoteProvider) (io.Reader, error)
Watch(rp RemoteProvider) (io.Reader, error)
WatchChannel(rp RemoteProvider) (<-chan *RemoteResponse, chan bool)
}
代码如下所示,参考了原始 remote.go 的实现方式:
package remote
import (
"bytes"
"context"
"io"
"time"
"github.com/spf13/viper"
"go.etcd.io/etcd/clientv3"
)
type Config struct {
viper.RemoteProvider
Username string
Password string
}
func (c *Config) Get(rp viper.RemoteProvider) (io.Reader, error) {
c.RemoteProvider = rp
return c.get()
}
func (c *Config) Watch(rp viper.RemoteProvider) (io.Reader, error) {
c.RemoteProvider = rp
return c.get()
}
func (c *Config) WatchChannel(rp viper.RemoteProvider) (<-chan *viper.RemoteResponse, chan bool) {
c.RemoteProvider = rp
rr := make(chan *viper.RemoteResponse)
stop := make(chan bool)
go func() {
for {
client, err := c.newClient()
if err != nil {
time.Sleep(time.Duration(time.Second))
continue
}
defer client.Close()
ch := client.Watch(context.Background(), c.RemoteProvider.Path())
select {
case <-stop:
return
case res := <-ch:
for _, event := range res.Events {
rr <- &viper.RemoteResponse{
Value: event.Kv.Value,
}
}
}
}
}()
return rr, stop
}
func (c *Config) newClient() (*clientv3.Client, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{c.Endpoint()},
Username: c.Username,
Password: c.Password,
})
if err != nil {
return nil, err
}
return client, nil
}
func (c *Config) get() (io.Reader, error) {
client, err := c.newClient()
if err != nil {
return nil, err
}
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := client.Get(ctx, c.Path())
cancel()
if err != nil {
return nil, err
}
return bytes.NewReader(resp.Kvs[0].Value), nil
}
实际调用代码和以前类似,只需额外触发一下 WatchRemoteConfigOnChannel 即可:
func main() {
endpoint := "http://127.0.0.1:2379"
path := "/config/test"
viper.RemoteConfig = &remote.Config{}
v := viper.New()
v.AddRemoteProvider("etcd", endpoint, path)
v.SetConfigType("toml")
v.ReadRemoteConfig()
v.WatchRemoteConfigOnChannel()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, v.GetString("service.password"))
})
http.ListenAndServe(":8080", nil)
}
最后再次强烈推荐通过 etcdkeeper 作为 etcd 的 web 前端工具,真好看真方便:
etcdkeeper
一切就绪后,你可以试着修改 etcd 里的数据,甚至重启 etcd 服务后再修改 etcd 里的数据,你会惊喜的发现应用代码无需轮询就能实时感知到数据变化,完美!