Parcourir la source

feat: plugin assets

Yeuoly il y a 10 mois
Parent
commit
d02ef5effd

+ 3 - 1
cmd/server/main.go

@@ -43,10 +43,12 @@ func setDefault(config *app.Config) {
 	setDefaultInt(&config.MaxPluginPackageSize, 52428800)
 	setDefaultInt(&config.MaxAWSLambdaTransactionTimeout, 150)
 	setDefaultInt(&config.PluginMaxExecutionTimeout, 240)
+	setDefaultInt(&config.PluginMediaCacheSize, 1024)
 	setDefaultBool(&config.PluginRemoteInstallingEnabled, true)
 	setDefaultBool(&config.PluginEndpointEnabled, true)
 	setDefaultString(&config.DBSslMode, "disable")
-	setDefaultString(&config.ProcessCachingPath, "/tmp/dify-plugin-daemon-subprocesses")
+	setDefaultString(&config.PluginMediaCachePath, "/var/dify-plugin-daemon/media-cache")
+	setDefaultString(&config.ProcessCachingPath, "/var/dify-plugin-daemon/subprocesses")
 }
 
 func setDefaultInt[T constraints.Integer](value *T, defaultValue T) {

+ 2 - 0
go.mod

@@ -41,6 +41,7 @@ require (
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 	github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
 	github.com/fsnotify/fsnotify v1.7.0 // indirect
+	github.com/hashicorp/golang-lru v1.0.2 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
 	github.com/jackc/pgpassfile v1.0.0 // indirect
@@ -85,6 +86,7 @@ require (
 	github.com/go-playground/universal-translator v0.18.1
 	github.com/go-playground/validator/v10 v10.22.0
 	github.com/goccy/go-json v0.10.3 // indirect
+	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/joho/godotenv v1.5.1
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/kelseyhightower/envconfig v1.4.0

+ 4 - 0
go.sum

@@ -96,6 +96,10 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
+github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
+github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
 github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=

+ 12 - 0
internal/core/plugin_manager/basic_manager/remap_assets.go

@@ -0,0 +1,12 @@
+package basic_manager
+
+import "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+
+// RemapAssets will take the assets and remap them to a media id
+func (r *BasicPluginRuntime) RemapAssets(
+	declaration *plugin_entities.PluginDeclaration,
+	assets map[string][]byte,
+) error {
+	// TODO: implement
+	return nil
+}

+ 11 - 0
internal/core/plugin_manager/basic_manager/type.go

@@ -0,0 +1,11 @@
+package basic_manager
+
+import "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
+
+type BasicPluginRuntime struct {
+	mediaManager *media_manager.MediaManager
+}
+
+func NewBasicPluginRuntime(mediaManager *media_manager.MediaManager) BasicPluginRuntime {
+	return BasicPluginRuntime{mediaManager: mediaManager}
+}

+ 9 - 1
internal/core/plugin_manager/manager.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
@@ -22,6 +23,9 @@ type PluginManager struct {
 	maxPluginPackageSize int64
 	workingDirectory     string
 
+	// mediaManager is used to manage media files like plugin icons, images, etc.
+	mediaManager *media_manager.MediaManager
+
 	// running plugin in storage contains relations between plugin packages and their running instances
 	runningPluginInStorage mapping.Map[string, string]
 	// start process lock
@@ -37,7 +41,11 @@ func InitGlobalPluginManager(cluster *cluster.Cluster, configuration *app.Config
 		cluster:              cluster,
 		maxPluginPackageSize: configuration.MaxPluginPackageSize,
 		workingDirectory:     configuration.PluginWorkingPath,
-		startProcessLock:     lock.NewHighGranularityLock(),
+		mediaManager: media_manager.NewMediaManager(
+			configuration.PluginMediaCachePath,
+			configuration.PluginMediaCacheSize,
+		),
+		startProcessLock: lock.NewHighGranularityLock(),
 	}
 	manager.Init(configuration)
 }

+ 71 - 0
internal/core/plugin_manager/media_manager/type.go

@@ -0,0 +1,71 @@
+package media_manager
+
+import (
+	"crypto/sha256"
+	"encoding/hex"
+	"os"
+	"path"
+
+	lru "github.com/hashicorp/golang-lru/v2"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/strings"
+)
+
+type MediaManager struct {
+	storagePath string
+	cache       *lru.Cache[string, []byte]
+}
+
+func NewMediaManager(storage_path string, cache_size uint16) *MediaManager {
+	// mkdir -p storage_path
+	if err := os.MkdirAll(storage_path, 0o755); err != nil {
+		log.Error("Failed to create storage path: %s", err)
+	}
+
+	// lru.New only raises error when cache_size is a negative number, which is impossible
+	cache, _ := lru.New[string, []byte](int(cache_size))
+
+	return &MediaManager{storagePath: storage_path, cache: cache}
+}
+
+// Upload uploads a file to the media manager and returns an identifier
+func (m *MediaManager) Upload(file []byte) (string, error) {
+	// calculate checksum
+	checksum := sha256.Sum256(append(file, []byte(strings.RandomString(10))...))
+
+	id := hex.EncodeToString(checksum[:])
+
+	// store locally
+	filePath := path.Join(m.storagePath, id)
+	err := os.WriteFile(filePath, file, 0o644)
+	if err != nil {
+		return "", err
+	}
+
+	return id, nil
+}
+
+func (m *MediaManager) Get(id string) ([]byte, error) {
+	// check if id is in cache
+	data, ok := m.cache.Get(id)
+	if ok {
+		return data, nil
+	}
+
+	// check if id is in storage
+	filePath := path.Join(m.storagePath, id)
+	if _, err := os.Stat(filePath); os.IsNotExist(err) {
+		return nil, err
+	}
+
+	// read file
+	file, err := os.ReadFile(filePath)
+	if err != nil {
+		return nil, err
+	}
+
+	// store in cache
+	m.cache.Add(id, file)
+
+	return file, nil
+}

+ 6 - 1
internal/core/plugin_manager/positive_manager/types.go

@@ -1,8 +1,13 @@
 package positive_manager
 
-import "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+)
 
 type PositivePluginRuntime struct {
+	basic_manager.BasicPluginRuntime
+
 	LocalPackagePath string
 	WorkingPath      string
 	// plugin decoder used to manage the plugin

+ 35 - 0
internal/core/plugin_manager/remote_manager/hooks.go

@@ -1,9 +1,11 @@
 package remote_manager
 
 import (
+	"encoding/hex"
 	"sync"
 	"time"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
@@ -17,6 +19,8 @@ type DifyServer struct {
 
 	engine gnet.Engine
 
+	mediaManager *media_manager.MediaManager
+
 	// listening address
 	addr string
 	port uint16
@@ -221,6 +225,33 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 			declaration.Endpoint = &endpoints[0]
 			runtime.Config = declaration
 		}
+	} else if !runtime.assets_transferred {
+		assets, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.RemoteAssetPayload](message)
+		if err != nil {
+			runtime.conn.Write([]byte("assets register failed\n"))
+			log.Error("assets register failed, error: %v", err)
+			runtime.conn.Close()
+			return
+		}
+
+		files := make(map[string][]byte)
+		for _, asset := range assets {
+			files[asset.Filename], err = hex.DecodeString(asset.Data)
+			if err != nil {
+				runtime.conn.Write([]byte("assets decode failed\n"))
+				log.Error("assets decode failed, error: %v", err)
+				runtime.conn.Close()
+				return
+			}
+		}
+
+		// remap assets
+		if err := runtime.RemapAssets(&runtime.Config, files); err != nil {
+			runtime.conn.Write([]byte("assets remap failed\n"))
+			log.Error("assets remap failed, error: %v", err)
+			runtime.conn.Close()
+			return
+		}
 
 		runtime.checksum = runtime.calculateChecksum()
 		runtime.InitState()
@@ -241,3 +272,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		runtime.response.Write(message)
 	}
 }
+
+func (s *DifyServer) onAssets(runtime *RemotePluginRuntime, assets []plugin_entities.RemoteAssetPayload) {
+
+}

+ 8 - 6
internal/core/plugin_manager/remote_manager/server.go

@@ -8,6 +8,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 	"github.com/panjf2000/gnet/v2"
@@ -77,7 +78,7 @@ func (r *RemotePluginServer) Launch() error {
 }
 
 // NewRemotePluginServer creates a new RemotePluginServer
-func NewRemotePluginServer(config *app.Config) *RemotePluginServer {
+func NewRemotePluginServer(config *app.Config, media_manager *media_manager.MediaManager) *RemotePluginServer {
 	addr := fmt.Sprintf(
 		"tcp://%s:%d",
 		config.PluginRemoteInstallingHost,
@@ -90,11 +91,12 @@ func NewRemotePluginServer(config *app.Config) *RemotePluginServer {
 
 	multicore := true
 	s := &DifyServer{
-		addr:      addr,
-		port:      config.PluginRemoteInstallingPort,
-		multicore: multicore,
-		num_loops: config.PluginRemoteInstallServerEventLoopNums,
-		response:  response,
+		mediaManager: media_manager,
+		addr:         addr,
+		port:         config.PluginRemoteInstallingPort,
+		multicore:    multicore,
+		num_loops:    config.PluginRemoteInstallServerEventLoopNums,
+		response:     response,
 
 		plugins:      make(map[int]*RemotePluginRuntime),
 		plugins_lock: &sync.RWMutex{},

+ 1 - 1
internal/core/plugin_manager/remote_manager/server_test.go

@@ -39,7 +39,7 @@ func preparePluginServer(t *testing.T) (*RemotePluginServer, uint16) {
 		PluginRemoteInstallingPort:             port,
 		PluginRemoteInstallingMaxConn:          1,
 		PluginRemoteInstallServerEventLoopNums: 8,
-	}), port
+	}, nil), port
 }
 
 // TestLaunchAndClosePluginServer tests the launch and close of the plugin server

+ 3 - 0
internal/core/plugin_manager/remote_manager/type.go

@@ -4,12 +4,14 @@ import (
 	"sync"
 	"time"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 	"github.com/panjf2000/gnet/v2"
 )
 
 type RemotePluginRuntime struct {
+	basic_manager.BasicPluginRuntime
 	plugin_entities.PluginRuntime
 
 	// connection
@@ -38,6 +40,7 @@ type RemotePluginRuntime struct {
 	tools_registration_transferred     bool
 	models_registration_transferred    bool
 	endpoints_registration_transferred bool
+	assets_transferred                 bool
 
 	// tenant id
 	tenant_id string

+ 10 - 7
internal/core/plugin_manager/watcher.go

@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
@@ -34,7 +35,7 @@ func (p *PluginManager) startLocalWatcher(config *app.Config) {
 func (p *PluginManager) startRemoteWatcher(config *app.Config) {
 	// launch TCP debugging server if enabled
 	if config.PluginRemoteInstallingEnabled {
-		server := remote_manager.NewRemotePluginServer(config)
+		server := remote_manager.NewRemotePluginServer(config, p.mediaManager)
 		go func() {
 			err := server.Launch()
 			if err != nil {
@@ -58,18 +59,20 @@ func (p *PluginManager) handleNewPlugins(config *app.Config) {
 			plugin_interface = &aws_manager.AWSPluginRuntime{
 				PluginRuntime: plugin.Runtime,
 				PositivePluginRuntime: positive_manager.PositivePluginRuntime{
-					LocalPackagePath: plugin.Runtime.State.AbsolutePath,
-					WorkingPath:      plugin.Runtime.State.WorkingPath,
-					Decoder:          plugin.Decoder,
+					BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
+					LocalPackagePath:   plugin.Runtime.State.AbsolutePath,
+					WorkingPath:        plugin.Runtime.State.WorkingPath,
+					Decoder:            plugin.Decoder,
 				},
 			}
 		} else if config.Platform == app.PLATFORM_LOCAL {
 			plugin_interface = &local_manager.LocalPluginRuntime{
 				PluginRuntime: plugin.Runtime,
 				PositivePluginRuntime: positive_manager.PositivePluginRuntime{
-					LocalPackagePath: plugin.Runtime.State.AbsolutePath,
-					WorkingPath:      plugin.Runtime.State.WorkingPath,
-					Decoder:          plugin.Decoder,
+					BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
+					LocalPackagePath:   plugin.Runtime.State.AbsolutePath,
+					WorkingPath:        plugin.Runtime.State.WorkingPath,
+					Decoder:            plugin.Decoder,
 				},
 			}
 		} else {

+ 5 - 3
internal/types/app/config.go

@@ -20,9 +20,11 @@ type Config struct {
 
 	PluginEndpointEnabled bool `envconfig:"PLUGIN_ENDPOINT_ENABLED"`
 
-	PluginStoragePath  string `envconfig:"STORAGE_PLUGIN_PATH" validate:"required"`
-	PluginWorkingPath  string `envconfig:"PLUGIN_WORKING_PATH"`
-	ProcessCachingPath string `envconfig:"PROCESS_CACHING_PATH"`
+	PluginStoragePath    string `envconfig:"STORAGE_PLUGIN_PATH" validate:"required"`
+	PluginWorkingPath    string `envconfig:"PLUGIN_WORKING_PATH"`
+	PluginMediaCacheSize uint16 `envconfig:"PLUGIN_MEDIA_CACHE_SIZE"`
+	PluginMediaCachePath string `envconfig:"PLUGIN_MEDIA_CACHE_PATH"`
+	ProcessCachingPath   string `envconfig:"PROCESS_CACHING_PATH"`
 
 	PluginMaxExecutionTimeout int `envconfig:"PLUGIN_MAX_EXECUTION_TIMEOUT" validate:"required"`
 

+ 6 - 0
internal/types/entities/plugin_entities/remote_entities.go

@@ -0,0 +1,6 @@
+package plugin_entities
+
+type RemoteAssetPayload struct {
+	Filename string `json:"filename" validate:"required"`
+	Data     string `json:"data" validate:"required"`
+}