现在,我们继续看 setUpGroup 函数里面的逻辑,我们在使用 NewGroup 创建 Group 之后,开始使用 http 包对外开放 RestAPI 接口,具体代码如下:
http.HandleFunc("/config", func(rw http.ResponseWriter, r *http.Request) {
k := r.URL.Query().Get("key")
var dest []byte
fmt.Printf("look up for %s from groupcache\n", k)
if err := stringGroup.Get(context.Background(), k, groupcache.AllocatingByteSliceSink(&dest)); err != nil {
rw.WriteHeader(http.StatusNotFound)
rw.Write([]byte("this key doesn't exists"))
} else {
rw.Write([]byte(dest))
}
})
这段代码里面,首先从 url 里面获取 key 参数的值,然后调用 Group 对象的 Get 方法获取 key 对应的值,如果获取到了就将获取到的值设置到返回值里面也就是回显到浏览器,如果没有获取到,就返回 HTTP 错误码。
现在,我们重点看下 Get 函数的实现,具体代码如下:
func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
g.peersOnce.Do(g.initPeers)
g.Stats.Gets.Add(1)
if dest == nil {
return errors.New("groupcache: nil dest Sink")
}
value, cacheHit := g.lookupCache(key)
if cacheHit {
g.Stats.CacheHits.Add(1)
return setSinkView(dest, value)
}
// Optimization to avoid double unmarshalling or copying: keep
// track of whether the dest was already populated. One caller
// (if local) will set this; the losers will not. The common
// case will likely be one caller.
destPopulated := false
value, destPopulated, err := g.load(ctx, key, dest)
if err != nil {
return err
}
if destPopulated {
return nil
}
return setSinkView(dest, value)
}
Get 函数的大体流程是,首先调用 sync.Once 实现单次调用集群中的所有节点的方法,并将 Get 接口的统计次数加一,接着,从缓存里查找数据,如果找到了,则加入到缓存,直接返回,否则,调用 load 方法,load 方法可以实现首先从本地加载数据,如果没有,则从其他的节点加载数据。
现在,我们首先,看下初始化集群的其他节点的函数 initPeers,其具体代码如下:
func (g *Group) initPeers() {
if g.peers == nil {
g.peers = getPeers(g.name)
}
}
我们看到,首先,判断 peers 是否为空,如果不为空,则使用 getPeers 函数初始化 peers,其中,peers 的类型为 PeerPicker,具体定义如下:
// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
// PickPeer returns the peer that owns the specific key
// and true to indicate that a remote peer was nominated.
// It returns nil, false if the key owner is the current peer.
PickPeer(key string) (peer ProtoGetter, ok bool)
}
我们看到,也是一个接口,该接口的作用就是查找某个 key 所属的集群节点,我们继续看下 getPeers 函数的具体实现,具体代码如下:
func getPeers(groupName string) PeerPicker {
if portPicker == nil {
return NoPeers{}
}
pk := portPicker(groupName)
if pk == nil {
pk = NoPeers{}
}
return pk
}
我们看到,这里首先判断 portPicker 变量,如果为空,则直接返回 NoPeers,否则,调用 portPicker 获取 Peers,portPicker 具体代码如下:
var (
portPicker func(groupName string) PeerPicker
)
portPicker 函数的具体赋值是在 RegisterPeerPicker 函数里面,具体代码如下:
// RegisterPeerPicker registers the peer initialization function.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPeerPicker(fn func() PeerPicker) {
if portPicker != nil {
panic("RegisterPeerPicker called more than once")
}
portPicker = func(_ string) PeerPicker { return fn() }
}
这个具体的进一步代码分析,我们后续再分析,我们再次回到 Get 函数,Get 函数每次被调用,都会做下统计,具体代码在 Get 函数里如下:
g.Stats.Gets.Add(1)
现在,我们看下 Stats 结构,具体结构定义如下:
// Stats are per-group statistics.
type Stats struct {
Gets AtomicInt // any Get request, including from peers
CacheHits AtomicInt // either cache was good
PeerLoads AtomicInt // either remote load or remote cache hit (not an error)
PeerErrors AtomicInt
Loads AtomicInt // (gets - cacheHits)
LoadsDeduped AtomicInt // after singleflight
LocalLoads AtomicInt // total good local loads
LocalLoadErrs AtomicInt // total bad local loads
ServerRequests AtomicInt // gets that came over the network from peers
}
我们看到,Stats 里面的每一个字段都是 AtomicInt 类型的,AtomicInt 类型的定义如下:
// An AtomicInt is an int64 to be accessed atomically.
type AtomicInt int64
// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
atomic.AddInt64((*int64)(i), n)
}
// Get atomically gets the value of i.
func (i *AtomicInt) Get() int64 {
return atomic.LoadInt64((*int64)(i))
}
func (i *AtomicInt) String() string {
return strconv.FormatInt(i.Get(), 10)
}
我们可以看出,其实 AtomicInt 就是一个 int64 类型,不过做了封装,提供了线程安全的 Add 和 Get 方法。也就是说,Stats 结构里面提供了 Gets、CacheHits 等等各种接口的统计。
我们继续看 Get 方法里面的从缓存查找的方法,具体代码如下:
value, cacheHit := g.lookupCache(key)
lookupCache 函数的定义如下:
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
if g.cacheBytes <= 0 {
return
}
value, ok = g.mainCache.get(key)
if ok {
return
}
value, ok = g.hotCache.get(key)
return
}
我们可以看到,该函数的实现就是首先从 mainCache 里面查找,如果找不到,再次从 hotCache 里面查找,最后返回,我们看下 mainCache 的结构定义,如下:
mainCache cache
mainCache 其实是一个 cache 类型,我们看下 cache 类型的定义,代码如下:
// cache is a wrapper around an *lru.Cache that adds synchronization,
// makes values always be ByteView, and counts the size of all keys and
// values.
type cache struct {
mu sync.RWMutex
nbytes int64 // of all keys and values
lru *lru.Cache
nhit, nget int64
nevict int64 // number of evictions
}
可以看到,其实 cache 类型就是对 lru.Cache 的封装,也可以说是线程安全的 Cache,并且提供了一些线程安全的 get 和 add 方法以及删除的方法,具体代码如下:
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
c.lru = &lru.Cache{
OnEvicted: func(key lru.Key, value interface{}) {
val := value.(ByteView)
c.nbytes -= int64(len(key.(string))) + int64(val.Len())
c.nevict++
},
}
}
c.lru.Add(key, value)
c.nbytes += int64(len(key)) + int64(value.Len())
}
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
c.nget++
if c.lru == nil {
return
}
vi, ok := c.lru.Get(key)
if !ok {
return
}
c.nhit++
return vi.(ByteView), true
}
func (c *cache) removeOldest() {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru != nil {
c.lru.RemoveOldest()
}
}
我们继续往下看 Get 函数,如果在缓存找到了数据,那么就直接返回,具体代码如下:
if cacheHit {
g.Stats.CacheHits.Add(1)
return setSinkView(dest, value)
}
如果缓存命中了,首先将缓存命中的次数加一,并且将获取到的值 value 设置到 dest 中,如果缓存没有命中,那么执行下面的逻辑:
// Optimization to avoid double unmarshalling or copying: keep
// track of whether the dest was already populated. One caller
// (if local) will set this; the losers will not. The common
// case will likely be one caller.
destPopulated := false
value, destPopulated, err := g.load(ctx, key, dest)
if err != nil {
return err
}
if destPopulated {
return nil
}
return setSinkView(dest, value)