persistence.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package persistence
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/db"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  9. )
  10. type Persistence struct {
  11. maxStorageSize int64
  12. storage PersistenceStorage
  13. }
  14. const (
  15. CACHE_KEY_PREFIX = "persistence:cache"
  16. )
  17. func (c *Persistence) getCacheKey(tenantId string, pluginId string, key string) string {
  18. return fmt.Sprintf("%s:%s:%s:%s", CACHE_KEY_PREFIX, tenantId, pluginId, key)
  19. }
  20. func (c *Persistence) Save(tenantId string, pluginId string, maxSize int64, key string, data []byte) error {
  21. if len(key) > 256 {
  22. return fmt.Errorf("key length must be less than 256 characters")
  23. }
  24. if maxSize == -1 {
  25. maxSize = c.maxStorageSize
  26. }
  27. if err := c.storage.Save(tenantId, pluginId, key, data); err != nil {
  28. return err
  29. }
  30. allocatedSize := int64(len(data))
  31. storage, err := db.GetOne[models.TenantStorage](
  32. db.Equal("tenant_id", tenantId),
  33. db.Equal("plugin_id", pluginId),
  34. )
  35. if err != nil {
  36. if allocatedSize > c.maxStorageSize || allocatedSize > maxSize {
  37. return fmt.Errorf("allocated size is greater than max storage size")
  38. }
  39. if err == db.ErrDatabaseNotFound {
  40. storage = models.TenantStorage{
  41. TenantID: tenantId,
  42. PluginID: pluginId,
  43. Size: allocatedSize,
  44. }
  45. if err := db.Create(&storage); err != nil {
  46. return err
  47. }
  48. } else {
  49. return err
  50. }
  51. } else {
  52. if allocatedSize+storage.Size > maxSize || allocatedSize+storage.Size > c.maxStorageSize {
  53. return fmt.Errorf("allocated size is greater than max storage size")
  54. }
  55. err = db.Run(
  56. db.Model(&models.TenantStorage{}),
  57. db.Equal("tenant_id", tenantId),
  58. db.Equal("plugin_id", pluginId),
  59. db.Inc(map[string]int64{"size": allocatedSize}),
  60. )
  61. if err != nil {
  62. return err
  63. }
  64. }
  65. // delete from cache
  66. return cache.Del(c.getCacheKey(tenantId, pluginId, key))
  67. }
  68. // TODO: raises specific error to avoid confusion
  69. func (c *Persistence) Load(tenantId string, pluginId string, key string) ([]byte, error) {
  70. // check if the key exists in cache
  71. h, err := cache.GetString(c.getCacheKey(tenantId, pluginId, key))
  72. if err != nil && err != cache.ErrNotFound {
  73. return nil, err
  74. }
  75. if err == nil {
  76. return hex.DecodeString(h)
  77. }
  78. // load from storage
  79. data, err := c.storage.Load(tenantId, pluginId, key)
  80. if err != nil {
  81. return nil, err
  82. }
  83. // add to cache
  84. cache.Store(c.getCacheKey(tenantId, pluginId, key), hex.EncodeToString(data), time.Minute*5)
  85. return data, nil
  86. }
  87. func (c *Persistence) Delete(tenantId string, pluginId string, key string) error {
  88. // delete from cache and storage
  89. err := cache.Del(c.getCacheKey(tenantId, pluginId, key))
  90. if err != nil {
  91. return err
  92. }
  93. // state size
  94. size, err := c.storage.StateSize(tenantId, pluginId, key)
  95. if err != nil {
  96. return nil
  97. }
  98. err = c.storage.Delete(tenantId, pluginId, key)
  99. if err != nil {
  100. return nil
  101. }
  102. // update storage size
  103. err = db.Run(
  104. db.Model(&models.TenantStorage{}),
  105. db.Equal("tenant_id", tenantId),
  106. db.Equal("plugin_id", pluginId),
  107. db.Dec(map[string]int64{"size": size}),
  108. )
  109. if err != nil {
  110. return err
  111. }
  112. return nil
  113. }