decaymap.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package decaymap
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. func Zilch[T any]() T {
  7. var zero T
  8. return zero
  9. }
// Impl is a key->value map whose entries carry an expiry time. It's a wrapper
// around a map and a read/write mutex. Get does not return entries that have
// exceeded their time-to-live; it requests their removal over deleteCh, and a
// background worker applies those removals in batches. Cleanup removes every
// expired entry in one pass.
type Impl[K comparable, V any] struct {
	// data holds the entries; access is guarded by lock.
	data map[K]decayMapEntry[V]
	// deleteCh receives decay-deletion requests from readers.
	deleteCh chan deleteReq[K]
	// stopCh stops the background cleanup worker.
	stopCh chan struct{}
	// wg tracks the background cleanup worker so Close can wait for it.
	wg sync.WaitGroup
	// lock guards data.
	lock sync.RWMutex
}
// decayMapEntry is a stored value together with the time at which it expires.
type decayMapEntry[V any] struct {
	// Value is the caller-supplied value.
	Value V
	// expiry is the instant after which the entry counts as expired.
	expiry time.Time
}
// deleteReq is a request to remove a key if its expiry timestamp still matches
// the observed one. This prevents racing with concurrent Set updates.
type deleteReq[K comparable] struct {
	// key is the map key whose entry should be removed.
	key K
	// expiry is the expiry the reader observed when it found the entry expired.
	expiry time.Time
}
  30. // New creates a new DecayMap of key type K and value type V.
  31. //
  32. // Key types must be comparable to work with maps.
  33. func New[K comparable, V any]() *Impl[K, V] {
  34. m := &Impl[K, V]{
  35. data: make(map[K]decayMapEntry[V]),
  36. deleteCh: make(chan deleteReq[K], 1024),
  37. stopCh: make(chan struct{}),
  38. }
  39. m.wg.Add(1)
  40. go m.cleanupWorker()
  41. return m
  42. }
  43. // expire forcibly expires a key by setting its time-to-live one second in the past.
  44. func (m *Impl[K, V]) expire(key K) bool {
  45. // Use a single write lock to avoid RUnlock->Lock convoy.
  46. m.lock.Lock()
  47. defer m.lock.Unlock()
  48. val, ok := m.data[key]
  49. if !ok {
  50. return false
  51. }
  52. val.expiry = time.Now().Add(-1 * time.Second)
  53. m.data[key] = val
  54. return true
  55. }
  56. // Delete a value from the DecayMap by key.
  57. //
  58. // If the value does not exist, return false. Return true after
  59. // deletion.
  60. func (m *Impl[K, V]) Delete(key K) bool {
  61. // Use a single write lock to avoid RUnlock->Lock convoy.
  62. m.lock.Lock()
  63. defer m.lock.Unlock()
  64. _, ok := m.data[key]
  65. if ok {
  66. delete(m.data, key)
  67. }
  68. return ok
  69. }
  70. // Get gets a value from the DecayMap by key.
  71. //
  72. // If a value has expired, forcibly delete it if it was not updated.
  73. func (m *Impl[K, V]) Get(key K) (V, bool) {
  74. m.lock.RLock()
  75. value, ok := m.data[key]
  76. m.lock.RUnlock()
  77. if !ok {
  78. return Zilch[V](), false
  79. }
  80. if time.Now().After(value.expiry) {
  81. // Defer decay deletion to the background worker to avoid convoy.
  82. select {
  83. case m.deleteCh <- deleteReq[K]{key: key, expiry: value.expiry}:
  84. default:
  85. // Channel full: drop request; a future Cleanup() or Get will retry.
  86. }
  87. return Zilch[V](), false
  88. }
  89. return value.Value, true
  90. }
  91. // Set sets a key value pair in the map.
  92. func (m *Impl[K, V]) Set(key K, value V, ttl time.Duration) {
  93. m.lock.Lock()
  94. defer m.lock.Unlock()
  95. m.data[key] = decayMapEntry[V]{
  96. Value: value,
  97. expiry: time.Now().Add(ttl),
  98. }
  99. }
  100. // Cleanup removes all expired entries from the DecayMap.
  101. func (m *Impl[K, V]) Cleanup() {
  102. m.lock.Lock()
  103. defer m.lock.Unlock()
  104. now := time.Now()
  105. for key, entry := range m.data {
  106. if now.After(entry.expiry) {
  107. delete(m.data, key)
  108. }
  109. }
  110. }
  111. // Len returns the number of entries in the DecayMap.
  112. func (m *Impl[K, V]) Len() int {
  113. m.lock.RLock()
  114. defer m.lock.RUnlock()
  115. return len(m.data)
  116. }
  117. // Close stops the background cleanup worker. It's optional to call; maps live
  118. // for the process lifetime in many cases. Call in tests or when you know you no
  119. // longer need the map to avoid goroutine leaks.
  120. func (m *Impl[K, V]) Close() {
  121. close(m.stopCh)
  122. m.wg.Wait()
  123. }
  124. // cleanupWorker batches decay deletions to minimize lock contention.
  125. func (m *Impl[K, V]) cleanupWorker() {
  126. defer m.wg.Done()
  127. batch := make([]deleteReq[K], 0, 64)
  128. ticker := time.NewTicker(500 * time.Millisecond)
  129. defer ticker.Stop()
  130. flush := func() {
  131. if len(batch) == 0 {
  132. return
  133. }
  134. m.applyDeletes(batch)
  135. // reset batch without reallocating
  136. batch = batch[:0]
  137. }
  138. for {
  139. select {
  140. case req := <-m.deleteCh:
  141. batch = append(batch, req)
  142. case <-ticker.C:
  143. flush()
  144. case <-m.stopCh:
  145. // Drain any remaining requests then exit
  146. for {
  147. select {
  148. case req := <-m.deleteCh:
  149. batch = append(batch, req)
  150. default:
  151. flush()
  152. return
  153. }
  154. }
  155. }
  156. }
  157. }
  158. func (m *Impl[K, V]) applyDeletes(batch []deleteReq[K]) {
  159. now := time.Now()
  160. m.lock.Lock()
  161. for _, req := range batch {
  162. entry, ok := m.data[req.key]
  163. if !ok {
  164. continue
  165. }
  166. // Only delete if the expiry is unchanged and already past.
  167. if entry.expiry.Equal(req.expiry) && now.After(entry.expiry) {
  168. delete(m.data, req.key)
  169. }
  170. }
  171. m.lock.Unlock()
  172. }