123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555 |
- package cache
- import (
- "sync"
- "sync/atomic"
- "time"
- )
const (
	// maximumCapacity is the capacity a cache starts with and the upper
	// bound that WithMaximumSize clamps to.
	maximumCapacity = 1 << 30
	// chanBufSize is the buffer size of the entry event channel.
	chanBufSize = 64
	// drainMax caps how many entries a single clean up pass may process.
	drainMax = 16
	// drainThreshold is the number of access/delete events that may be
	// handled after the last clean up before another pass is triggered.
	drainThreshold = 64
)
// currentTime reports the current time. It is a package-level variable bound
// to time.Now so that tests can substitute their own clock.
var currentTime = time.Now
- // localCache is an asynchronous LRU cache.
- type localCache struct {
- // internal data structure
- cache cache // Must be aligned on 32-bit
- // user configurations
- expireAfterAccess time.Duration
- expireAfterWrite time.Duration
- refreshAfterWrite time.Duration
- policyName string
- onInsertion Func
- onRemoval Func
- loader LoaderFunc
- exec Executor
- stats StatsCounter
- // cap is the cache capacity.
- cap int
- // accessQueue is the cache retention policy, which manages entries by access time.
- accessQueue policy
- // writeQueue is for managing entries by write time.
- // It is only fulfilled when expireAfterWrite or refreshAfterWrite is set.
- writeQueue policy
- // events is the cache event queue for processEntries
- events chan entryEvent
- // readCount is a counter of the number of reads since the last write.
- readCount int32
- // for closing routines created by this cache.
- closing int32
- closeWG sync.WaitGroup
- }
- // newLocalCache returns a default localCache.
- // init must be called before this cache can be used.
- func newLocalCache() *localCache {
- return &localCache{
- cap: maximumCapacity,
- cache: cache{},
- stats: &statsCounter{},
- }
- }
- // init initializes cache replacement policy after all user configuration properties are set.
- func (c *localCache) init() {
- c.accessQueue = newPolicy(c.policyName)
- c.accessQueue.init(&c.cache, c.cap)
- if c.expireAfterWrite > 0 || c.refreshAfterWrite > 0 {
- c.writeQueue = &recencyQueue{}
- } else {
- c.writeQueue = discardingQueue{}
- }
- c.writeQueue.init(&c.cache, c.cap)
- c.events = make(chan entryEvent, chanBufSize)
- c.closeWG.Add(1)
- go c.processEntries()
- }
- // Close implements io.Closer and always returns a nil error.
- // Caller would ensure the cache is not being used (reading and writing) before closing.
- func (c *localCache) Close() error {
- if atomic.CompareAndSwapInt32(&c.closing, 0, 1) {
- // Do not close events channel to avoid panic when cache is still being used.
- c.events <- entryEvent{nil, eventClose}
- // Wait for the goroutine to close this channel
- c.closeWG.Wait()
- }
- return nil
- }
- // GetIfPresent gets cached value from entries list and updates
- // last access time for the entry if it is found.
- func (c *localCache) GetIfPresent(k Key) (Value, bool) {
- en := c.cache.get(k, sum(k))
- if en == nil {
- c.stats.RecordMisses(1)
- return nil, false
- }
- now := currentTime()
- if c.isExpired(en, now) {
- c.stats.RecordMisses(1)
- c.sendEvent(eventDelete, en)
- return nil, false
- }
- c.stats.RecordHits(1)
- c.setEntryAccessTime(en, now)
- c.sendEvent(eventAccess, en)
- return en.getValue(), true
- }
- // Put adds new entry to entries list.
- func (c *localCache) Put(k Key, v Value) {
- h := sum(k)
- en := c.cache.get(k, h)
- now := currentTime()
- if en == nil {
- en = newEntry(k, v, h)
- c.setEntryWriteTime(en, now)
- c.setEntryAccessTime(en, now)
- // Add to the cache directly so the new value is available immediately.
- // However, only do this within the cache capacity (approximately).
- if c.cap == 0 || c.cache.len() < c.cap {
- cen := c.cache.getOrSet(en)
- if cen != nil {
- cen.setValue(v)
- en = cen
- }
- }
- } else {
- // Update value and send notice
- en.setValue(v)
- en.setWriteTime(now.UnixNano())
- }
- c.sendEvent(eventWrite, en)
- }
- // Invalidate removes the entry associated with key k.
- func (c *localCache) Invalidate(k Key) {
- en := c.cache.get(k, sum(k))
- if en != nil {
- en.setInvalidated(true)
- c.sendEvent(eventDelete, en)
- }
- }
- // InvalidateAll resets entries list.
- func (c *localCache) InvalidateAll() {
- c.cache.walk(func(en *entry) {
- en.setInvalidated(true)
- })
- c.sendEvent(eventDelete, nil)
- }
- // Get returns value associated with k or call underlying loader to retrieve value
- // if it is not in the cache. The returned value is only cached when loader returns
- // nil error.
- func (c *localCache) Get(k Key) (Value, error) {
- en := c.cache.get(k, sum(k))
- if en == nil {
- c.stats.RecordMisses(1)
- return c.load(k)
- }
- // Check if this entry needs to be refreshed
- now := currentTime()
- if c.isExpired(en, now) {
- c.stats.RecordMisses(1)
- if c.loader == nil {
- c.sendEvent(eventDelete, en)
- } else {
- // For loading cache, we do not delete entry but leave it to
- // the eviction policy, so users still can get the old value.
- c.setEntryAccessTime(en, now)
- c.refreshAsync(en)
- }
- } else {
- c.stats.RecordHits(1)
- c.setEntryAccessTime(en, now)
- c.sendEvent(eventAccess, en)
- }
- return en.getValue(), nil
- }
- // Refresh asynchronously reloads value for Key if it existed, otherwise
- // it will synchronously load and block until it value is loaded.
- func (c *localCache) Refresh(k Key) {
- if c.loader == nil {
- return
- }
- en := c.cache.get(k, sum(k))
- if en == nil {
- c.load(k)
- } else {
- c.refreshAsync(en)
- }
- }
- // Stats copies cache stats to t.
- func (c *localCache) Stats(t *Stats) {
- c.stats.Snapshot(t)
- }
- func (c *localCache) processEntries() {
- defer c.closeWG.Done()
- for e := range c.events {
- switch e.event {
- case eventWrite:
- c.write(e.entry)
- c.postWriteCleanup()
- case eventAccess:
- c.access(e.entry)
- c.postReadCleanup()
- case eventDelete:
- if e.entry == nil {
- c.removeAll()
- } else {
- c.remove(e.entry)
- }
- c.postReadCleanup()
- case eventClose:
- if c.exec != nil {
- // Stop all refresh tasks.
- c.exec.Close()
- }
- c.removeAll()
- return
- }
- }
- }
- // sendEvent sends event only when the cache is not closing/closed.
- func (c *localCache) sendEvent(typ event, en *entry) {
- if atomic.LoadInt32(&c.closing) == 0 {
- c.events <- entryEvent{en, typ}
- }
- }
- // This function must only be called from processEntries goroutine.
- func (c *localCache) write(en *entry) {
- ren := c.accessQueue.write(en)
- c.writeQueue.write(en)
- if c.onInsertion != nil {
- c.onInsertion(en.key, en.getValue())
- }
- if ren != nil {
- c.writeQueue.remove(ren)
- // An entry has been evicted
- c.stats.RecordEviction()
- if c.onRemoval != nil {
- c.onRemoval(ren.key, ren.getValue())
- }
- }
- }
- // removeAll remove all entries in the cache.
- // This function must only be called from processEntries goroutine.
- func (c *localCache) removeAll() {
- c.accessQueue.iterate(func(en *entry) bool {
- c.remove(en)
- return true
- })
- }
- // remove removes the given element from the cache and entries list.
- // It also calls onRemoval callback if it is set.
- func (c *localCache) remove(en *entry) {
- ren := c.accessQueue.remove(en)
- c.writeQueue.remove(en)
- if ren != nil && c.onRemoval != nil {
- c.onRemoval(ren.key, ren.getValue())
- }
- }
- // access moves the given element to the top of the entries list.
- // This function must only be called from processEntries goroutine.
- func (c *localCache) access(en *entry) {
- c.accessQueue.access(en)
- }
- // load uses current loader to synchronously retrieve value for k and adds new
- // entry to the cache only if loader returns a nil error.
- func (c *localCache) load(k Key) (Value, error) {
- if c.loader == nil {
- panic("cache loader function must be set")
- }
- // TODO: Poll the value instead when the entry is loading.
- start := currentTime()
- v, err := c.loader(k)
- now := currentTime()
- loadTime := now.Sub(start)
- if err != nil {
- c.stats.RecordLoadError(loadTime)
- return nil, err
- }
- c.stats.RecordLoadSuccess(loadTime)
- en := newEntry(k, v, sum(k))
- c.setEntryWriteTime(en, now)
- c.setEntryAccessTime(en, now)
- c.sendEvent(eventWrite, en)
- return v, nil
- }
- // refreshAsync reloads value in a go routine or using custom executor if defined.
- func (c *localCache) refreshAsync(en *entry) bool {
- if c.loader == nil {
- panic("cache loader function must be set")
- }
- if en.setLoading(true) {
- // Only do refresh if it isn't running.
- if c.exec == nil {
- go c.refresh(en)
- } else {
- c.exec.Execute(func() { c.refresh(en) })
- }
- return true
- }
- return false
- }
- // refresh reloads value for the given key. If loader returns an error,
- // that error will be omitted. Otherwise, the entry value will be updated.
- // This function would only be called by refreshAsync.
- func (c *localCache) refresh(en *entry) {
- defer en.setLoading(false)
- start := currentTime()
- v, err := c.loader(en.key)
- now := currentTime()
- loadTime := now.Sub(start)
- if err == nil {
- c.stats.RecordLoadSuccess(loadTime)
- en.setValue(v)
- en.setWriteTime(now.UnixNano())
- c.sendEvent(eventWrite, en)
- } else {
- // TODO: Log error
- c.stats.RecordLoadError(loadTime)
- }
- }
- // postReadCleanup is run after entry access/delete event.
- // This function must only be called from processEntries goroutine.
- func (c *localCache) postReadCleanup() {
- if atomic.AddInt32(&c.readCount, 1) > drainThreshold {
- atomic.StoreInt32(&c.readCount, 0)
- c.expireEntries()
- }
- }
- // postWriteCleanup is run after entry add event.
- // This function must only be called from processEntries goroutine.
- func (c *localCache) postWriteCleanup() {
- atomic.StoreInt32(&c.readCount, 0)
- c.expireEntries()
- }
- // expireEntries removes expired entries.
- func (c *localCache) expireEntries() {
- remain := drainMax
- now := currentTime()
- if c.expireAfterAccess > 0 {
- expiry := now.Add(-c.expireAfterAccess).UnixNano()
- c.accessQueue.iterate(func(en *entry) bool {
- if remain == 0 || en.getAccessTime() >= expiry {
- // Can stop as the entries are sorted by access time.
- // (the next entry is accessed more recently.)
- return false
- }
- // accessTime + expiry passed
- c.remove(en)
- c.stats.RecordEviction()
- remain--
- return remain > 0
- })
- }
- if remain > 0 && c.expireAfterWrite > 0 {
- expiry := now.Add(-c.expireAfterWrite).UnixNano()
- c.writeQueue.iterate(func(en *entry) bool {
- if remain == 0 || en.getWriteTime() >= expiry {
- return false
- }
- // writeTime + expiry passed
- c.remove(en)
- c.stats.RecordEviction()
- remain--
- return remain > 0
- })
- }
- if remain > 0 && c.loader != nil && c.refreshAfterWrite > 0 {
- expiry := now.Add(-c.refreshAfterWrite).UnixNano()
- c.writeQueue.iterate(func(en *entry) bool {
- if remain == 0 || en.getWriteTime() >= expiry {
- return false
- }
- // FIXME: This can cause deadlock if the custom executor runs refresh in current go routine.
- // The refresh function, when finish, will send to event channels.
- if c.refreshAsync(en) {
- // TODO: Maybe move this entry up?
- remain--
- }
- return remain > 0
- })
- }
- }
- func (c *localCache) isExpired(en *entry, now time.Time) bool {
- if en.getInvalidated() {
- return true
- }
- if c.expireAfterAccess > 0 && en.getAccessTime() < now.Add(-c.expireAfterAccess).UnixNano() {
- // accessTime + expiry passed
- return true
- }
- if c.expireAfterWrite > 0 && en.getWriteTime() < now.Add(-c.expireAfterWrite).UnixNano() {
- // writeTime + expiry passed
- return true
- }
- return false
- }
- func (c *localCache) needRefresh(en *entry, now time.Time) bool {
- if en.getLoading() {
- return false
- }
- if c.refreshAfterWrite > 0 {
- tm := en.getWriteTime()
- if tm > 0 && tm < now.Add(-c.refreshAfterWrite).UnixNano() {
- // writeTime + refresh passed
- return true
- }
- }
- return false
- }
- // setEntryAccessTime sets access time if needed.
- func (c *localCache) setEntryAccessTime(en *entry, now time.Time) {
- if c.expireAfterAccess > 0 {
- en.setAccessTime(now.UnixNano())
- }
- }
- // setEntryWriteTime sets write time if needed.
- func (c *localCache) setEntryWriteTime(en *entry, now time.Time) {
- if c.expireAfterWrite > 0 || c.refreshAfterWrite > 0 {
- en.setWriteTime(now.UnixNano())
- }
- }
- // New returns a local in-memory Cache.
- func New(options ...Option) Cache {
- c := newLocalCache()
- for _, opt := range options {
- opt(c)
- }
- c.init()
- return c
- }
- // NewLoadingCache returns a new LoadingCache with given loader function
- // and cache options.
- func NewLoadingCache(loader LoaderFunc, options ...Option) LoadingCache {
- c := newLocalCache()
- c.loader = loader
- for _, opt := range options {
- opt(c)
- }
- c.init()
- return c
- }
- // Option add options for default Cache.
- type Option func(c *localCache)
- // WithMaximumSize returns an Option which sets maximum size for the cache.
- // Any non-positive numbers is considered as unlimited.
- func WithMaximumSize(size int) Option {
- if size < 0 {
- size = 0
- }
- if size > maximumCapacity {
- size = maximumCapacity
- }
- return func(c *localCache) {
- c.cap = size
- }
- }
- // WithRemovalListener returns an Option to set cache to call onRemoval for each
- // entry evicted from the cache.
- func WithRemovalListener(onRemoval Func) Option {
- return func(c *localCache) {
- c.onRemoval = onRemoval
- }
- }
- // WithExpireAfterAccess returns an option to expire a cache entry after the
- // given duration without being accessed.
- func WithExpireAfterAccess(d time.Duration) Option {
- return func(c *localCache) {
- c.expireAfterAccess = d
- }
- }
- // WithExpireAfterWrite returns an option to expire a cache entry after the
- // given duration from creation.
- func WithExpireAfterWrite(d time.Duration) Option {
- return func(c *localCache) {
- c.expireAfterWrite = d
- }
- }
- // WithRefreshAfterWrite returns an option to refresh a cache entry after the
- // given duration. This option is only applicable for LoadingCache.
- func WithRefreshAfterWrite(d time.Duration) Option {
- return func(c *localCache) {
- c.refreshAfterWrite = d
- }
- }
- // WithStatsCounter returns an option which overrides default cache stats counter.
- func WithStatsCounter(st StatsCounter) Option {
- return func(c *localCache) {
- c.stats = st
- }
- }
- // WithPolicy returns an option which sets cache policy associated to the given name.
- // Supported policies are: lru, slru, tinylfu.
- func WithPolicy(name string) Option {
- return func(c *localCache) {
- c.policyName = name
- }
- }
- // WithExecutor returns an option which sets executor for cache loader.
- // By default, each asynchronous reload is run in a go routine.
- // This option is only applicable for LoadingCache.
- func WithExecutor(executor Executor) Option {
- return func(c *localCache) {
- c.exec = executor
- }
- }
- // withInsertionListener is used for testing.
- func withInsertionListener(onInsertion Func) Option {
- return func(c *localCache) {
- c.onInsertion = onInsertion
- }
- }
|