Cache2go源码分析

cachetable.go

所有有关 CacheTable 的操作都存放在 cachetable.go 源文件里面,我们看到,在程序的一开始首先定义了 CacheTable 的结构,具体代码如下:

// CacheTable is a table within the cache type CacheTable struct { sync.RWMutex // The table's name. name string // All cached items. items map[interface{}]*CacheItem // Timer responsible for triggering cleanup. cleanupTimer *time.Timer // Current timer duration. cleanupInterval time.Duration // The logger used for this table. logger *log.Logger // Callback method triggered when trying to load a non-existing key. loadData func(key interface{}, args ...interface{}) *CacheItem // Callback method triggered when adding a new item to the cache. addedItem func(item *CacheItem) // Callback method triggered before deleting an item from the cache. aboutToDeleteItem func(item *CacheItem) }

下面,首先,我们先看 Add 接口,具体代码如下:

// Add adds a key/value pair to the cache. // Parameter key is the item's cache-key. // Parameter lifeSpan determines after which time period without an access the item // will get removed from the cache. // Parameter data is the item's value. func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem { item := NewCacheItem(key, lifeSpan, data) // Add item to cache. table.Lock() table.addInternal(item) return item }

Add 接口用于向 CacheTable 添加一个被缓存的对象,返回一个 CacheItem 对象,在添加时,我们需要提供需要被添加的对象的键、生命时长(也就是不被访问,多久会被删除),以及具体添加的数据。

在添加之前,首先使用 CacheItem 对象里面的 NewCacheItem 函数创建一个 CacheItem 对象,接着,使用 CacheTable 对象的锁对整个 CacheTable 对象进行加锁,加锁后,调用 table 对象的 addInternal 函数进行添加,添加完毕后,返回创建的 CacheItem 对象。

注意,这里没有释放锁,而是在 addInternal 函数里面释放了锁,下面,我们看 addInternal 函数的具体实现:

func (table *CacheTable) addInternal(item *CacheItem) { // Careful: do not run this method unless the table-mutex is locked! // It will unlock it for the caller before running the callbacks and checks table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name) table.items[item.key] = item // Cache values so we don't keep blocking the mutex. expDur := table.cleanupInterval addedItem := table.addedItem table.Unlock() // Trigger callback after adding an item to cache. if addedItem != nil { addedItem(item) } // If we haven't set up any expiration check timer or found a more imminent item. if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) { table.expirationCheck() } }

addInternal 函数接受一个 CacheItem 参数,具体实现将 CacheItem 添加到 CacheTable 中,第一行代码将 item 对象添加到了 table 的 items 的 map 中。

接着,将定时器下次执行时间和添加的回调函数先使用临时变量缓存起来,并立即释放锁,这样就可以提高并发,接着,如果添加的回调函数不为空,则直接调用添加的回调。

最后的 if 判断中也触发了 expirationCheck,这里触发的条件是当前添加的 item 对象是有缓存时间的,并且定时器的过期时间为 0(说明当前 table 没有定时器在运行)或者其过期时间小于定时器的触发事件,调用 expirationCheck 方法,更新定时器触发时间。

如果新加入的数据的过期时间比下次要进行删除的时间要小,如果不进行更新,就好导致该数据项不能及时删除。因此,上面的 if 判断必须要加。

现在,我们再看下 expirationCheck 函数的实现,具体代码如下:

// Expiration check loop, triggered by a self-adjusting timer. func (table *CacheTable) expirationCheck() { table.Lock() if table.cleanupTimer != nil { table.cleanupTimer.Stop() } if table.cleanupInterval > 0 { table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name) } else { table.log("Expiration check installed for table", table.name) } // To be more accurate with timers, we would need to update 'now' on every // loop iteration. Not sure it's really efficient though. now := time.Now() smallestDuration := 0 * time.Second for key, item := range table.items { // Cache values so we don't keep blocking the mutex. item.RLock() lifeSpan := item.lifeSpan accessedOn := item.accessedOn item.RUnlock() if lifeSpan == 0 { continue } if now.Sub(accessedOn) >= lifeSpan { // Item has excessed its lifespan. table.deleteInternal(key) } else { // Find the item chronologically closest to its end-of-lifespan. if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration { smallestDuration = lifeSpan - now.Sub(accessedOn) } } } // Setup the interval for the next cleanup run. table.cleanupInterval = smallestDuration if smallestDuration > 0 { table.cleanupTimer = time.AfterFunc(smallestDuration, func() { go table.expirationCheck() }) } table.Unlock() }

首先,使用互斥锁将整个操作都锁住,接着,在所有的操作之前,判断当前定时器是否还在运行,如果还在运行,则先停止定时器,接着,默认设置一个最小的轮询时长为 0。

遍历 table 里面的每一个 item,这里,同样,首先加锁,先取出需要操作的对象,然后直接解锁,提高并发。接着,判断如果 lifeSpan 为 0,则直接继续,因为 lifeSpan 为 0,表明不需要被删除。

如果当前时间减去最后一次被访问时间大于了生命时长,只直接调用 deleteInternal 删除这个过期的数据。否则,就重新计算最小的下次遍历时间,然后再次递归开启定时器,重新开始下一次遍历。

Cache2go源码分析总结

这里需要我们注意的就是,我们在使用互斥锁或者读写锁时,能尽快解锁尽量尽快解锁,以提高程序的并发性,就像我们这里的程序,先将需要操作的数据,使用临时变量缓存起来,接着立刻解锁。