local.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. package cache
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. const (
  8. // Default maximum number of cache entries.
  9. maximumCapacity = 1 << 30
  10. // Buffer size of entry channels
  11. chanBufSize = 64
  12. // Maximum number of entries to be drained in a single clean up.
  13. drainMax = 16
  14. // Number of cache access operations that will trigger clean up.
  15. drainThreshold = 64
  16. )
  17. // currentTime is an alias for time.Now, used for testing.
  18. var currentTime = time.Now
  19. // localCache is an asynchronous LRU cache.
  20. type localCache struct {
  21. // internal data structure
  22. cache cache // Must be aligned on 32-bit
  23. // user configurations
  24. expireAfterAccess time.Duration
  25. expireAfterWrite time.Duration
  26. refreshAfterWrite time.Duration
  27. policyName string
  28. onInsertion Func
  29. onRemoval Func
  30. loader LoaderFunc
  31. exec Executor
  32. stats StatsCounter
  33. // cap is the cache capacity.
  34. cap int
  35. // accessQueue is the cache retention policy, which manages entries by access time.
  36. accessQueue policy
  37. // writeQueue is for managing entries by write time.
  38. // It is only fulfilled when expireAfterWrite or refreshAfterWrite is set.
  39. writeQueue policy
  40. // events is the cache event queue for processEntries
  41. events chan entryEvent
  42. // readCount is a counter of the number of reads since the last write.
  43. readCount int32
  44. // for closing routines created by this cache.
  45. closing int32
  46. closeWG sync.WaitGroup
  47. }
  48. // newLocalCache returns a default localCache.
  49. // init must be called before this cache can be used.
  50. func newLocalCache() *localCache {
  51. return &localCache{
  52. cap: maximumCapacity,
  53. cache: cache{},
  54. stats: &statsCounter{},
  55. }
  56. }
  57. // init initializes cache replacement policy after all user configuration properties are set.
  58. func (c *localCache) init() {
  59. c.accessQueue = newPolicy(c.policyName)
  60. c.accessQueue.init(&c.cache, c.cap)
  61. if c.expireAfterWrite > 0 || c.refreshAfterWrite > 0 {
  62. c.writeQueue = &recencyQueue{}
  63. } else {
  64. c.writeQueue = discardingQueue{}
  65. }
  66. c.writeQueue.init(&c.cache, c.cap)
  67. c.events = make(chan entryEvent, chanBufSize)
  68. c.closeWG.Add(1)
  69. go c.processEntries()
  70. }
  71. // Close implements io.Closer and always returns a nil error.
  72. // Caller would ensure the cache is not being used (reading and writing) before closing.
  73. func (c *localCache) Close() error {
  74. if atomic.CompareAndSwapInt32(&c.closing, 0, 1) {
  75. // Do not close events channel to avoid panic when cache is still being used.
  76. c.events <- entryEvent{nil, eventClose}
  77. // Wait for the goroutine to close this channel
  78. c.closeWG.Wait()
  79. }
  80. return nil
  81. }
  82. // GetIfPresent gets cached value from entries list and updates
  83. // last access time for the entry if it is found.
  84. func (c *localCache) GetIfPresent(k Key) (Value, bool) {
  85. en := c.cache.get(k, sum(k))
  86. if en == nil {
  87. c.stats.RecordMisses(1)
  88. return nil, false
  89. }
  90. now := currentTime()
  91. if c.isExpired(en, now) {
  92. c.stats.RecordMisses(1)
  93. c.sendEvent(eventDelete, en)
  94. return nil, false
  95. }
  96. c.stats.RecordHits(1)
  97. c.setEntryAccessTime(en, now)
  98. c.sendEvent(eventAccess, en)
  99. return en.getValue(), true
  100. }
  101. // Put adds new entry to entries list.
  102. func (c *localCache) Put(k Key, v Value) {
  103. h := sum(k)
  104. en := c.cache.get(k, h)
  105. now := currentTime()
  106. if en == nil {
  107. en = newEntry(k, v, h)
  108. c.setEntryWriteTime(en, now)
  109. c.setEntryAccessTime(en, now)
  110. // Add to the cache directly so the new value is available immediately.
  111. // However, only do this within the cache capacity (approximately).
  112. if c.cap == 0 || c.cache.len() < c.cap {
  113. cen := c.cache.getOrSet(en)
  114. if cen != nil {
  115. cen.setValue(v)
  116. en = cen
  117. }
  118. }
  119. } else {
  120. // Update value and send notice
  121. en.setValue(v)
  122. en.setWriteTime(now.UnixNano())
  123. }
  124. c.sendEvent(eventWrite, en)
  125. }
  126. // Invalidate removes the entry associated with key k.
  127. func (c *localCache) Invalidate(k Key) {
  128. en := c.cache.get(k, sum(k))
  129. if en != nil {
  130. en.setInvalidated(true)
  131. c.sendEvent(eventDelete, en)
  132. }
  133. }
  134. // InvalidateAll resets entries list.
  135. func (c *localCache) InvalidateAll() {
  136. c.cache.walk(func(en *entry) {
  137. en.setInvalidated(true)
  138. })
  139. c.sendEvent(eventDelete, nil)
  140. }
  141. // Get returns value associated with k or call underlying loader to retrieve value
  142. // if it is not in the cache. The returned value is only cached when loader returns
  143. // nil error.
  144. func (c *localCache) Get(k Key) (Value, error) {
  145. en := c.cache.get(k, sum(k))
  146. if en == nil {
  147. c.stats.RecordMisses(1)
  148. return c.load(k)
  149. }
  150. // Check if this entry needs to be refreshed
  151. now := currentTime()
  152. if c.isExpired(en, now) {
  153. c.stats.RecordMisses(1)
  154. if c.loader == nil {
  155. c.sendEvent(eventDelete, en)
  156. } else {
  157. // For loading cache, we do not delete entry but leave it to
  158. // the eviction policy, so users still can get the old value.
  159. c.setEntryAccessTime(en, now)
  160. c.refreshAsync(en)
  161. }
  162. } else {
  163. c.stats.RecordHits(1)
  164. c.setEntryAccessTime(en, now)
  165. c.sendEvent(eventAccess, en)
  166. }
  167. return en.getValue(), nil
  168. }
  169. // Refresh asynchronously reloads value for Key if it existed, otherwise
  170. // it will synchronously load and block until it value is loaded.
  171. func (c *localCache) Refresh(k Key) {
  172. if c.loader == nil {
  173. return
  174. }
  175. en := c.cache.get(k, sum(k))
  176. if en == nil {
  177. c.load(k)
  178. } else {
  179. c.refreshAsync(en)
  180. }
  181. }
  182. // Stats copies cache stats to t.
  183. func (c *localCache) Stats(t *Stats) {
  184. c.stats.Snapshot(t)
  185. }
  186. func (c *localCache) processEntries() {
  187. defer c.closeWG.Done()
  188. for e := range c.events {
  189. switch e.event {
  190. case eventWrite:
  191. c.write(e.entry)
  192. c.postWriteCleanup()
  193. case eventAccess:
  194. c.access(e.entry)
  195. c.postReadCleanup()
  196. case eventDelete:
  197. if e.entry == nil {
  198. c.removeAll()
  199. } else {
  200. c.remove(e.entry)
  201. }
  202. c.postReadCleanup()
  203. case eventClose:
  204. if c.exec != nil {
  205. // Stop all refresh tasks.
  206. c.exec.Close()
  207. }
  208. c.removeAll()
  209. return
  210. }
  211. }
  212. }
  213. // sendEvent sends event only when the cache is not closing/closed.
  214. func (c *localCache) sendEvent(typ event, en *entry) {
  215. if atomic.LoadInt32(&c.closing) == 0 {
  216. c.events <- entryEvent{en, typ}
  217. }
  218. }
  219. // This function must only be called from processEntries goroutine.
  220. func (c *localCache) write(en *entry) {
  221. ren := c.accessQueue.write(en)
  222. c.writeQueue.write(en)
  223. if c.onInsertion != nil {
  224. c.onInsertion(en.key, en.getValue())
  225. }
  226. if ren != nil {
  227. c.writeQueue.remove(ren)
  228. // An entry has been evicted
  229. c.stats.RecordEviction()
  230. if c.onRemoval != nil {
  231. c.onRemoval(ren.key, ren.getValue())
  232. }
  233. }
  234. }
  235. // removeAll remove all entries in the cache.
  236. // This function must only be called from processEntries goroutine.
  237. func (c *localCache) removeAll() {
  238. c.accessQueue.iterate(func(en *entry) bool {
  239. c.remove(en)
  240. return true
  241. })
  242. }
  243. // remove removes the given element from the cache and entries list.
  244. // It also calls onRemoval callback if it is set.
  245. func (c *localCache) remove(en *entry) {
  246. ren := c.accessQueue.remove(en)
  247. c.writeQueue.remove(en)
  248. if ren != nil && c.onRemoval != nil {
  249. c.onRemoval(ren.key, ren.getValue())
  250. }
  251. }
  252. // access moves the given element to the top of the entries list.
  253. // This function must only be called from processEntries goroutine.
  254. func (c *localCache) access(en *entry) {
  255. c.accessQueue.access(en)
  256. }
  257. // load uses current loader to synchronously retrieve value for k and adds new
  258. // entry to the cache only if loader returns a nil error.
  259. func (c *localCache) load(k Key) (Value, error) {
  260. if c.loader == nil {
  261. panic("cache loader function must be set")
  262. }
  263. // TODO: Poll the value instead when the entry is loading.
  264. start := currentTime()
  265. v, err := c.loader(k)
  266. now := currentTime()
  267. loadTime := now.Sub(start)
  268. if err != nil {
  269. c.stats.RecordLoadError(loadTime)
  270. return nil, err
  271. }
  272. c.stats.RecordLoadSuccess(loadTime)
  273. en := newEntry(k, v, sum(k))
  274. c.setEntryWriteTime(en, now)
  275. c.setEntryAccessTime(en, now)
  276. c.sendEvent(eventWrite, en)
  277. return v, nil
  278. }
  279. // refreshAsync reloads value in a go routine or using custom executor if defined.
  280. func (c *localCache) refreshAsync(en *entry) bool {
  281. if c.loader == nil {
  282. panic("cache loader function must be set")
  283. }
  284. if en.setLoading(true) {
  285. // Only do refresh if it isn't running.
  286. if c.exec == nil {
  287. go c.refresh(en)
  288. } else {
  289. c.exec.Execute(func() { c.refresh(en) })
  290. }
  291. return true
  292. }
  293. return false
  294. }
  295. // refresh reloads value for the given key. If loader returns an error,
  296. // that error will be omitted. Otherwise, the entry value will be updated.
  297. // This function would only be called by refreshAsync.
  298. func (c *localCache) refresh(en *entry) {
  299. defer en.setLoading(false)
  300. start := currentTime()
  301. v, err := c.loader(en.key)
  302. now := currentTime()
  303. loadTime := now.Sub(start)
  304. if err == nil {
  305. c.stats.RecordLoadSuccess(loadTime)
  306. en.setValue(v)
  307. en.setWriteTime(now.UnixNano())
  308. c.sendEvent(eventWrite, en)
  309. } else {
  310. // TODO: Log error
  311. c.stats.RecordLoadError(loadTime)
  312. }
  313. }
  314. // postReadCleanup is run after entry access/delete event.
  315. // This function must only be called from processEntries goroutine.
  316. func (c *localCache) postReadCleanup() {
  317. if atomic.AddInt32(&c.readCount, 1) > drainThreshold {
  318. atomic.StoreInt32(&c.readCount, 0)
  319. c.expireEntries()
  320. }
  321. }
  322. // postWriteCleanup is run after entry add event.
  323. // This function must only be called from processEntries goroutine.
  324. func (c *localCache) postWriteCleanup() {
  325. atomic.StoreInt32(&c.readCount, 0)
  326. c.expireEntries()
  327. }
  328. // expireEntries removes expired entries.
  329. func (c *localCache) expireEntries() {
  330. remain := drainMax
  331. now := currentTime()
  332. if c.expireAfterAccess > 0 {
  333. expiry := now.Add(-c.expireAfterAccess).UnixNano()
  334. c.accessQueue.iterate(func(en *entry) bool {
  335. if remain == 0 || en.getAccessTime() >= expiry {
  336. // Can stop as the entries are sorted by access time.
  337. // (the next entry is accessed more recently.)
  338. return false
  339. }
  340. // accessTime + expiry passed
  341. c.remove(en)
  342. c.stats.RecordEviction()
  343. remain--
  344. return remain > 0
  345. })
  346. }
  347. if remain > 0 && c.expireAfterWrite > 0 {
  348. expiry := now.Add(-c.expireAfterWrite).UnixNano()
  349. c.writeQueue.iterate(func(en *entry) bool {
  350. if remain == 0 || en.getWriteTime() >= expiry {
  351. return false
  352. }
  353. // writeTime + expiry passed
  354. c.remove(en)
  355. c.stats.RecordEviction()
  356. remain--
  357. return remain > 0
  358. })
  359. }
  360. if remain > 0 && c.loader != nil && c.refreshAfterWrite > 0 {
  361. expiry := now.Add(-c.refreshAfterWrite).UnixNano()
  362. c.writeQueue.iterate(func(en *entry) bool {
  363. if remain == 0 || en.getWriteTime() >= expiry {
  364. return false
  365. }
  366. // FIXME: This can cause deadlock if the custom executor runs refresh in current go routine.
  367. // The refresh function, when finish, will send to event channels.
  368. if c.refreshAsync(en) {
  369. // TODO: Maybe move this entry up?
  370. remain--
  371. }
  372. return remain > 0
  373. })
  374. }
  375. }
  376. func (c *localCache) isExpired(en *entry, now time.Time) bool {
  377. if en.getInvalidated() {
  378. return true
  379. }
  380. if c.expireAfterAccess > 0 && en.getAccessTime() < now.Add(-c.expireAfterAccess).UnixNano() {
  381. // accessTime + expiry passed
  382. return true
  383. }
  384. if c.expireAfterWrite > 0 && en.getWriteTime() < now.Add(-c.expireAfterWrite).UnixNano() {
  385. // writeTime + expiry passed
  386. return true
  387. }
  388. return false
  389. }
  390. func (c *localCache) needRefresh(en *entry, now time.Time) bool {
  391. if en.getLoading() {
  392. return false
  393. }
  394. if c.refreshAfterWrite > 0 {
  395. tm := en.getWriteTime()
  396. if tm > 0 && tm < now.Add(-c.refreshAfterWrite).UnixNano() {
  397. // writeTime + refresh passed
  398. return true
  399. }
  400. }
  401. return false
  402. }
  403. // setEntryAccessTime sets access time if needed.
  404. func (c *localCache) setEntryAccessTime(en *entry, now time.Time) {
  405. if c.expireAfterAccess > 0 {
  406. en.setAccessTime(now.UnixNano())
  407. }
  408. }
  409. // setEntryWriteTime sets write time if needed.
  410. func (c *localCache) setEntryWriteTime(en *entry, now time.Time) {
  411. if c.expireAfterWrite > 0 || c.refreshAfterWrite > 0 {
  412. en.setWriteTime(now.UnixNano())
  413. }
  414. }
  415. // New returns a local in-memory Cache.
  416. func New(options ...Option) Cache {
  417. c := newLocalCache()
  418. for _, opt := range options {
  419. opt(c)
  420. }
  421. c.init()
  422. return c
  423. }
  424. // NewLoadingCache returns a new LoadingCache with given loader function
  425. // and cache options.
  426. func NewLoadingCache(loader LoaderFunc, options ...Option) LoadingCache {
  427. c := newLocalCache()
  428. c.loader = loader
  429. for _, opt := range options {
  430. opt(c)
  431. }
  432. c.init()
  433. return c
  434. }
  435. // Option add options for default Cache.
  436. type Option func(c *localCache)
  437. // WithMaximumSize returns an Option which sets maximum size for the cache.
  438. // Any non-positive numbers is considered as unlimited.
  439. func WithMaximumSize(size int) Option {
  440. if size < 0 {
  441. size = 0
  442. }
  443. if size > maximumCapacity {
  444. size = maximumCapacity
  445. }
  446. return func(c *localCache) {
  447. c.cap = size
  448. }
  449. }
  450. // WithRemovalListener returns an Option to set cache to call onRemoval for each
  451. // entry evicted from the cache.
  452. func WithRemovalListener(onRemoval Func) Option {
  453. return func(c *localCache) {
  454. c.onRemoval = onRemoval
  455. }
  456. }
  457. // WithExpireAfterAccess returns an option to expire a cache entry after the
  458. // given duration without being accessed.
  459. func WithExpireAfterAccess(d time.Duration) Option {
  460. return func(c *localCache) {
  461. c.expireAfterAccess = d
  462. }
  463. }
  464. // WithExpireAfterWrite returns an option to expire a cache entry after the
  465. // given duration from creation.
  466. func WithExpireAfterWrite(d time.Duration) Option {
  467. return func(c *localCache) {
  468. c.expireAfterWrite = d
  469. }
  470. }
  471. // WithRefreshAfterWrite returns an option to refresh a cache entry after the
  472. // given duration. This option is only applicable for LoadingCache.
  473. func WithRefreshAfterWrite(d time.Duration) Option {
  474. return func(c *localCache) {
  475. c.refreshAfterWrite = d
  476. }
  477. }
  478. // WithStatsCounter returns an option which overrides default cache stats counter.
  479. func WithStatsCounter(st StatsCounter) Option {
  480. return func(c *localCache) {
  481. c.stats = st
  482. }
  483. }
  484. // WithPolicy returns an option which sets cache policy associated to the given name.
  485. // Supported policies are: lru, slru, tinylfu.
  486. func WithPolicy(name string) Option {
  487. return func(c *localCache) {
  488. c.policyName = name
  489. }
  490. }
  491. // WithExecutor returns an option which sets executor for cache loader.
  492. // By default, each asynchronous reload is run in a go routine.
  493. // This option is only applicable for LoadingCache.
  494. func WithExecutor(executor Executor) Option {
  495. return func(c *localCache) {
  496. c.exec = executor
  497. }
  498. }
  499. // withInsertionListener is used for testing.
  500. func withInsertionListener(onInsertion Func) Option {
  501. return func(c *localCache) {
  502. c.onInsertion = onInsertion
  503. }
  504. }