factory.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package valkey
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "time"
  8. "github.com/TecharoHQ/anubis/internal"
  9. "github.com/TecharoHQ/anubis/lib/store"
  10. valkey "github.com/redis/go-redis/v9"
  11. "github.com/redis/go-redis/v9/maintnotifications"
  12. )
  13. func init() {
  14. store.Register("valkey", Factory{})
  15. }
  16. var (
  17. ErrNoURL = errors.New("valkey.Config: no URL defined")
  18. ErrBadURL = errors.New("valkey.Config: URL is invalid")
  19. // Sentinel validation errors
  20. ErrSentinelMasterNameRequired = errors.New("valkey.Sentinel: masterName is required")
  21. ErrSentinelAddrRequired = errors.New("valkey.Sentinel: addr is required")
  22. ErrSentinelAddrEmpty = errors.New("valkey.Sentinel: addr cannot be empty")
  23. )
  24. // Config is what Anubis unmarshals from the "parameters" JSON.
  25. type Config struct {
  26. URL string `json:"url"`
  27. Cluster bool `json:"cluster,omitempty"`
  28. Sentinel *Sentinel `json:"sentinel,omitempty"`
  29. }
  30. func (c Config) Valid() error {
  31. var errs []error
  32. if c.URL == "" && c.Sentinel == nil {
  33. errs = append(errs, ErrNoURL)
  34. }
  35. // Validate URL only if provided
  36. if c.URL != "" {
  37. if _, err := valkey.ParseURL(c.URL); err != nil {
  38. errs = append(errs, fmt.Errorf("%w: %v", ErrBadURL, err))
  39. }
  40. }
  41. if c.Sentinel != nil {
  42. if err := c.Sentinel.Valid(); err != nil {
  43. errs = append(errs, err)
  44. }
  45. }
  46. if len(errs) > 0 {
  47. return errors.Join(errs...)
  48. }
  49. return nil
  50. }
  51. type Sentinel struct {
  52. MasterName string `json:"masterName"`
  53. Addr internal.ListOr[string] `json:"addr"`
  54. ClientName string `json:"clientName,omitempty"`
  55. Username string `json:"username,omitempty"`
  56. Password string `json:"password,omitempty"`
  57. }
  58. func (s Sentinel) Valid() error {
  59. var errs []error
  60. if s.MasterName == "" {
  61. errs = append(errs, ErrSentinelMasterNameRequired)
  62. }
  63. if len(s.Addr) == 0 {
  64. errs = append(errs, ErrSentinelAddrRequired)
  65. } else {
  66. // Check if all addresses in the list are empty
  67. allEmpty := true
  68. for _, addr := range s.Addr {
  69. if addr != "" {
  70. allEmpty = false
  71. break
  72. }
  73. }
  74. if allEmpty {
  75. errs = append(errs, ErrSentinelAddrEmpty)
  76. }
  77. }
  78. if len(errs) > 0 {
  79. return errors.Join(errs...)
  80. }
  81. return nil
  82. }
  83. // redisClient is satisfied by *valkey.Client and *valkey.ClusterClient.
  84. type redisClient interface {
  85. Get(ctx context.Context, key string) *valkey.StringCmd
  86. Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *valkey.StatusCmd
  87. Del(ctx context.Context, keys ...string) *valkey.IntCmd
  88. Ping(ctx context.Context) *valkey.StatusCmd
  89. }
  90. type Factory struct{}
  91. func (Factory) Valid(data json.RawMessage) error {
  92. var cfg Config
  93. if err := json.Unmarshal(data, &cfg); err != nil {
  94. return err
  95. }
  96. return cfg.Valid()
  97. }
  98. func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface, error) {
  99. var cfg Config
  100. if err := json.Unmarshal(data, &cfg); err != nil {
  101. return nil, err
  102. }
  103. if err := cfg.Valid(); err != nil {
  104. return nil, err
  105. }
  106. var client redisClient
  107. switch {
  108. case cfg.Cluster:
  109. opts, err := valkey.ParseURL(cfg.URL)
  110. if err != nil {
  111. return nil, fmt.Errorf("valkey.Factory: %w", err)
  112. }
  113. // Cluster mode: use the parsed Addr as the seed node.
  114. clusterOpts := &valkey.ClusterOptions{
  115. Addrs: []string{opts.Addr},
  116. // Explicitly disable maintenance notifications
  117. // This prevents the client from sending CLIENT MAINT_NOTIFICATIONS ON
  118. MaintNotificationsConfig: &maintnotifications.Config{
  119. Mode: maintnotifications.ModeDisabled,
  120. },
  121. }
  122. client = valkey.NewClusterClient(clusterOpts)
  123. case cfg.Sentinel != nil:
  124. opts := &valkey.FailoverOptions{
  125. MasterName: cfg.Sentinel.MasterName,
  126. SentinelAddrs: cfg.Sentinel.Addr,
  127. SentinelUsername: cfg.Sentinel.Username,
  128. SentinelPassword: cfg.Sentinel.Password,
  129. Username: cfg.Sentinel.Username,
  130. Password: cfg.Sentinel.Password,
  131. ClientName: cfg.Sentinel.ClientName,
  132. }
  133. client = valkey.NewFailoverClusterClient(opts)
  134. default:
  135. opts, err := valkey.ParseURL(cfg.URL)
  136. if err != nil {
  137. return nil, fmt.Errorf("valkey.Factory: %w", err)
  138. }
  139. opts.MaintNotificationsConfig = &maintnotifications.Config{
  140. Mode: maintnotifications.ModeDisabled,
  141. }
  142. client = valkey.NewClient(opts)
  143. }
  144. // Optional but nice: fail fast if the cluster/single node is unreachable.
  145. if err := client.Ping(ctx).Err(); err != nil {
  146. return nil, fmt.Errorf("valkey.Factory: ping failed: %w", err)
  147. }
  148. return &Store{client: client}, nil
  149. }