Yeuoly пре 11 месеци
родитељ
комит
4ea1f0b6a2

+ 1 - 0
internal/core/persistence/.gitignore

@@ -0,0 +1 @@
+persistence_storage

+ 11 - 0
internal/core/persistence/local.go

@@ -10,6 +10,11 @@ type LocalWrapper struct {
 }
 
 func NewLocalWrapper(path string) *LocalWrapper {
+	// check if the path exists, create it if not
+	if _, err := os.Stat(path); os.IsNotExist(err) {
+		os.MkdirAll(path, 0755)
+	}
+
 	return &LocalWrapper{
 		path: path,
 	}
@@ -20,6 +25,12 @@ func (l *LocalWrapper) getFilePath(tenant_id string, plugin_checksum string, key
 }
 
 func (l *LocalWrapper) Save(tenant_id string, plugin_checksum string, key string, data []byte) error {
+	// create the directory if it doesn't exist
+	dir := l.getFilePath(tenant_id, plugin_checksum, "")
+	if err := os.MkdirAll(dir, 0755); err != nil {
+		return err
+	}
+
 	file_path := l.getFilePath(tenant_id, plugin_checksum, key)
 	return os.WriteFile(file_path, data, 0644)
 }

+ 19 - 8
internal/core/persistence/persistence.go

@@ -3,6 +3,7 @@ package persistence
 import (
 	"encoding/hex"
 	"fmt"
+	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
 )
@@ -15,19 +16,21 @@ const (
 	CACHE_KEY_PREFIX = "persistence:cache"
 )
 
-func (c *Persistence) getCacheKey(tenant_id string, plugin_checksum string) string {
-	return fmt.Sprintf("%s:%s:%s", CACHE_KEY_PREFIX, tenant_id, plugin_checksum)
+func (c *Persistence) getCacheKey(tenant_id string, plugin_checksum string, key string) string {
+	return fmt.Sprintf("%s:%s:%s:%s", CACHE_KEY_PREFIX, tenant_id, plugin_checksum, key)
 }
 
 func (c *Persistence) Save(tenant_id string, plugin_checksum string, key string, data []byte) error {
-	// add to cache
-	h := hex.EncodeToString(data)
-	return cache.SetMapOneField(c.getCacheKey(tenant_id, plugin_checksum), key, h)
+	if len(key) > 64 {
+		return fmt.Errorf("key length must be less than 64 characters")
+	}
+
+	return c.storage.Save(tenant_id, plugin_checksum, key, data)
 }
 
 func (c *Persistence) Load(tenant_id string, plugin_checksum string, key string) ([]byte, error) {
 	// check if the key exists in cache
-	h, err := cache.GetMapFieldString(c.getCacheKey(tenant_id, plugin_checksum), key)
+	h, err := cache.GetString(c.getCacheKey(tenant_id, plugin_checksum, key))
 	if err != nil && err != cache.ErrNotFound {
 		return nil, err
 	}
@@ -36,12 +39,20 @@ func (c *Persistence) Load(tenant_id string, plugin_checksum string, key string)
 	}
 
 	// load from storage
-	return c.storage.Load(tenant_id, plugin_checksum, key)
+	data, err := c.storage.Load(tenant_id, plugin_checksum, key)
+	if err != nil {
+		return nil, err
+	}
+
+	// add to cache
+	cache.Store(c.getCacheKey(tenant_id, plugin_checksum, key), hex.EncodeToString(data), time.Minute*5)
+
+	return data, nil
 }
 
 func (c *Persistence) Delete(tenant_id string, plugin_checksum string, key string) error {
 	// delete from cache and storage
-	err := cache.DelMapField(c.getCacheKey(tenant_id, plugin_checksum), key)
+	err := cache.Del(c.getCacheKey(tenant_id, plugin_checksum, key))
 	if err != nil {
 		return err
 	}

+ 112 - 0
internal/core/persistence/persistence_test.go

@@ -0,0 +1,112 @@
+package persistence
+
+import (
+	"encoding/hex"
+	"os"
+	"testing"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/strings"
+)
+
+func TestPersistenceStoreAndLoad(t *testing.T) {
+	err := cache.InitRedisClient("localhost:6379", "difyai123456")
+	if err != nil {
+		t.Fatalf("Failed to init redis client: %v", err)
+	}
+	defer cache.Close()
+
+	p := InitPersistence(&app.Config{
+		PersistenceStorageType:      "local",
+		PersistenceStorageLocalPath: "./persistence_storage",
+	})
+
+	key := strings.RandomString(10)
+
+	if err := p.Save("tenant_id", "plugin_checksum", key, []byte("data")); err != nil {
+		t.Fatalf("Failed to save data: %v", err)
+	}
+
+	data, err := p.Load("tenant_id", "plugin_checksum", key)
+	if err != nil {
+		t.Fatalf("Failed to load data: %v", err)
+	}
+
+	if string(data) != "data" {
+		t.Fatalf("Data mismatch: %s", data)
+	}
+
+	// check if the file exists
+	if _, err := os.Stat("./persistence_storage/tenant_id/plugin_checksum/" + key); os.IsNotExist(err) {
+		t.Fatalf("File not found: %v", err)
+	}
+
+	// check if cache is updated
+	cache_data, err := cache.GetString("persistence:cache:tenant_id:plugin_checksum:" + key)
+	if err != nil {
+		t.Fatalf("Failed to get cache data: %v", err)
+	}
+
+	cache_data_bytes, err := hex.DecodeString(cache_data)
+	if err != nil {
+		t.Fatalf("Failed to decode cache data: %v", err)
+	}
+
+	if string(cache_data_bytes) != "data" {
+		t.Fatalf("Cache data mismatch: %s", cache_data)
+	}
+}
+
+func TestPersistenceSaveAndLoadWithLongKey(t *testing.T) {
+	err := cache.InitRedisClient("localhost:6379", "difyai123456")
+	if err != nil {
+		t.Fatalf("Failed to init redis client: %v", err)
+	}
+	defer cache.Close()
+
+	p := InitPersistence(&app.Config{
+		PersistenceStorageType:      "local",
+		PersistenceStorageLocalPath: "./persistence_storage",
+	})
+
+	key := strings.RandomString(65)
+
+	if err := p.Save("tenant_id", "plugin_checksum", key, []byte("data")); err == nil {
+		t.Fatalf("Expected error, got nil")
+	}
+}
+
+func TestPersistenceDelete(t *testing.T) {
+	err := cache.InitRedisClient("localhost:6379", "difyai123456")
+	if err != nil {
+		t.Fatalf("Failed to init redis client: %v", err)
+	}
+	defer cache.Close()
+
+	p := InitPersistence(&app.Config{
+		PersistenceStorageType:      "local",
+		PersistenceStorageLocalPath: "./persistence_storage",
+	})
+
+	key := strings.RandomString(10)
+
+	if err := p.Save("tenant_id", "plugin_checksum", key, []byte("data")); err != nil {
+		t.Fatalf("Failed to save data: %v", err)
+	}
+
+	if err := p.Delete("tenant_id", "plugin_checksum", key); err != nil {
+		t.Fatalf("Failed to delete data: %v", err)
+	}
+
+	// check if the file exists
+	if _, err := os.Stat("./persistence_storage/tenant_id/plugin_checksum/" + key); !os.IsNotExist(err) {
+		t.Fatalf("File not deleted: %v", err)
+	}
+
+	// check if cache is updated
+	_, err = cache.GetString("persistence:cache:tenant_id:plugin_checksum:" + key)
+	if err != cache.ErrNotFound {
+		t.Fatalf("Cache data not deleted: %v", err)
+	}
+}

+ 11 - 0
internal/core/persistence/sync.go

@@ -1 +1,12 @@
 package persistence
+
+// Sync sync the cache to storage
+// NOTE: this method is not used currently, for now, we assume that
+// the effective is fast enough, but maybe someday we need to use the cache
+// func (p *Persistence) Sync() error {
+// 	// sync cache to storage
+// 	cache.ScanKeysAsync(fmt.Sprintf("%s:*", CACHE_KEY_PREFIX), func(keys []string) error {
+// 	})
+
+// 	return nil
+// }

+ 46 - 0
internal/utils/cache/redis.go

@@ -261,6 +261,52 @@ func GetMap[V any](key string, context ...redis.Cmdable) (map[string]V, error) {
 	return result, nil
 }
 
+// ScanKeys scan the keys with match pattern
+func ScanKeys(match string, context ...redis.Cmdable) ([]string, error) {
+	if client == nil {
+		return nil, ErrDBNotInit
+	}
+
+	result := make([]string, 0)
+
+	if err := ScanKeysAsync(match, func(keys []string) error {
+		result = append(result, keys...)
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	return result, nil
+}
+
+// ScanKeysAsync scan the keys with match pattern, format like "key*"
+func ScanKeysAsync(match string, fn func([]string) error, context ...redis.Cmdable) error {
+	if client == nil {
+		return ErrDBNotInit
+	}
+
+	cursor := uint64(0)
+
+	for {
+		keys, new_cursor, err := getCmdable(context...).Scan(ctx, cursor, match, 32).Result()
+		if err != nil {
+			return err
+		}
+
+		if err := fn(keys); err != nil {
+			return err
+		}
+
+		if new_cursor == 0 {
+			break
+		}
+
+		cursor = new_cursor
+	}
+
+	return nil
+}
+
 // ScanMap scan the map with match pattern, format like "key*"
 func ScanMap[V any](key string, match string, context ...redis.Cmdable) (map[string]V, error) {
 	if client == nil {