persistence.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package persistence
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/db"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  9. )
  10. type Persistence struct {
  11. maxStorageSize int64
  12. storage PersistenceStorage
  13. }
  14. const (
  15. CACHE_KEY_PREFIX = "persistence:cache"
  16. )
  17. func (c *Persistence) getCacheKey(tenantId string, pluginId string, key string) string {
  18. return fmt.Sprintf("%s:%s:%s:%s", CACHE_KEY_PREFIX, tenantId, pluginId, key)
  19. }
  20. func (c *Persistence) Save(tenantId string, pluginId string, maxSize int64, key string, data []byte) error {
  21. if len(key) > 256 {
  22. return fmt.Errorf("key length must be less than 256 characters")
  23. }
  24. if maxSize == -1 {
  25. maxSize = c.maxStorageSize
  26. }
  27. if err := c.storage.Save(tenantId, pluginId, key, data); err != nil {
  28. return err
  29. }
  30. allocatedSize := int64(len(data))
  31. storage, err := db.GetOne[models.TenantStorage](
  32. db.Equal("tenant_id", tenantId),
  33. db.Equal("plugin_id", pluginId),
  34. )
  35. if err != nil {
  36. if allocatedSize > c.maxStorageSize || allocatedSize > maxSize {
  37. return fmt.Errorf("allocated size is greater than max storage size")
  38. }
  39. if err == db.ErrDatabaseNotFound {
  40. storage = models.TenantStorage{
  41. TenantID: tenantId,
  42. PluginID: pluginId,
  43. Size: allocatedSize,
  44. }
  45. if err := db.Create(&storage); err != nil {
  46. return err
  47. }
  48. } else {
  49. return err
  50. }
  51. } else {
  52. if allocatedSize+storage.Size > maxSize || allocatedSize+storage.Size > c.maxStorageSize {
  53. return fmt.Errorf("allocated size is greater than max storage size")
  54. }
  55. err = db.Run(
  56. db.Model(&models.TenantStorage{}),
  57. db.Equal("tenant_id", tenantId),
  58. db.Equal("plugin_id", pluginId),
  59. db.Inc(map[string]int64{"size": allocatedSize}),
  60. )
  61. if err != nil {
  62. return err
  63. }
  64. }
  65. // delete from cache
  66. return cache.Del(c.getCacheKey(tenantId, pluginId, key))
  67. }
  68. func (c *Persistence) Load(tenantId string, pluginId string, key string) ([]byte, error) {
  69. // check if the key exists in cache
  70. h, err := cache.GetString(c.getCacheKey(tenantId, pluginId, key))
  71. if err != nil && err != cache.ErrNotFound {
  72. return nil, err
  73. }
  74. if err == nil {
  75. return hex.DecodeString(h)
  76. }
  77. // load from storage
  78. data, err := c.storage.Load(tenantId, pluginId, key)
  79. if err != nil {
  80. return nil, err
  81. }
  82. // add to cache
  83. cache.Store(c.getCacheKey(tenantId, pluginId, key), hex.EncodeToString(data), time.Minute*5)
  84. return data, nil
  85. }
  86. func (c *Persistence) Delete(tenantId string, pluginId string, key string) error {
  87. // delete from cache and storage
  88. err := cache.Del(c.getCacheKey(tenantId, pluginId, key))
  89. if err != nil {
  90. return err
  91. }
  92. // state size
  93. size, err := c.storage.StateSize(tenantId, pluginId, key)
  94. if err != nil {
  95. return nil
  96. }
  97. err = c.storage.Delete(tenantId, pluginId, key)
  98. if err != nil {
  99. return nil
  100. }
  101. // update storage size
  102. err = db.Run(
  103. db.Model(&models.TenantStorage{}),
  104. db.Equal("tenant_id", tenantId),
  105. db.Equal("plugin_id", pluginId),
  106. db.Dec(map[string]int64{"size": size}),
  107. )
  108. if err != nil {
  109. return err
  110. }
  111. return nil
  112. }