前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go语言入门学习之Groupcache源码分析

Go语言入门学习之Groupcache源码分析

原创
作者头像
nju万磁王
修改2021-01-20 10:39:51
1.1K0
修改2021-01-20 10:39:51
举报
文章被收录于专栏:golang学习

源码框架

groupcache是memcached的作者作者Brad Fitzpatrick写的GO的版本,现用于dl.google.com,主要用于静态文件资源服务,是一款轻量级开源项目,容易理解,是作为go语言入手学习的不错选择。本文主要针对groupcache的架构和功能进行了源码介绍。

图1为整个代码框架图

企业微信截图_16111103202694.png
企业微信截图_16111103202694.png

整个代码一般的使用流程是:

  1. 首先用户发起请求到httppool的监听节点,httppool设置peers,并且自身相当于一个peer服务器节点,实现了peers.go的 接口(其中peer代表服务器节点)
  2. httppool节点收到请求后分析字段,根据groupname调用相应的group(group为查询数据的主要结构体)
  3. group查询本地的mainCache和hotCache,找到数据返回给客户,否则执行步骤4
  4. 查询当前节点是否为当前httppool代表的peer节点,如果是执行步骤5,否则执行步骤6
  5. 从本地数据集获取数据,并将数据存放在mainCache缓存里,执行步骤7
  6. 从对应的节点获取数据,并有1/10的概率缓存到当前hotCache里
  7. 将数据返还给客户

各部分源码分析

consistenthash.go

groupcache使用一致性哈希的方法来根据key的值分配对应的peer存储。使用一致性哈希是因为要解决普通哈希分布式系统的容错性和扩展性问题,其中容错性指的是当系统中某一个或某几个服务器变的不可用的时候,整个系统是否能正确的高效运行,扩展性值得是新增加服务器的时候,整个系统是否可以高效运行。关于一致性哈希的理论说明,请参考文章http://blog.codinglabs.org/articles/consistent-hashing.html

代码语言:txt
复制
type Hash func(data []byte) uint32

type Map struct {
	hash     Hash
	replicas int //每个服务器对应的虚拟节点的个数,默认为50个
	keys     []int // Sorted
	hashMap  map[int]string //虚拟节点到实际服务器的映射
}

//创建Map对象
func New(replicas int, fn Hash) *Map {
	m := &Map{
		replicas: replicas,
		hash:     fn,
		hashMap:  make(map[int]string),
	}
	if m.hash == nil {
		m.hash = crc32.ChecksumIEEE
	}
	return m
}

// Returns true if there are no items available.检测当前Map是否为空
func (m *Map) IsEmpty() bool {
	return len(m.keys) == 0
}

// Adds some keys to the hash.添加虚拟节点,
func (m *Map) Add(keys ...string) {
	for _, key := range keys {
		for i := 0; i < m.replicas; i++ {
			hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
			m.keys = append(m.keys, hash)
			m.hashMap[hash] = key
		}
	}
	sort.Ints(m.keys)
}

// Gets the closest item in the hash to the provided key.根据key值获取hash值,获取最靠近hash值的服务器节点,此节点必须大于等于hash
func (m *Map) Get(key string) string {
	if m.IsEmpty() {
		return ""
	}

	hash := int(m.hash([]byte(key)))

	// Binary search for appropriate replica.
	idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })

	// Means we have cycled back to the first replica.
	if idx == len(m.keys) {
		idx = 0
	}

	return m.hashMap[m.keys[idx]]
}

lru.go

groupcache使用lru(最近最少使用)算法,来更新缓存,缓存的实际存储方式是存储在cache mapinterface{}list.Element,实现底层Lru算法的是在ll list.List链表里,将最新用到的热门数据放到链表的头,当缓存数据超过最大数量的时候根据list去除尾部最不常用的数据。 但是此算法有缺陷,就是当数据量特别多的时候,所有数据都被访问的时候,list节点不断更新,此时lru的效果就会失效,难以找出真的频繁使用的数据,记得mysql的缓存里lru算法利用将lru链表分为新旧两段来针对这一缺陷进行了改造。

代码语言:txt
复制
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
	// MaxEntries is the maximum number of cache entries before
	// an item is evicted. Zero means no limit.
	MaxEntries int

	// OnEvicted optionally specificies a callback function to be
	// executed when an entry is purged from the cache.
	OnEvicted func(key Key, value interface{})

	ll    *list.List //存储内容的链表
	cache map[interface{}]*list.Element //利用map快速查找内容
}


// RemoveOldest removes the oldest item from the cache.//根据list尾部的数据,去除map和list中的数据
func (c *Cache) RemoveOldest() {
	if c.cache == nil {
		return
	}
	ele := c.ll.Back()
	if ele != nil {
		c.removeElement(ele)
	}
}

//根据list.Element去除map和list中的数据
func (c *Cache) removeElement(e *list.Element) {
	c.ll.Remove(e)
	kv := e.Value.(*entry)
	delete(c.cache, kv.key)
	if c.OnEvicted != nil {
		c.OnEvicted(kv.key, kv.value)
	}
}

singleflight.go

此文件的功能主要是处理多个并发请求同时请求相同的Key的时候,保证函数只被调用一次,避免了资源的浪费。

代码语言:txt
复制
// call is an in-flight or completed Do call
type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}

// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
//此函数实现了groupcache.go中的flightGroup接口,功能为保证多个并发查询相同值的时候,只需要其中一个请求访问实际数据,然后其他多个请求一块共享结果
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()//如果找到了则等待
		return c.val, c.err
	}
	c := new(call)//如果没找到,则新创建一个call
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()//获取实际的值
	c.wg.Done()

	g.mu.Lock()
	delete(g.m, key)//当并发请求完数据后,会存储到对应的缓存里,所以此Map里没必要存着此值占用空间
	g.mu.Unlock()

	return c.val, c.err
}

代码中的关键是使用waitgroup将并发写合并成一个写。

  1. 首先上锁,查询map中的key对应的call,没找到new一个,并且waitgroup添加1,找到了则表明已经有一个请求正在查询数据了,所以此时当前请求waitgroup进行等待
  2. 接着正在查询数据的请求调用回调函数获取数据后进行写合并操作,然后调用wg.done表示操作完成,并将map中对应的key删除掉
  3. 最后那些waitgroup等待的请求在wg.done后获取数据并返回

peers.go

这个文件主要定义了一些peer相关的接口。

ProtoGetter接口的作用是实现节点的接口,通过这个点,可以获取对应in的out值

PeerPicker接口的作用是通过key值获取对应的服务器节点,其中HTTPPool实现了这个接口

代码语言:txt
复制
// ProtoGetter is the interface that must be implemented by a peer.//向服务器节点peer查询数据的接口
type ProtoGetter interface {
	Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error
}

// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
//为通过key值获取节点的接口
type PeerPicker interface {
	// PickPeer returns the peer that owns the specific key
	// and true to indicate that a remote peer was nominated.
	// It returns nil, false if the key owner is the current peer.
	PickPeer(key string) (peer ProtoGetter, ok bool)
}

http.go

HTTPPool是实现了PeerPicker接口的结构体,而且其结构体内部元素httpGetter实现了ProtoGetter接口,所以此结构体为整个源码的关键部分。

代码语言:txt
复制
// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
	// Context optionally specifies a context for the server to use when it
	// receives a request.
	// If nil, the server uses a nil Context.
	Context func(*http.Request) Context

	// Transport optionally specifies an http.RoundTripper for the client
	// to use when it makes a request.
	// If nil, the client uses http.DefaultTransport.
	Transport func(Context) http.RoundTripper

	// this peer's base URL, e.g. "https://example.net:8000"
	self string //当前节点的base url

	// opts specifies the options.
	opts HTTPPoolOptions

	mu          sync.Mutex // guards peers and httpGetters
	peers       *consistenthash.Map //分布式peers,一致性哈希对象,通过此获取key值对应的节点的base url
	httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"httpGetter为实现peers.go中ProtoGetter接口的对象,此map的key为节点base url
}

// HTTPPoolOptions are the configurations of a HTTPPool.
type HTTPPoolOptions struct {
	// BasePath specifies the HTTP path that will serve groupcache requests.
	// If blank, it defaults to "/_groupcache/".
	BasePath string //url的基本path

	// Replicas specifies the number of key replicas on the consistent hash.
	// If blank, it defaults to 50.
	Replicas int//一致性哈希的虚拟节点的个数

	// HashFn specifies the hash function of the consistent hash.
	// If blank, it defaults to crc32.ChecksumIEEE.
	HashFn consistenthash.Hash //哈希方法
}

先看HTTPPool的创造方式:其中NewHTTPPool为创建HTTPPool总方法,主要分为三个步骤

  1. 创建HTTPPool结构体
  2. 注册此HTTPPool为PeerPicker对象,这样在初始化groupcache.go中的Group结构体的PeerPicker对象的时候可以直接查找到此HTTPPool对象并赋值给它
  3. 使用http.handle注册路由
代码语言:txt
复制
func NewHTTPPool(self string) *HTTPPool {
	p := NewHTTPPoolOpts(self, nil)
	http.Handle(p.opts.BasePath, p)
	return p
}

var httpPoolMade bool

// NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
// Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
// The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
	if httpPoolMade {
		panic("groupcache: NewHTTPPool must be called only once")
	}
	httpPoolMade = true

	p := &HTTPPool{
		self:        self,
		httpGetters: make(map[string]*httpGetter),
	}
	if o != nil {
		p.opts = *o
	}
	if p.opts.BasePath == "" {
		p.opts.BasePath = defaultBasePath//默认url路径
	}
	if p.opts.Replicas == 0 {
		p.opts.Replicas = defaultReplicas//默认虚拟节点个数
	}
	p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)

	RegisterPeerPicker(func() PeerPicker { return p })
	return p
}
//注册HTTPPool为当前服务器的PeerPicker
func RegisterPeerPicker(fn func() PeerPicker) {
	if portPicker != nil {
		panic("RegisterPeerPicker called more than once")
	}
	portPicker = func(_ string) PeerPicker { return fn() }
}

接下来看httpGetter的相关操作,这个接口的主要目的是向指定的服务器节点获取数据

代码语言:txt
复制
//httpGetter实现了peers.go的ProtoGetter接口,这个接口实际上是向服务器节点查询数据的接口
type httpGetter struct {
	transport func(Context) http.RoundTripper
	baseURL   string //对应节点的base url
}

var bufferPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

//Get函数可以通过in里面的key值来将获取的value值存放在Out里面,利用url访问对应的服务器节点获取数据
func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error{
    u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(in.GetGroup()),
		url.QueryEscape(in.GetKey()),
	)//获取远程服务器节点的url
	req, err := http.NewRequest("GET", u, nil)//获取对应的数据
	...
}

//设置各个节点到一致性哈希中,并且根据节点设置对应的httpGetter
func (p *HTTPPool) Set(peers ...string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
	}
}

//根据key值获取对应的httpGetter
func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.peers.IsEmpty() {
		return nil, false
	}
	if peer := p.peers.Get(key); peer != p.self {
		return p.httpGetters[peer], true
	}
	return nil, false
}

最后我们来看看HTTPPool的路由查询函数,这个函数利用请求url里面的groupname和key参数进行数据值的获取,其中通过groupname可以获取到对应的Group对象,通过Group对象的Get函数针对key值查询对应的数据值。

代码语言:txt
复制
//为当前服务节点的访问路由函数
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Parse request.
	if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
		panic("HTTPPool serving unexpected path: " + r.URL.Path)
	}
	parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
	if len(parts) != 2 {
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	groupName := parts[0]
	key := parts[1]

	// Fetch the value for this group/key.
	group := GetGroup(groupName)//利用groupname获取对应的Group对象
	if group == nil {
		http.Error(w, "no such group: "+groupName, http.StatusNotFound)
		return
	}
	var ctx Context
	if p.Context != nil {
		ctx = p.Context(r)
	}

	group.Stats.ServerRequests.Add(1)
	var value []byte
	err := group.Get(ctx, key, AllocatingByteSliceSink(&value))//此为函数的重点部分,group对象的Get函数为根据key值获取数据值的总函数,在groupCache.go文档中来讲。
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Write the value to the response body as a proto message.
	body, err := proto.Marshal(&pb.GetResponse{Value: value})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/x-protobuf")
	w.Write(body)
}

groupcache.go

groupcache.go为将以上各个部分功能综合起来的一个文件,包含了一个值获取的总流程。

代码语言:txt
复制
type Group struct {
	name       string //组的名字
	getter     Getter //为使用用户需要实现的部分,此Getter接口主要是用户指定获取原始数据的接口
	peersOnce  sync.Once
	peers      PeerPicker //为之前http.go中NewHTTPPoolOpts注册的HTTPPool对象
	cacheBytes int64 // limit for sum of mainCache and hotCache size
	mainCache cache //主存,为本地服务器节点放置的缓存key-value
	hotCache cache //热存储,设置这个的目的是,因为根据一致性哈希,每个key/value存储在不同的服务器节点的mainCache里,但是如果多次经过此服务器节点二次访问key对应的实际远程服务器节点获取值的时候会造成耗费增大,所以有1/10的概率会将此key-val存储在当前hotCache来加速查询
	loadGroup flightGroup //当多个请求并发访问同一个key对应的val值,并且mainCache和hotCache里都查不到key对应的val时候,LoadGroup实现了将多次并发请求原始数据融合为一次请求,减少原始数据服务器压力
	_ int32 // force Stats to be 8-byte aligned on 32-bit platforms
	// Stats are statistics on the group.
	Stats Stats //服务器节点访问状态
}

其中Group对象的核心是Get方法,通过此方法可以一窥总的根据Key获取value的全貌。

  1. 查询本地mainCache和hotCache,获取对应key的数据值,查询到则直接返回
  2. 根据key值查询服务器节点,如果此服务器节点是远程服务器节点,通过服务器节点获取值
  3. 如果是当前group服务器节点,则通过用户定义的Getter获取原始数据
代码语言:txt
复制
//为获取数据值的总流程
func (g *Group) Get(ctx Context, key string, dest Sink) error {
	g.peersOnce.Do(g.initPeers)//初始化peers,为之前注册的HTTPPool对象
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}
	value, cacheHit := g.lookupCache(key)//这个是查询本地的mainCache和hotCache,如果有Key对应的val,则返回,没有的话则程序继续向下走

	if cacheHit {
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, value)
	}
	destPopulated := false
	//load函数主要是分为两个步骤,一是判断当前key是都对应group本身的这个服务器节点,如果是,则直接从用户定义的Getter接口获取数据并存储到当前组mainCache里,如果不是,则从对应的服务器节点获取数据,并有1/10的概率存储在当前组的hotCache里。
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	if destPopulated {
		return nil
	}
	return setSinkView(dest, value)
}

//在缓存查询数据
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
	if g.cacheBytes <= 0 {
		return
	}
	value, ok = g.mainCache.get(key)
	if ok {
		return
	}
	value, ok = g.hotCache.get(key)
	return
}

func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	//Do函数实现了并发合并操作
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		//获取对应的服务器节点,如果节点为远程服务器节点,则通过getFromPeer访问远程服务器节点获取数据
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
		}
		//获取的服务器节点问本地服务器节点,所以通过getLocally函数从用户定义的Getter接口获取原始数据
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		//填充mainCache缓存
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		value = viewi.(ByteView)
	}
	return
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 源码框架
  • 各部分源码分析
    • consistenthash.go
      • lru.go
        • singleflight.go
          • peers.go
            • http.go
              • groupcache.go
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档