groupcache 是一个 kv 缓存库,并且致力于在某些场景下替代 MemCached。 其 github 官方地址如下:
https://github.com/golang/groupcache
在启动 groupcache 的时候需要监听两个端口,一个端口是用来外部访问 groupcache 的,另外一个端口是集群内部 peer 互相通信使用的。
这里,我们就使用 GOPATH 的方式来搭建运行环境,首先,从上面的 github 地址下载 groupcache 源码,并重命名为 groupcache,接着,我们新建一个 groupcache-case 工程,并在该工程下创建 base\src\github.com\golang 路径,将 groupcache 源码放在此目录,具体目录如下:
同时,我们配置 GOPATH 路径,具体配置如下:
现在,我们在 groupcache-case 工程根路径下新建一个 main.go 文件,输入以下代码:
package main
import (
"bufio"
"context"
"flag"
"fmt"
"github.com/golang/groupcache"
"io"
"net/http"
"os"
"strings"
)
const(
defaultHost = "127.0.0.1:9001"
groupAddr = ":8081"
)
func main(){
fmt.Println("嗨客网(www.haicoder.net)")
if len(os.Args) < 1{
fmt.Println("Args Err, Pls input host and port")
os.Exit(1)
}
flag.Parse()
//本机Peer地址
self := flag.String("self", defaultHost, "self node")
//cache集群节点
cluster := os.Args[1:]
//初始化本地groupcache, 并监听groupcache相应的端口
setUpGroup("haicoder_cache")
//本地peer
peers := groupcache.NewHTTPPool(addrsToUrl(*self)[0])
//设置集群信息 用以本机缓存没命中的时候,一致性哈希查找key的存储节点, 并通过http请求访问
peers.Set(addrsToUrl(cluster...)...)
selfPort := strings.Split(*self, ":")[1]
//监听本机集群内部通信的端口
http.ListenAndServe(":"+selfPort, peers)
}
//启动groupcache
func setUpGroup(name string){
//缓存池
stringGroup := groupcache.NewGroup(name, 1<<20, groupcache.GetterFunc(getterFunc))
http.HandleFunc("/config", func(rw http.ResponseWriter, r *http.Request) {
k := r.URL.Query().Get("key")
var dest []byte
fmt.Printf("look up for %s from groupcache\n", k)
if err := stringGroup.Get(context.Background(), k, groupcache.AllocatingByteSliceSink(&dest)); err != nil {
rw.WriteHeader(http.StatusNotFound)
rw.Write([]byte("this key doesn't exists"))
} else {
rw.Write([]byte(dest))
}
})
//能够直接访问cache的端口, 启动http服务
//http://ip:group_addr/config?key=xxx
go http.ListenAndServe(groupAddr, nil)
}
func getterFunc(ctx context.Context, key string, dest groupcache.Sink) (err error){
//当cache miss之后,用来执行的load data方法
fp, err := os.Open("groupcache.conf")
if err != nil {
fmt.Println("read groupcache.conf Err =", err)
return err
}
defer fp.Close()
fmt.Printf("look up for %s from config_file\n", key)
//按行读取配置文件
buf := bufio.NewReader(fp)
for {
line, err := buf.ReadString('\n')
if err != nil {
if err == io.EOF {
dest.SetBytes([]byte{})
return nil
} else {
return err
}
}
line = strings.TrimSpace(line)
parts := strings.Split(line, "=")
if len(parts) > 2 {
continue
} else if parts[0] == key {
dest.SetBytes([]byte(parts[1]))
return nil
} else {
continue
}
}
}
//将ip:port转换成url的格式
func addrsToUrl(node_list ...string) []string {
urls := make([]string, len(node_list))
for k, addr := range node_list {
urls[k] = "http://" + addr
}
return urls
}
现在,我们开始运行如上程序,我们使用如下命令开始运行该程序:
go run main.go 127.0.0.1:9001
在终端输入并运行,如下图所示:
现在,我们使用浏览器,访问,输入以下地址:
http://127.0.0.1:8081/config?key=key
此时,浏览器输出如下图所示:
并且,终端输出如下:
我们看到,此时提示我们没有这个 key,现在,我们在项目的根目录,新建一个 groupcache.conf 文件,并输入以下内容:
name=haicoder url=haicoder.net
注意,这里最后最好留一个空行,保存文件,我们再次重新运行程序,现在,我们再次在浏览器访问,输入以下地址:
http://127.0.0.1:8081/config?key=url
此时,浏览器输出如下:
我们看到,此时输出了我们文件里面的内容,同时,我们可以注意,第一次使用浏览器访问时,控制台输出如下:
即,首先,访问了缓存,但缓存没有找到该 key,记着,从文件里面加载,我们第二次访问时,此时,终端输出如下:
我们看到,这一次只是从缓存加载数据了,并没有再次从文件加载数据,即,我们的缓存生效了。
我们首先,在控制台,输入以下命令,启动第一个节点,具体命令如下:
go run main.go 127.0.0.1:9001 127.0.0.1:9002
运行后,控制台输出如下:
现在,我们修改 main.go 中的第 16 和 17 行代码,也就是修改默认的主机和端口,修改为如下:
const(
defaultHost = "127.0.0.1:9002"
groupAddr = ":8082"
)
现在,我们输入以下命令,运行第二个节点,具体命令如下:
go run main.go 127.0.0.1:9001 127.0.0.1:9002
现在,我们首先使用浏览器访问第二个节点,输入以下地址:
http://127.0.0.1:8082/config?key=url
此时浏览器输出如下:
现在,我们查看节点一的控制台输出,具体输出如下:
我们再看,节点二的控制台输出,输出如下:
因为访问的 8082 端口,会先去访问 8082 端口,然后查找 key 这个键所属的存储节点,发现存储节点并非本节点之后,通过 http 请求访问对应存储节点,获取数据,据此,我们可以总结一下 groupcache 总的流程:
输入 url 地址,访问 url 对应的存储节点 front server;
先查看 groupcache 中是否有该 key,有直接返回;若无,进入第三步数据 load;
load data 步骤,实现了前文提的缓存过滤机制;这里会重新查询 groupcache,防止重复 load;如果还没有,会先通过一致性哈希判断 key 所在的存储节点 cache peer;如果非本机,则进入第四步;否则,进入第五步;
调用 http 请求,访问对应的存储节点 cache peer,通过上文的集群内通信端口(9001/9002)访问,对应的 server 将依次执行 2,3,5 步骤,返回数据。
该 key 属于本机,则调用初始化 group 时定义的 GetterFunc 类型方法 load 数据,此处方法完全自定义,既可以访问配置文件,也可以访问 db。流程图如下: