Yeuoly 1 year ago
parent
commit
1ea1134488

+ 1 - 1
cmd/server/main.go

@@ -28,7 +28,7 @@ func main() {
 		log.Panic("Invalid configuration: %s", err.Error())
 	}
 
-	server.Run(&config)
+	(&server.App{}).Run(&config)
 }
 
 func setDefault(config *app.Config) {

+ 14 - 14
internal/cluster/init.go

@@ -1,25 +1,25 @@
 package cluster
 
-import "github.com/langgenius/dify-plugin-daemon/internal/types/app"
+import (
+	"sync"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+)
 
 type Cluster struct {
 	port uint16
-}
 
-var (
-	cluster *Cluster
-)
+	plugins     map[string]*PluginLifeTime
+	plugin_lock sync.Mutex
+}
 
-func Launch(config *app.Config) {
-	cluster = &Cluster{
-		port: uint16(config.ServerPort),
+func NewCluster(config *app.Config) *Cluster {
+	return &Cluster{
+		port:    uint16(config.ServerPort),
+		plugins: make(map[string]*PluginLifeTime),
 	}
-
-	go func() {
-		cluster.clusterLifetime()
-	}()
 }
 
-func GetCluster() *Cluster {
-	return cluster
+func (c *Cluster) Launch(config *app.Config) {
+	go c.clusterLifetime()
 }

+ 39 - 0
internal/cluster/state.go

@@ -1 +1,40 @@
 package cluster
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+)
+
+type PluginLifeTime struct {
+	lifetime entities.PluginRuntimeTimeLifeInterface
+}
+
+// RegisterPlugin registers a plugin to the cluster, and start to be scheduled
+func (c *Cluster) RegisterPlugin(lifetime entities.PluginRuntimeTimeLifeInterface) error {
+	identity, err := lifetime.Identity()
+	if err != nil {
+		return err
+	}
+
+	lifetime.OnStop(func() {
+		c.plugin_lock.Lock()
+		delete(c.plugins, identity)
+		c.plugin_lock.Unlock()
+	})
+
+	c.plugin_lock.Lock()
+	if !lifetime.Stopped() {
+		c.plugins[identity] = &PluginLifeTime{
+			lifetime: lifetime,
+		}
+	}
+	c.plugin_lock.Unlock()
+
+	log.Info("start to schedule plugin %s", identity)
+
+	return nil
+}
+
+func (c *Cluster) SchedulePlugin(lifetime entities.PluginRuntimeTimeLifeInterface) error {
+	return nil
+}

+ 2 - 4
internal/core/plugin_daemon/model_service.go

@@ -20,10 +20,8 @@ func genericInvokePlugin[Req any, Rsp any](
 	response_buffer_size int,
 	typ backwards_invocation.PluginAccessType,
 	action backwards_invocation.PluginAccessAction,
-) (
-	*stream.StreamResponse[Rsp], error,
-) {
-	runtime := plugin_manager.Get(session.PluginIdentity())
+) (*stream.StreamResponse[Rsp], error) {
+	runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity())
 	if runtime == nil {
 		return nil, errors.New("plugin not found")
 	}

+ 3 - 1
internal/core/plugin_manager/init.go

@@ -6,7 +6,9 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 )
 
-var m sync.Map
+var (
+	m sync.Map
+)
 
 func checkPluginExist(identity string) (entities.PluginRuntimeInterface, bool) {
 	if v, ok := m.Load(identity); ok {

+ 8 - 8
internal/core/plugin_manager/lifetime.go

@@ -8,22 +8,22 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
 
-func lifetime(config *app.Config, r entities.PluginRuntimeInterface) {
+func (p *PluginManager) lifetime(config *app.Config, r entities.PluginRuntimeInterface) {
 	start_failed_times := 0
 	configuration := r.Configuration()
 
 	log.Info("new plugin logged in: %s", configuration.Identity())
 	defer log.Info("plugin %s has exited", configuration.Identity())
 
-	// store plugin runtime
-	m.Store(configuration.Identity(), r)
-	defer m.Delete(configuration.Identity())
-
-	// update lifetime state for this pod
-	addLifetimeState(r)
+	// add plugin to cluster
+	err := p.cluster.RegisterPlugin(r)
+	if err != nil {
+		log.Error("add plugin to cluster failed: %s", err.Error())
+		return
+	}
 
 	// remove lifetime state after plugin if it has been stopped
-	defer deleteLifetimeState(r)
+	defer r.TriggerStop()
 
 	for !r.Stopped() {
 		if err := r.InitEnvironment(); err != nil {

+ 0 - 146
internal/core/plugin_manager/lifetime_manager.go

@@ -1,146 +0,0 @@
-package plugin_manager
-
-import (
-	"sync"
-	"time"
-
-	"github.com/google/uuid"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
-)
-
-const (
-	KEY_PLUGIN_LIFETIME_STATE             = "lifetime_state"
-	KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK = "lifetime_state_modify_lock"
-)
-
-type PluginLifeTime struct {
-	Identity string                            `json:"identity"`
-	Restarts int                               `json:"restarts"`
-	Status   string                            `json:"status"`
-	Config   plugin_entities.PluginDeclaration `json:"configuration"`
-}
-
-type pluginLifeCollection struct {
-	Collection  map[string]PluginLifeTime `json:"state"`
-	ID          string                    `json:"id"`
-	LastCheckAt time.Time                 `json:"last_check_at"`
-}
-
-func (p pluginLifeCollection) MarshalBinary() ([]byte, error) {
-	return parser.MarshalJsonBytes(p), nil
-}
-
-var (
-	instanceId = uuid.New().String()
-
-	pluginLifetimeStateLock  = sync.RWMutex{}
-	pluginLifetimeCollection = pluginLifeCollection{
-		Collection: map[string]PluginLifeTime{},
-		ID:         instanceId,
-	}
-)
-
-func startLifeTimeManager(config *app.Config) {
-	go func() {
-		// do check immediately
-		doClusterLifetimeCheck(config.LifetimeCollectionGCInterval)
-
-		duration := time.Duration(config.LifetimeCollectionHeartbeatInterval) * time.Second
-		for range time.NewTicker(duration).C {
-			doClusterLifetimeCheck(config.LifetimeCollectionGCInterval)
-		}
-	}()
-}
-
-func doClusterLifetimeCheck(heartbeat_interval int) {
-	// check and update self lifetime state
-	if err := updateCurrentInstanceLifetimeCollection(); err != nil {
-		log.Error("update current instance lifetime state failed: %s", err.Error())
-		return
-	}
-
-	// lock cluster and do cluster lifetime check
-	if cache.Lock(KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK, time.Second*10, time.Second*10) != nil {
-		log.Error("update lifetime state failed: lock failed")
-		return
-	}
-	defer cache.Unlock(KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK)
-
-	cluster_lifetime_collections, err := fetchClusterPluginLifetimeCollections()
-	if err != nil {
-		log.Error("fetch cluster plugin lifetime state failed: %s", err.Error())
-		return
-	}
-
-	for cluster_id, state := range cluster_lifetime_collections {
-		if state.ID == instanceId {
-			continue
-		}
-
-		// skip if last check has been done in $LIFETIME_COLLECTION_CG_INTERVAL
-		cg_interval := time.Duration(heartbeat_interval) * time.Second
-		if time.Since(state.LastCheckAt) < cg_interval {
-			continue
-		}
-
-		// if last check has not been done in $LIFETIME_COLLECTION_CG_INTERVAL * 2, delete it
-		if time.Since(state.LastCheckAt) > cg_interval*2 {
-			if err := cache.DelMapField(KEY_PLUGIN_LIFETIME_STATE, cluster_id); err != nil {
-				log.Error("delete cluster plugin lifetime state failed: %s", err.Error())
-			} else {
-				log.Info("delete cluster plugin lifetime state due to no longer active: %s", cluster_id)
-			}
-		}
-	}
-}
-
-func newLifetimeFromRuntimeState(state entities.PluginRuntimeInterface) PluginLifeTime {
-	s := state.RuntimeState()
-	c := state.Configuration()
-
-	return PluginLifeTime{
-		Identity: c.Identity(),
-		Restarts: s.Restarts,
-		Status:   s.Status,
-		Config:   *c,
-	}
-}
-
-func addLifetimeState(state entities.PluginRuntimeInterface) {
-	pluginLifetimeStateLock.Lock()
-	defer pluginLifetimeStateLock.Unlock()
-
-	pluginLifetimeCollection.Collection[state.Configuration().Identity()] = newLifetimeFromRuntimeState(state)
-}
-
-func deleteLifetimeState(state entities.PluginRuntimeInterface) {
-	pluginLifetimeStateLock.Lock()
-	defer pluginLifetimeStateLock.Unlock()
-
-	delete(pluginLifetimeCollection.Collection, state.Configuration().Identity())
-}
-
-func updateCurrentInstanceLifetimeCollection() error {
-	pluginLifetimeStateLock.Lock()
-	defer pluginLifetimeStateLock.Unlock()
-
-	pluginLifetimeCollection.LastCheckAt = time.Now()
-
-	m.Range(func(key, value interface{}) bool {
-		if v, ok := value.(entities.PluginRuntimeInterface); ok {
-			pluginLifetimeCollection.Collection[v.Configuration().Identity()] = newLifetimeFromRuntimeState(v)
-		}
-		return true
-	})
-
-	return cache.SetMapOneField(KEY_PLUGIN_LIFETIME_STATE, instanceId, pluginLifetimeCollection)
-}
-
-func fetchClusterPluginLifetimeCollections() (map[string]pluginLifeCollection, error) {
-	return cache.GetMap[pluginLifeCollection](KEY_PLUGIN_LIFETIME_STATE)
-}

+ 25 - 9
internal/core/plugin_manager/manager.go

@@ -3,6 +3,7 @@ package plugin_manager
 import (
 	"fmt"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
@@ -10,7 +11,25 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
 
-func List() []entities.PluginRuntimeInterface {
+type PluginManager struct {
+	cluster *cluster.Cluster
+}
+
+var (
+	manager *PluginManager
+)
+
+func InitGlobalPluginManager(cluster *cluster.Cluster) {
+	manager = &PluginManager{
+		cluster: cluster,
+	}
+}
+
+func GetGlobalPluginManager() *PluginManager {
+	return manager
+}
+
+func (p *PluginManager) List() []entities.PluginRuntimeInterface {
 	var runtimes []entities.PluginRuntimeInterface
 	m.Range(func(key, value interface{}) bool {
 		if v, ok := value.(entities.PluginRuntimeInterface); ok {
@@ -21,7 +40,7 @@ func List() []entities.PluginRuntimeInterface {
 	return runtimes
 }
 
-func Get(identity string) entities.PluginRuntimeInterface {
+func (p *PluginManager) Get(identity string) entities.PluginRuntimeInterface {
 	if v, ok := m.Load(identity); ok {
 		if r, ok := v.(entities.PluginRuntimeInterface); ok {
 			return r
@@ -30,15 +49,15 @@ func Get(identity string) entities.PluginRuntimeInterface {
 	return nil
 }
 
-func Put(path string, binary []byte) {
+func (p *PluginManager) Put(path string, binary []byte) {
 	//TODO: put binary into
 }
 
-func Delete(identity string) {
+func (p *PluginManager) Delete(identity string) {
 	//TODO: delete binary from
 }
 
-func Init(configuration *app.Config) {
+func (p *PluginManager) Init(configuration *app.Config) {
 	// TODO: init plugin manager
 	log.Info("start plugin manager daemon...")
 
@@ -57,8 +76,5 @@ func Init(configuration *app.Config) {
 	}
 
 	// start plugin watcher
-	startWatcher(configuration)
-
-	// start plugin lifetime manager
-	startLifeTimeManager(configuration)
+	p.startWatcher(configuration)
 }

+ 8 - 8
internal/core/plugin_manager/watcher.go

@@ -15,19 +15,19 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
-func startWatcher(config *app.Config) {
+func (p *PluginManager) startWatcher(config *app.Config) {
 	go func() {
 		log.Info("start to handle new plugins in path: %s", config.StoragePath)
-		handleNewPlugins(config)
+		p.handleNewPlugins(config)
 		for range time.NewTicker(time.Second * 30).C {
-			handleNewPlugins(config)
+			p.handleNewPlugins(config)
 		}
 	}()
 
-	startRemoteWatcher(config)
+	p.startRemoteWatcher(config)
 }
 
-func startRemoteWatcher(config *app.Config) {
+func (p *PluginManager) startRemoteWatcher(config *app.Config) {
 	// launch TCP debugging server if enabled
 	if config.PluginRemoteInstallingEnabled {
 		server := remote_manager.NewRemotePluginServer(config)
@@ -39,13 +39,13 @@ func startRemoteWatcher(config *app.Config) {
 		}()
 		go func() {
 			server.Wrap(func(rpr *remote_manager.RemotePluginRuntime) {
-				lifetime(config, rpr)
+				p.lifetime(config, rpr)
 			})
 		}()
 	}
 }
 
-func handleNewPlugins(config *app.Config) {
+func (p *PluginManager) handleNewPlugins(config *app.Config) {
 	// load local plugins firstly
 	for plugin := range loadNewPlugins(config.StoragePath) {
 		var plugin_interface entities.PluginRuntimeInterface
@@ -64,7 +64,7 @@ func handleNewPlugins(config *app.Config) {
 		}
 
 		routine.Submit(func() {
-			lifetime(config, plugin_interface)
+			p.lifetime(config, plugin_interface)
 		})
 	}
 }

+ 11 - 0
internal/server/app.go

@@ -0,0 +1,11 @@
+package server
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
+)
+
+type App struct {
+	plugin_manager *plugin_manager.PluginManager
+	cluster        *cluster.Cluster
+}

+ 7 - 4
internal/server/server.go

@@ -9,7 +9,10 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
-func Run(config *app.Config) {
+func (a *App) Run(config *app.Config) {
+	a.cluster = cluster.NewCluster(config)
+	plugin_manager.InitGlobalPluginManager(a.cluster)
+
 	// init routine pool
 	routine.InitPool(config.RoutinePoolSize)
 
@@ -20,10 +23,10 @@ func Run(config *app.Config) {
 	process.Init(config)
 
 	// init plugin daemon
-	plugin_manager.Init(config)
+	a.plugin_manager.Init(config)
 
-	// init cluster
-	cluster.Launch(config)
+	// launch cluster
+	a.cluster.Launch(config)
 
 	// start http server
 	server(config)

+ 1 - 1
internal/service/invoke_tool.go

@@ -14,7 +14,7 @@ import (
 
 func createSession[T any](r *plugin_entities.InvokePluginRequest[T]) *session_manager.Session {
 	session := session_manager.NewSession(r.TenantId, r.UserId, parser.MarshalPluginIdentity(r.PluginName, r.PluginVersion))
-	runtime := plugin_manager.Get(session.PluginIdentity())
+	runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity())
 	session.BindRuntime(runtime)
 	return session
 }

+ 13 - 7
internal/types/entities/runtime.go

@@ -10,7 +10,7 @@ type (
 	PluginRuntime struct {
 		State     PluginRuntimeState                `json:"state"`
 		Config    plugin_entities.PluginDeclaration `json:"config"`
-		Connector PluginConnector                   `json:"-"`
+		onStopped []func()                          `json:"-"`
 	}
 
 	PluginRuntimeInterface interface {
@@ -25,6 +25,8 @@ type (
 		StartPlugin() error
 		Stopped() bool
 		Stop()
+		OnStop(func())
+		TriggerStop()
 		RuntimeState() *PluginRuntimeState
 		Wait() (<-chan bool, error)
 		Type() PluginRuntimeType
@@ -56,6 +58,16 @@ func (r *PluginRuntime) RuntimeState() *PluginRuntimeState {
 	return &r.State
 }
 
+func (r *PluginRuntime) OnStop(f func()) {
+	r.onStopped = append(r.onStopped, f)
+}
+
+func (r *PluginRuntime) TriggerStop() {
+	for _, f := range r.onStopped {
+		f()
+	}
+}
+
 type PluginRuntimeType string
 
 const (
@@ -80,9 +92,3 @@ const (
 	PLUGIN_RUNTIME_STATUS_RESTARTING = "restarting"
 	PLUGIN_RUNTIME_STATUS_PENDING    = "pending"
 )
-
-type PluginConnector interface {
-	OnMessage(func([]byte))
-	Read([]byte) int
-	Write([]byte) int
-}