| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package bbolt
- import (
- "context"
- "errors"
- "fmt"
- "log/slog"
- "time"
- "github.com/TecharoHQ/anubis/lib/store"
- "go.etcd.io/bbolt"
- )
// ErrNotExists is the sentinel error wrapped by Delete when the requested key
// has no bucket in the database. It is used for testing and in admin-visible
// error messages; match it with errors.Is.
var ErrNotExists = errors.New("bbolt: value does not exist in store")
- // Store implements store.Interface backed by bbolt[1].
- //
- // In essence, bbolt is a hierarchical key/value store with a twist: every value
- // needs to belong to a bucket. Buckets can contain an infinite number of
- // buckets. As such, Anubis nests values in buckets. Each value in the store
- // is given its own bucket with two keys:
- //
- // 1. data - The raw data, usually in JSON
- // 2. expiry - The expiry time formatted as a time.RFC3339Nano timestamp string
- //
- // When Anubis stores a new bit of data, it creates a new bucket for that value.
- // This allows the cleanup phase to iterate over every bucket in the database and
- // only scan the expiry times without having to decode the entire record.
- //
- // bbolt is not suitable for environments where multiple instance of Anubis need
- // to read from and write to the same backend store. For that, use the valkey
- // storage backend.
- //
- // [1]: https://github.com/etcd-io/bbolt
- type Store struct {
- bdb *bbolt.DB
- }
- // Delete a key from the datastore. If the key does not exist, return an error.
- func (s *Store) Delete(ctx context.Context, key string) error {
- return s.bdb.Update(func(tx *bbolt.Tx) error {
- if tx.Bucket([]byte(key)) == nil {
- return fmt.Errorf("%w: %q", ErrNotExists, key)
- }
- return tx.DeleteBucket([]byte(key))
- })
- }
- // Get a value from the datastore.
- //
- // Because each value is stored in its own bucket with data and expiry keys,
- // two get operations are required:
- //
- // 1. Get the expiry key, parse as time.RFC3339Nano. If the key has expired, run deletion in the background and return a "key not found" error.
- // 2. Get the data key, copy into the result byteslice, return it.
- func (s *Store) Get(ctx context.Context, key string) ([]byte, error) {
- var result []byte
- if err := s.bdb.View(func(tx *bbolt.Tx) error {
- itemBucket := tx.Bucket([]byte(key))
- if itemBucket == nil {
- return fmt.Errorf("%w: %q", store.ErrNotFound, key)
- }
- expiryStr := itemBucket.Get([]byte("expiry"))
- if expiryStr == nil {
- return fmt.Errorf("[unexpected] %w: %q (expiry is nil)", store.ErrNotFound, key)
- }
- expiry, err := time.Parse(time.RFC3339Nano, string(expiryStr))
- if err != nil {
- return fmt.Errorf("[unexpected] %w: %w", store.ErrCantDecode, err)
- }
- if time.Now().After(expiry) {
- go s.Delete(context.Background(), key)
- return fmt.Errorf("%w: %q", store.ErrNotFound, key)
- }
- dataStr := itemBucket.Get([]byte("data"))
- if dataStr == nil {
- return fmt.Errorf("[unexpected] %w: %q (data is nil)", store.ErrNotFound, key)
- }
- result = make([]byte, len(dataStr))
- if n := copy(result, dataStr); n != len(dataStr) {
- return fmt.Errorf("[unexpected] %w: %d bytes copied of %d", store.ErrCantDecode, n, len(dataStr))
- }
- return nil
- }); err != nil {
- return nil, err
- }
- return result, nil
- }
- // Set a value into the store with a given expiry.
- func (s *Store) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error {
- expires := time.Now().Add(expiry)
- return s.bdb.Update(func(tx *bbolt.Tx) error {
- valueBkt, err := tx.CreateBucketIfNotExists([]byte(key))
- if err != nil {
- return fmt.Errorf("%w: %w: %q (create bucket)", store.ErrCantEncode, err, key)
- }
- if err := valueBkt.Put([]byte("expiry"), []byte(expires.Format(time.RFC3339Nano))); err != nil {
- return fmt.Errorf("%w: %q (expiry)", store.ErrCantEncode, key)
- }
- if err := valueBkt.Put([]byte("data"), value); err != nil {
- return fmt.Errorf("%w: %q (data)", store.ErrCantEncode, key)
- }
- return nil
- })
- }
- func (s *Store) cleanup(ctx context.Context) error {
- now := time.Now()
- return s.bdb.Update(func(tx *bbolt.Tx) error {
- return tx.ForEach(func(key []byte, valueBkt *bbolt.Bucket) error {
- var expiry time.Time
- var err error
- expiryStr := valueBkt.Get([]byte("expiry"))
- if expiryStr == nil {
- slog.Warn("while running cleanup, expiry is not set somehow, file a bug?", "key", string(key))
- return nil
- }
- expiry, err = time.Parse(time.RFC3339Nano, string(expiryStr))
- if err != nil {
- return fmt.Errorf("[unexpected] %w in bucket %q: %w", store.ErrCantDecode, string(key), err)
- }
- if now.After(expiry) {
- return tx.DeleteBucket(key)
- }
- return nil
- })
- })
- }
- func (s *Store) IsPersistent() bool {
- return true
- }
- func (s *Store) cleanupThread(ctx context.Context) {
- t := time.NewTicker(time.Hour)
- defer t.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-t.C:
- if err := s.cleanup(ctx); err != nil {
- slog.Error("error during bbolt cleanup", "err", err)
- }
- }
- }
- }
|