bbolt.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package bbolt
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log/slog"
  7. "time"
  8. "github.com/TecharoHQ/anubis/lib/store"
  9. "go.etcd.io/bbolt"
  10. )
  11. // Sentinel error value used for testing and in admin-visible error messages.
  12. var (
  13. ErrNotExists = errors.New("bbolt: value does not exist in store")
  14. )
  15. // Store implements store.Interface backed by bbolt[1].
  16. //
  17. // In essence, bbolt is a hierarchical key/value store with a twist: every value
  18. // needs to belong to a bucket. Buckets can contain an infinite number of
  19. // buckets. As such, Anubis nests values in buckets. Each value in the store
  20. // is given its own bucket with two keys:
  21. //
  22. // 1. data - The raw data, usually in JSON
  23. // 2. expiry - The expiry time formatted as a time.RFC3339Nano timestamp string
  24. //
  25. // When Anubis stores a new bit of data, it creates a new bucket for that value.
  26. // This allows the cleanup phase to iterate over every bucket in the database and
  27. // only scan the expiry times without having to decode the entire record.
  28. //
  29. // bbolt is not suitable for environments where multiple instance of Anubis need
  30. // to read from and write to the same backend store. For that, use the valkey
  31. // storage backend.
  32. //
  33. // [1]: https://github.com/etcd-io/bbolt
  34. type Store struct {
  35. bdb *bbolt.DB
  36. }
  37. // Delete a key from the datastore. If the key does not exist, return an error.
  38. func (s *Store) Delete(ctx context.Context, key string) error {
  39. return s.bdb.Update(func(tx *bbolt.Tx) error {
  40. if tx.Bucket([]byte(key)) == nil {
  41. return fmt.Errorf("%w: %q", ErrNotExists, key)
  42. }
  43. return tx.DeleteBucket([]byte(key))
  44. })
  45. }
  46. // Get a value from the datastore.
  47. //
  48. // Because each value is stored in its own bucket with data and expiry keys,
  49. // two get operations are required:
  50. //
  51. // 1. Get the expiry key, parse as time.RFC3339Nano. If the key has expired, run deletion in the background and return a "key not found" error.
  52. // 2. Get the data key, copy into the result byteslice, return it.
  53. func (s *Store) Get(ctx context.Context, key string) ([]byte, error) {
  54. var result []byte
  55. if err := s.bdb.View(func(tx *bbolt.Tx) error {
  56. itemBucket := tx.Bucket([]byte(key))
  57. if itemBucket == nil {
  58. return fmt.Errorf("%w: %q", store.ErrNotFound, key)
  59. }
  60. expiryStr := itemBucket.Get([]byte("expiry"))
  61. if expiryStr == nil {
  62. return fmt.Errorf("[unexpected] %w: %q (expiry is nil)", store.ErrNotFound, key)
  63. }
  64. expiry, err := time.Parse(time.RFC3339Nano, string(expiryStr))
  65. if err != nil {
  66. return fmt.Errorf("[unexpected] %w: %w", store.ErrCantDecode, err)
  67. }
  68. if time.Now().After(expiry) {
  69. go s.Delete(context.Background(), key)
  70. return fmt.Errorf("%w: %q", store.ErrNotFound, key)
  71. }
  72. dataStr := itemBucket.Get([]byte("data"))
  73. if dataStr == nil {
  74. return fmt.Errorf("[unexpected] %w: %q (data is nil)", store.ErrNotFound, key)
  75. }
  76. result = make([]byte, len(dataStr))
  77. if n := copy(result, dataStr); n != len(dataStr) {
  78. return fmt.Errorf("[unexpected] %w: %d bytes copied of %d", store.ErrCantDecode, n, len(dataStr))
  79. }
  80. return nil
  81. }); err != nil {
  82. return nil, err
  83. }
  84. return result, nil
  85. }
  86. // Set a value into the store with a given expiry.
  87. func (s *Store) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error {
  88. expires := time.Now().Add(expiry)
  89. return s.bdb.Update(func(tx *bbolt.Tx) error {
  90. valueBkt, err := tx.CreateBucketIfNotExists([]byte(key))
  91. if err != nil {
  92. return fmt.Errorf("%w: %w: %q (create bucket)", store.ErrCantEncode, err, key)
  93. }
  94. if err := valueBkt.Put([]byte("expiry"), []byte(expires.Format(time.RFC3339Nano))); err != nil {
  95. return fmt.Errorf("%w: %q (expiry)", store.ErrCantEncode, key)
  96. }
  97. if err := valueBkt.Put([]byte("data"), value); err != nil {
  98. return fmt.Errorf("%w: %q (data)", store.ErrCantEncode, key)
  99. }
  100. return nil
  101. })
  102. }
  103. func (s *Store) cleanup(ctx context.Context) error {
  104. now := time.Now()
  105. return s.bdb.Update(func(tx *bbolt.Tx) error {
  106. return tx.ForEach(func(key []byte, valueBkt *bbolt.Bucket) error {
  107. var expiry time.Time
  108. var err error
  109. expiryStr := valueBkt.Get([]byte("expiry"))
  110. if expiryStr == nil {
  111. slog.Warn("while running cleanup, expiry is not set somehow, file a bug?", "key", string(key))
  112. return nil
  113. }
  114. expiry, err = time.Parse(time.RFC3339Nano, string(expiryStr))
  115. if err != nil {
  116. return fmt.Errorf("[unexpected] %w in bucket %q: %w", store.ErrCantDecode, string(key), err)
  117. }
  118. if now.After(expiry) {
  119. return tx.DeleteBucket(key)
  120. }
  121. return nil
  122. })
  123. })
  124. }
  125. func (s *Store) IsPersistent() bool {
  126. return true
  127. }
  128. func (s *Store) cleanupThread(ctx context.Context) {
  129. t := time.NewTicker(time.Hour)
  130. defer t.Stop()
  131. for {
  132. select {
  133. case <-ctx.Done():
  134. return
  135. case <-t.C:
  136. if err := s.cleanup(ctx); err != nil {
  137. slog.Error("error during bbolt cleanup", "err", err)
  138. }
  139. }
  140. }
  141. }