redis.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package cache
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "time"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  8. "github.com/redis/go-redis/v9"
  9. )
  10. var (
  11. client *redis.Client
  12. ctx = context.Background()
  13. ErrDBNotInit = errors.New("redis client not init")
  14. )
  15. func InitRedisClient(addr, password string) error {
  16. client = redis.NewClient(&redis.Options{
  17. Addr: addr,
  18. Password: password,
  19. })
  20. if _, err := client.Ping(ctx).Result(); err != nil {
  21. return err
  22. }
  23. return nil
  24. }
  // serialKey builds the namespaced Redis key used by this package: the fixed
  // prefix "plugin_daemon" followed by every element of keys, joined with ":".
  // With no keys it returns just the prefix.
  func serialKey(keys ...string) string {
  	parts := make([]string, 0, len(keys)+1)
  	parts = append(parts, "plugin_daemon")
  	parts = append(parts, keys...)
  	return strings.Join(parts, ":")
  }
  31. func Store(key string, value any, time time.Duration) error {
  32. if client == nil {
  33. return ErrDBNotInit
  34. }
  35. return client.Set(ctx, serialKey(key), value, time).Err()
  36. }
  37. func Get[T any](key string) (*T, error) {
  38. if client == nil {
  39. return nil, ErrDBNotInit
  40. }
  41. val, err := client.Get(ctx, serialKey(key)).Result()
  42. if err != nil {
  43. return nil, err
  44. }
  45. result, err := parser.UnmarshalJson[T](val)
  46. return &result, err
  47. }
  48. func Del(key string) error {
  49. if client == nil {
  50. return ErrDBNotInit
  51. }
  52. return client.Del(ctx, serialKey(key)).Err()
  53. }
  54. func Exist(key string) (int64, error) {
  55. if client == nil {
  56. return 0, ErrDBNotInit
  57. }
  58. return client.Exists(ctx, serialKey(key)).Result()
  59. }
  60. func Increase(key string) (int64, error) {
  61. if client == nil {
  62. return 0, ErrDBNotInit
  63. }
  64. return client.Incr(ctx, serialKey(key)).Result()
  65. }
  66. func Decrease(key string) (int64, error) {
  67. if client == nil {
  68. return 0, ErrDBNotInit
  69. }
  70. return client.Decr(ctx, serialKey(key)).Result()
  71. }
  72. func SetExpire(key string, time time.Duration) error {
  73. if client == nil {
  74. return ErrDBNotInit
  75. }
  76. return client.Expire(ctx, serialKey(key), time).Err()
  77. }
  78. func SetMapField(key string, v map[string]any) error {
  79. if client == nil {
  80. return ErrDBNotInit
  81. }
  82. return client.HMSet(ctx, serialKey(key), v).Err()
  83. }
  84. func SetMapOneField(key string, field string, value any) error {
  85. if client == nil {
  86. return ErrDBNotInit
  87. }
  88. return client.HSet(ctx, serialKey(key), field, value).Err()
  89. }
  90. func GetMapField[T any](key string, field string) (*T, error) {
  91. if client == nil {
  92. return nil, ErrDBNotInit
  93. }
  94. val, err := client.HGet(ctx, serialKey(key), field).Result()
  95. if err != nil {
  96. return nil, err
  97. }
  98. result, err := parser.UnmarshalJson[T](val)
  99. return &result, err
  100. }
  101. func DelMapField(key string, field string) error {
  102. if client == nil {
  103. return ErrDBNotInit
  104. }
  105. return client.HDel(ctx, serialKey(key), field).Err()
  106. }
  107. func GetMap[V any](key string) (map[string]V, error) {
  108. if client == nil {
  109. return nil, ErrDBNotInit
  110. }
  111. val, err := client.HGetAll(ctx, serialKey(key)).Result()
  112. if err != nil {
  113. return nil, err
  114. }
  115. result := make(map[string]V)
  116. for k, v := range val {
  117. value, err := parser.UnmarshalJson[V](v)
  118. if err != nil {
  119. return nil, err
  120. }
  121. result[k] = value
  122. }
  123. return result, nil
  124. }
  // ErrLockTimeout is returned by Lock when the lock could not be acquired
  // within the caller's try-lock timeout.
  var ErrLockTimeout = errors.New("lock timeout")
  128. // Lock key, expire time takes responsibility for expiration time
  129. // try_lock_timeout takes responsibility for the timeout of trying to lock
  130. func Lock(key string, expire time.Duration, try_lock_timeout time.Duration) error {
  131. if client == nil {
  132. return ErrDBNotInit
  133. }
  134. const LOCK_DURATION = 20 * time.Millisecond
  135. ticker := time.NewTicker(LOCK_DURATION)
  136. defer ticker.Stop()
  137. for range ticker.C {
  138. if _, err := client.SetNX(ctx, serialKey(key), "1", expire).Result(); err == nil {
  139. return nil
  140. }
  141. try_lock_timeout -= LOCK_DURATION
  142. if try_lock_timeout <= 0 {
  143. return ErrLockTimeout
  144. }
  145. }
  146. return nil
  147. }
  148. func Unlock(key string) error {
  149. if client == nil {
  150. return ErrDBNotInit
  151. }
  152. return client.Del(ctx, serialKey(key)).Err()
  153. }