现在,我们继续回到 Do 方法,这里继续从缓存拿一次,具体代码如下:
if value, cacheHit := g.lookupCache(key); cacheHit {
g.Stats.CacheHits.Add(1)
return value, nil
}
如果拿到了,则直接返回,否则,根据当前请求的 key,获取其对应所在的节点 peer,具体代码如下:
if peer, ok := g.peers.PickPeer(key); ok {
value, err = g.getFromPeer(ctx, peer, key)
if err == nil {
g.Stats.PeerLoads.Add(1)
return value, nil
}
g.Stats.PeerErrors.Add(1)
// TODO(bradfitz): log the peer's error? keep
// log of the past few for /groupcachez? It's
// probably boring (normal task movement), so not
// worth logging I imagine.
}
如果获取到了对应的节点,则调用 getFromPeer 函数直接从该节点获取键对应的值,否则,则调用 getLocally 从本地获取。接着,我们看 getFromPeer 函数实现,具体代码如下:
func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
req := &pb.GetRequest{
Group: &g.name,
Key: &key,
}
res := &pb.GetResponse{}
err := peer.Get(ctx, req, res)
if err != nil {
return ByteView{}, err
}
value := ByteView{b: res.Value}
// TODO(bradfitz): use res.MinuteQps or something smart to
// conditionally populate hotCache. For now just do it some
// percentage of the time.
if rand.Intn(10) == 0 {
g.populateCache(key, value, &g.hotCache)
}
return value, nil
}
我们看到,其实该函数就是调用 peer 的 Get 方法,这里的 peer 就是 PickPeer 函数返回的节点,其实就是 httpGetter,因此,这里的 Get 其实是 httpGetter 的 Get 方法,具体代码如下:
func (h *httpGetter) Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(in.GetGroup()),
url.QueryEscape(in.GetKey()),
)
req, err := http.NewRequest("GET", u, nil)
if err != nil {
return err
}
req = req.WithContext(ctx)
tr := http.DefaultTransport
if h.transport != nil {
tr = h.transport(ctx)
}
res, err := tr.RoundTrip(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("server returned: %v", res.Status)
}
b := bufferPool.Get().(*bytes.Buffer)
b.Reset()
defer bufferPool.Put(b)
_, err = io.Copy(b, res.Body)
if err != nil {
return fmt.Errorf("reading response body: %v", err)
}
err = proto.Unmarshal(b.Bytes(), out)
if err != nil {
return fmt.Errorf("decoding response body: %v", err)
}
return nil
}
这里的代码其实就是发送 HTTP 请求,去对端节点获取数据。现在,我们再次回到 load 方法,如果还是没有获取到数据,那么就调用 getLocally 方法,具体代码如下:
value, err = g.getLocally(ctx, key, dest)
if err != nil {
g.Stats.LocalLoadErrs.Add(1)
return nil, err
}
getLocally 代码的具体实现如下:
func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
err := g.getter.Get(ctx, key, dest)
if err != nil {
return ByteView{}, err
}
return dest.view()
}
我们看到,其实就是调用 getter 的 Get 方法。