Golang singleflight

groupcache singleflight

groupcache 中的 singleflight 用于控制多个相同的并发请求只查询一次,从而优化查询效率,具体的代码在 singleflight 文件夹下面的 singleflight.go 文件中,我们先写个代码,看具体如何使用,具体代码如下:

package main import ( "fmt" "github.com/golang/groupcache/singleflight" "time" ) func main(){ fmt.Println("嗨客网(www.haicoder.net)") singleDoGroup := singleflight.Group{} go func() { retVal, err := singleDoGroup.Do("haicoder", func() (data interface{}, e error) { time.Sleep(time.Second * 5) fmt.Println("Doing1...") return "HAICODER1", nil }) if err != nil{ fmt.Println("Do Err1, Err =", err) }else{ fmt.Println("RetVal1 =", retVal) } }() go func() { time.Sleep(time.Second) retVal, err := singleDoGroup.Do("haicoder", func() (data interface{}, e error) { fmt.Println("Doing2...") return "HAICODER1", nil }) if err != nil{ fmt.Println("Do Err2, Err =", err) }else{ fmt.Println("RetVal2 =", retVal) } }() time.Sleep(time.Second*10) }

执行完毕后,控制台输出如下:

03_groupcache singleflight.png

我们创建了两个 协程 用于同时模拟请求键为 haicoder 的数据,并且我们在第一个协程里面使用了 Sleep 模拟了耗时操作,在第二个协程开始的时候我们使用了 Sleep 等待了 1 秒钟,从而让第一个协程先执行。

在第二个协程里面我们同样模拟了请求键为 haicoder 的数据,从运行结果来看,第二个请求被第一个请求阻塞了,即,第二个请求虽然已经在执行了,但并没有出结果,一直在等待第一个请求的结束,因为第二个请求的键与第一个请求的键相同。

即,我们通过 singleflight 实现了合并多个相同的请求为一个请求。现在,我们修改你代码如下,请求不同的键:

package main import ( "fmt" "github.com/golang/groupcache/singleflight" "time" ) func main(){ fmt.Println("嗨客网(www.haicoder.net)") singleDoGroup := singleflight.Group{} go func() { retVal, err := singleDoGroup.Do("haicoder", func() (data interface{}, e error) { time.Sleep(time.Second * 5) fmt.Println("Doing1...") return "HAICODER1", nil }) if err != nil{ fmt.Println("Do Err1, Err =", err) }else{ fmt.Println("RetVal1 =", retVal) } }() go func() { time.Sleep(time.Second) retVal, err := singleDoGroup.Do("haicoder1", func() (data interface{}, e error) { fmt.Println("Doing2...") return "HAICODER2", nil }) if err != nil{ fmt.Println("Do Err2, Err =", err) }else{ fmt.Println("RetVal2 =", retVal) } }() time.Sleep(time.Second*10) }

执行完毕后,控制台输出如下:

04_groupcache singleflight.png

这次,我们可以看到,第二个请求并没有被阻塞,因为第二个请求的键与第一个不一样。

groupcache singleflight源码解析

我们查看 singleflight.go 文件,首先看到的是 call 结构体,具体代码如下:

// call is an in-flight or completed Do call type call struct { wg sync.WaitGroup val interface{} err error }

calll 的结构就是一个 WaitGroup 类型的属性一个 接口类型 的 val 和一个 err,这里的 val 和 err 正好是下面我们要使用的 Do 方法回调的返回值,因为在 Do 函数的实现中,需要将回调的具体值赋值给 call 对象的 val 和 err 属性。接着,我们来看 Group 结构,具体代码如下:

// Group represents a class of work and forms a namespace in which // units of work can be executed with duplicate suppression. type Group struct { mu sync.Mutex // protects m m map[string]*call // lazily initialized }

Group 结构的第一个 互斥锁 用于保护下面的 map 的并发安全,接着下面的 m 是一个 map 类型的结构,其中的键为 string 类型的,就是我们请求传递的键,最后我们来看 Do 函数的实现代码,具体代码如下:

// Do executes and returns the results of the given function, making // sure that only one execution is in-flight for a given key at a // time. If a duplicate comes in, the duplicate caller waits for the // original to complete and receives the same results. func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { //首先使用Group对象的mu实例进行加锁,保证并发安全 g.mu.Lock() //如果map为空,则使用make创建下 if g.m == nil { g.m = make(map[string]*call) } //判断当前请求的键是否已经存在map中了 if c, ok := g.m[key]; ok { //如果已经存在了,则先解锁 g.mu.Unlock() //使用WaitGroup对象的Wait方法,等待call对象的结束 c.wg.Wait() //结束后,返回call对象的val值 return c.val, c.err } //如果键不存在map中,则先创建一个call对象 c := new(call) //先使用WaitGroup对象Add c.wg.Add(1) //存放到map中 g.m[key] = c //解锁 g.mu.Unlock() //执行传入的函数,并将返回值返回给call对象 c.val, c.err = fn() //使用Done表明这个call对象执行完毕 c.wg.Done() //再次加锁 g.mu.Lock() //从map中删除这个键 delete(g.m, key) //解锁 g.mu.Unlock() return c.val, c.err }

Do 方法实现的原理就是,一个请求过来的时候,首先查看 Group 对象的 map 中是否存在相同的请求,如果有,则使用该请求的 WaitGroup 对象进行等待,如果没有,则在请求开始的时候,加入到 map 中,并使用 WaitGroup 对象的 Add 方法使相同的请求进行阻塞,接着,开始执行传入的回调函数,执行完毕后,赋值给 call 对象,并使用 WaitGroup 对象的 Done 方法,表明请求结束,此时,如果有相同的请求的 Wait 方法就会接触阻塞,最后,再从 map 中删除该请求即可。