persistence.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package persistence
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/db"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  9. )
  10. type Persistence struct {
  11. max_storage_size int64
  12. storage PersistenceStorage
  13. }
  14. const (
  15. CACHE_KEY_PREFIX = "persistence:cache"
  16. )
  17. func (c *Persistence) getCacheKey(tenant_id string, plugin_id string, key string) string {
  18. return fmt.Sprintf("%s:%s:%s:%s", CACHE_KEY_PREFIX, tenant_id, plugin_id, key)
  19. }
  20. func (c *Persistence) Save(tenant_id string, plugin_id string, max_size int64, key string, data []byte) error {
  21. if len(key) > 64 {
  22. return fmt.Errorf("key length must be less than 64 characters")
  23. }
  24. if max_size == -1 {
  25. max_size = c.max_storage_size
  26. }
  27. if err := c.storage.Save(tenant_id, plugin_id, key, data); err != nil {
  28. return err
  29. }
  30. allocated_size := int64(len(data))
  31. storage, err := db.GetOne[models.TenantStorage](
  32. db.Equal("tenant_id", tenant_id),
  33. db.Equal("plugin_id", plugin_id),
  34. )
  35. if err != nil {
  36. if allocated_size > c.max_storage_size || allocated_size > max_size {
  37. return fmt.Errorf("allocated size is greater than max storage size")
  38. }
  39. if err == db.ErrDatabaseNotFound {
  40. storage = models.TenantStorage{
  41. TenantID: tenant_id,
  42. PluginID: plugin_id,
  43. Size: allocated_size,
  44. }
  45. if err := db.Create(&storage); err != nil {
  46. return err
  47. }
  48. } else {
  49. return err
  50. }
  51. } else {
  52. if allocated_size+storage.Size > max_size || allocated_size+storage.Size > c.max_storage_size {
  53. return fmt.Errorf("allocated size is greater than max storage size")
  54. }
  55. err = db.Run(
  56. db.Model(&models.TenantStorage{}),
  57. db.Equal("tenant_id", tenant_id),
  58. db.Equal("plugin_id", plugin_id),
  59. db.Inc(map[string]int64{"size": allocated_size}),
  60. )
  61. if err != nil {
  62. return err
  63. }
  64. }
  65. // delete from cache
  66. return cache.Del(c.getCacheKey(tenant_id, plugin_id, key))
  67. }
  68. func (c *Persistence) Load(tenant_id string, plugin_id string, key string) ([]byte, error) {
  69. // check if the key exists in cache
  70. h, err := cache.GetString(c.getCacheKey(tenant_id, plugin_id, key))
  71. if err != nil && err != cache.ErrNotFound {
  72. return nil, err
  73. }
  74. if err == nil {
  75. return hex.DecodeString(h)
  76. }
  77. // load from storage
  78. data, err := c.storage.Load(tenant_id, plugin_id, key)
  79. if err != nil {
  80. return nil, err
  81. }
  82. // add to cache
  83. cache.Store(c.getCacheKey(tenant_id, plugin_id, key), hex.EncodeToString(data), time.Minute*5)
  84. return data, nil
  85. }
  86. func (c *Persistence) Delete(tenant_id string, plugin_id string, key string) error {
  87. // delete from cache and storage
  88. err := cache.Del(c.getCacheKey(tenant_id, plugin_id, key))
  89. if err != nil {
  90. return err
  91. }
  92. // state size
  93. size, err := c.storage.StateSize(tenant_id, plugin_id, key)
  94. if err != nil {
  95. return nil
  96. }
  97. err = c.storage.Delete(tenant_id, plugin_id, key)
  98. if err != nil {
  99. return nil
  100. }
  101. // update storage size
  102. err = db.Run(
  103. db.Model(&models.TenantStorage{}),
  104. db.Equal("tenant_id", tenant_id),
  105. db.Equal("plugin_id", plugin_id),
  106. db.Dec(map[string]int64{"size": size}),
  107. )
  108. if err != nil {
  109. return err
  110. }
  111. return nil
  112. }