Browse Source

feat: register remote plugins

Yeuoly 11 months ago
parent
commit
4972f3011f

+ 4 - 0
internal/cluster/plugin_test.go

@@ -17,6 +17,10 @@ func (r *fakePlugin) InitEnvironment() error {
 	return nil
 }
 
+func (r *fakePlugin) Checksum() string {
+	return ""
+}
+
 func (r *fakePlugin) Identity() (string, error) {
 	return "", nil
 }

+ 0 - 1
internal/cluster/redirect_test.go

@@ -179,5 +179,4 @@ func TestRedirectTraffic(t *testing.T) {
 			t.Fatal("node 1 did not receive correct requests")
 		}
 	}
-
 }

+ 4 - 0
internal/core/plugin_daemon/backwards_invocation/task_test.go

@@ -16,6 +16,10 @@ func (r *TPluginRuntime) InitEnvironment() error {
 	return nil
 }
 
+func (r *TPluginRuntime) Checksum() string {
+	return ""
+}
+
 func (r *TPluginRuntime) Identity() (string, error) {
 	return "", nil
 }

+ 4 - 0
internal/core/plugin_manager/aws_manager/environment.go

@@ -3,3 +3,7 @@ package aws_manager
 func (r *AWSPluginRuntime) InitEnvironment() error {
 	return nil
 }
+
+func (r *AWSPluginRuntime) Checksum() string {
+	return ""
+}

+ 27 - 3
internal/core/plugin_manager/local_manager/environment.go

@@ -10,18 +10,20 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
 func (r *LocalPluginRuntime) InitEnvironment() error {
-	if _, err := os.Stat(path.Join(r.State.RelativePath, ".installed")); err == nil {
+	if _, err := os.Stat(path.Join(r.State.AbsolutePath, ".installed")); err == nil {
 		return nil
 	}
 
 	// execute init command
 	handle := exec.Command("bash", r.Config.Execution.Install)
-	handle.Dir = r.State.RelativePath
+	handle.Dir = r.State.AbsolutePath
 	handle.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
 
 	// get stdout and stderr
@@ -115,7 +117,7 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 	}
 
 	// create .installed file
-	f, err := os.Create(path.Join(r.State.RelativePath, ".installed"))
+	f, err := os.Create(path.Join(r.State.AbsolutePath, ".installed"))
 	if err != nil {
 		return err
 	}
@@ -123,3 +125,25 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 
 	return nil
 }
+
+func (r *LocalPluginRuntime) calculateChecksum() string {
+	plugin_decoder, err := decoder.NewFSPluginDecoder(r.CWD)
+	if err != nil {
+		return ""
+	}
+
+	checksum, err := checksum.CalculateChecksum(plugin_decoder)
+	if err != nil {
+		return ""
+	}
+
+	return checksum
+}
+
+func (r *LocalPluginRuntime) Checksum() string {
+	if r.checksum == "" {
+		r.checksum = r.calculateChecksum()
+	}
+
+	return r.checksum
+}

+ 1 - 1
internal/core/plugin_manager/local_manager/run.go

@@ -38,7 +38,7 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	r.init()
 	// start plugin
 	e := exec.Command("bash", r.Config.Execution.Launch)
-	e.Dir = r.State.RelativePath
+	e.Dir = r.State.AbsolutePath
 	process.WrapProcess(e)
 
 	// get writer

+ 3 - 0
internal/core/plugin_manager/local_manager/type.go

@@ -4,7 +4,10 @@ import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 
 type LocalPluginRuntime struct {
 	entities.PluginRuntime
+	CWD string
 
 	io_identity string
 	w           chan bool
+
+	checksum string
 }

internal/core/plugin_manager/remote_manager/cs.go → internal/core/plugin_manager/remote_manager/checksum.go


+ 11 - 1
internal/core/plugin_manager/remote_manager/hooks.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 	"github.com/panjf2000/gnet/v2"
@@ -159,9 +160,18 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		// registration transferred
 		runtime.registration_transferred = true
 
-		runtime.InitState(runtime.calculateChecksum())
+		runtime.checksum = runtime.calculateChecksum()
+		runtime.InitState()
 		runtime.SetActiveAt(time.Now())
 
+		// trigger registration event
+		if err := runtime.Register(); err != nil {
+			runtime.conn.Write([]byte("register failed\n"))
+			log.Error("register failed", "error", err)
+			runtime.conn.Close()
+			return
+		}
+
 		// publish runtime to watcher
 		s.response.Write(runtime)
 	} else {

+ 7 - 0
internal/core/plugin_manager/remote_manager/register.go

@@ -0,0 +1,7 @@
+package remote_manager
+
+import "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
+
+func (plugin *RemotePluginRuntime) Register() error {
+	return install_service.InstallPlugin(plugin.tenant_id, "", plugin, map[string]any{})
+}

+ 4 - 0
internal/core/plugin_manager/remote_manager/run.go

@@ -99,3 +99,7 @@ func (r *RemotePluginRuntime) StartPlugin() error {
 func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
 	return r.shutdown_chan, nil
 }
+
+func (r *RemotePluginRuntime) Checksum() string {
+	return r.checksum
+}

+ 2 - 0
internal/core/plugin_manager/remote_manager/type.go

@@ -40,6 +40,8 @@ type RemotePluginRuntime struct {
 	tenant_id string
 
 	alive bool
+
+	checksum string
 }
 
 func (r *RemotePluginRuntime) Identity() (string, error) {

+ 2 - 1
internal/core/plugin_manager/watcher.go

@@ -55,6 +55,7 @@ func (p *PluginManager) handleNewPlugins(config *app.Config) {
 		} else if config.Platform == app.PLATFORM_LOCAL {
 			plugin_interface = &local_manager.LocalPluginRuntime{
 				PluginRuntime: plugin,
+				CWD:           plugin.State.AbsolutePath,
 			}
 		} else {
 			log.Error("unsupported platform: %s for plugin: %s", config.Platform, plugin.Config.Name)
@@ -102,7 +103,7 @@ func loadNewPlugins(root_path string) <-chan entities.PluginRuntime {
 					State: entities.PluginRuntimeState{
 						Status:       entities.PLUGIN_RUNTIME_STATUS_PENDING,
 						Restarts:     0,
-						RelativePath: path.Join(root_path, plugin.Name()),
+						AbsolutePath: path.Join(root_path, plugin.Name()),
 						ActiveAt:     nil,
 						Verified:     err == nil,
 					},

+ 30 - 0
internal/core/plugin_packager/checksum/checksum.go

@@ -0,0 +1,30 @@
+package checksum
+
+import (
+	"crypto/sha256"
+	"encoding/hex"
+	"path"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+)
+
+func CalculateChecksum(plugin decoder.PluginDecoder) (string, error) {
+	m := map[string]any{}
+
+	if err := plugin.Walk(func(filename string, dir string) error {
+		var err error
+		m[path.Join(dir, filename)], err = plugin.ReadFile(path.Join(dir, filename))
+		if err != nil {
+			return err
+		}
+		return nil
+	}); err != nil {
+		return "", err
+	}
+
+	str := parser.MarshalJsonBytes(m)
+	sha := sha256.New()
+	sha.Write(str)
+	return hex.EncodeToString(sha.Sum(nil)), nil
+}

+ 1 - 0
internal/service/install_service/cons.go

@@ -0,0 +1 @@
+package install_service

+ 1 - 0
internal/service/install_service/prod.go

@@ -0,0 +1 @@
+package install_service

+ 1 - 1
internal/service/install_task/state.go

@@ -1,4 +1,4 @@
-package install_task
+package install_service
 
 import (
 	"time"

+ 0 - 1
internal/service/install_task/cons.go

@@ -1 +0,0 @@
-package install_task

+ 0 - 1
internal/service/install_task/prod.go

@@ -1 +0,0 @@
-package install_task

+ 2 - 8
internal/types/entities/runtime.go

@@ -109,7 +109,7 @@ func (r *PluginRuntime) UpdateScheduledAt(t time.Time) {
 	r.State.ScheduledAt = &t
 }
 
-func (r *PluginRuntime) InitState(checksum string) {
+func (r *PluginRuntime) InitState() {
 	r.State = PluginRuntimeState{
 		Restarts:    0,
 		Status:      PLUGIN_RUNTIME_STATUS_PENDING,
@@ -118,7 +118,6 @@ func (r *PluginRuntime) InitState(checksum string) {
 		Verified:    false,
 		ScheduledAt: nil,
 		Logs:        []string{},
-		Checksum:    checksum,
 	}
 }
 
@@ -150,10 +149,6 @@ func (r *PluginRuntime) AddRestarts() {
 	r.State.Restarts++
 }
 
-func (r *PluginRuntime) Checksum() string {
-	return r.State.Checksum
-}
-
 func (r *PluginRuntime) OnStop(f func()) {
 	r.onStopped = append(r.onStopped, f)
 }
@@ -175,13 +170,12 @@ const (
 type PluginRuntimeState struct {
 	Restarts     int        `json:"restarts"`
 	Status       string     `json:"status"`
-	RelativePath string     `json:"relative_path"`
+	AbsolutePath string     `json:"absolute_path"`
 	ActiveAt     *time.Time `json:"active_at"`
 	StoppedAt    *time.Time `json:"stopped_at"`
 	Verified     bool       `json:"verified"`
 	ScheduledAt  *time.Time `json:"scheduled_at"`
 	Logs         []string   `json:"logs"`
-	Checksum     string     `json:"checksum"`
 }
 
 func (s *PluginRuntimeState) Hash() (uint64, error) {

+ 1 - 1
internal/types/entities/runtime_test.go

@@ -9,7 +9,7 @@ func TestRuntimeStateHash(t *testing.T) {
 	state := PluginRuntimeState{
 		Restarts:     0,
 		Status:       PLUGIN_RUNTIME_STATUS_PENDING,
-		RelativePath: "aaa",
+		AbsolutePath: "aaa",
 		ActiveAt:     &[]time.Time{time.Now()}[0],
 		StoppedAt:    &[]time.Time{time.Now()}[0],
 		Verified:     true,