
feat: schedule plugins

Yeuoly 1 year ago
parent
commit
2b3529296d

+ 6 - 0
internal/cluster/entities.go

@@ -1,5 +1,7 @@
 package cluster
 
+import "time"
+
 type ip struct {
 	Address string `json:"address"`
 	Votes   []vote `json:"vote"`
@@ -15,3 +17,7 @@ type node struct {
 	Ips        []ip  `json:"ips"`
 	LastPingAt int64 `json:"last_ping_at"`
 }
+
+func (c *node) available() bool {
+	return time.Since(time.Unix(c.LastPingAt, 0)) < NODE_DISCONNECTED_TIMEOUT
+}
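
The new available() helper centralizes the liveness check used throughout the rest of this commit. A minimal standalone sketch of the same filter (illustrative names; the 10s threshold mirrors NODE_DISCONNECTED_TIMEOUT defined below in lifetime.go):

package main

import (
	"fmt"
	"time"
)

// fakeNode mirrors the liveness check from entities.go.
type fakeNode struct{ LastPingAt int64 }

func (n fakeNode) available() bool {
	return time.Since(time.Unix(n.LastPingAt, 0)) < 10*time.Second
}

func main() {
	nodes := map[string]fakeNode{
		"fresh": {LastPingAt: time.Now().Unix()},
		"stale": {LastPingAt: time.Now().Add(-time.Minute).Unix()},
	}
	for id, n := range nodes {
		if !n.available() { // same filter GetNodes applies in node.go below
			delete(nodes, id)
		}
	}
	fmt.Println(len(nodes)) // 1: only "fresh" survives
}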

+ 0 - 51
internal/cluster/gc.go

@@ -1,51 +0,0 @@
-package cluster
-
-import (
-	"errors"
-	"time"
-
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
-)
-
-// gc the nodes has already deactivated
-func (c *Cluster) gcNodes() error {
-	var total_errors error
-	add_error := func(err error) {
-		if err != nil {
-			if total_errors == nil {
-				total_errors = err
-			} else {
-				total_errors = errors.Join(total_errors, err)
-			}
-		}
-	}
-
-	// get all nodes status
-	nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
-	if err == cache.ErrNotFound {
-		return nil
-	}
-
-	for node_id, node_status := range nodes {
-		// delete the node if it is disconnected
-		if time.Since(time.Unix(node_status.LastPingAt, 0)) > NODE_DISCONNECTED_TIMEOUT {
-			// gc the node
-			if err := c.gcNode(node_id); err != nil {
-				add_error(err)
-				continue
-			}
-
-			// delete the node status
-			if err := cache.DelMapField(CLUSTER_STATUS_HASH_MAP_KEY, node_id); err != nil {
-				add_error(err)
-			}
-		}
-	}
-
-	return total_errors
-}
-
-// remove the resource associated with the node
-func (c *Cluster) gcNode(node_id string) error {
-	return nil
-}

+ 6 - 9
internal/cluster/init.go

@@ -3,19 +3,12 @@ package cluster
 import (
 	"sync"
 	"sync/atomic"
-	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/cluster/cluster_id"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
 )
 
-type pluginLifeTime struct {
-	lifetime          entities.PluginRuntimeTimeLifeInterface
-	last_scheduled_at time.Time
-}
-
 type Cluster struct {
 	// id is the unique id of the cluster
 	id string
@@ -28,14 +21,18 @@ type Cluster struct {
 
 	// plugins stores all the plugin life time of the cluster
 	plugins     mapping.Map[string, *pluginLifeTime]
-	plugin_lock sync.Mutex
+	plugin_lock sync.RWMutex
 
 	// nodes stores all the nodes of the cluster
-	nodes mapping.Map[string, node]
+	nodes     mapping.Map[string, node]
+	node_lock sync.RWMutex
 
 	// signals for waiting for the cluster to stop
 	stop_chan chan bool
 	stopped   *int32
+
+	is_in_auto_gc_nodes   int32
+	is_in_auto_gc_plugins int32
 }
 
 func NewCluster(config *app.Config) *Cluster {
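
The two new int32 fields are re-entrancy guards so that a long-running GC pass is not started a second time by the next tick. A standalone sketch of the compare-and-swap guard pattern these flags are intended for (illustrative names, not the daemon's code):

package main

import (
	"fmt"
	"sync/atomic"
)

type worker struct {
	inGC int32 // plays the role of is_in_auto_gc_nodes / is_in_auto_gc_plugins
}

func (w *worker) gcOnce() {
	// only one caller wins the 0 -> 1 swap; concurrent callers return immediately
	if !atomic.CompareAndSwapInt32(&w.inGC, 0, 1) {
		return
	}
	defer atomic.StoreInt32(&w.inGC, 0)

	fmt.Println("running gc pass")
}

func main() {
	w := &worker{}
	w.gcOnce()
	w.gcOnce() // runs again: the first pass already released the flag
}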

+ 39 - 10
internal/cluster/lifetime.go

@@ -7,13 +7,34 @@ import (
 )
 
 const (
-	MASTER_LOCKING_INTERVAL     = time.Millisecond * 500 // interval to try to lock the slot to be the master
-	MASTER_LOCK_EXPIRED_TIME    = time.Second * 5        // expired time of master key
-	MASTER_GC_INTERVAL          = time.Second * 10       // interval to do garbage collection of nodes has already deactivated
-	NODE_VOTE_INTERVAL          = time.Second * 30       // interval to vote the ips of the nodes
-	UPDATE_NODE_STATUS_INTERVAL = time.Second * 5        // interval to update the status of the node
-	NODE_DISCONNECTED_TIMEOUT   = time.Second * 10       // once a node is no longer active, it will be removed from the cluster
-	PLUGIN_SCHEDULER_INTERVAL   = time.Second * 3        // interval to schedule the plugins
+	// master
+	// The cluster master is responsible for garbage collection of both nodes and plugins.
+	// Typically, each node handles the garbage collection of its own plugins;
+	// however, if a node becomes inactive, the master takes over this task.
+	// Every node has an equal chance of becoming the master.
+	// Once a node is selected as the master, it is locked into that role.
+	// If the master node becomes inactive, the master slot is released, allowing other nodes to take over the role.
+	MASTER_LOCKING_INTERVAL  = time.Millisecond * 500 // interval to try to lock the slot to be the master
+	MASTER_LOCK_EXPIRED_TIME = time.Second * 5        // expired time of the master key
+	MASTER_GC_INTERVAL       = time.Second * 10       // interval for garbage collection of nodes that have already deactivated
+
+	// node
+	// To determine the available IPs of the nodes, each node votes for the IPs of the other nodes.
+	// This voting process occurs every $NODE_VOTE_INTERVAL.
+	// Simultaneously, all nodes synchronize to the latest status in memory every $UPDATE_NODE_STATUS_INTERVAL.
+	// Each node also refreshes its own status to remain active. If a node becomes inactive, it is removed from the cluster.
+	NODE_VOTE_INTERVAL          = time.Second * 30 // interval to vote on the ips of the nodes
+	UPDATE_NODE_STATUS_INTERVAL = time.Second * 5  // interval to update the status of the node
+	NODE_DISCONNECTED_TIMEOUT   = time.Second * 10 // once a node is no longer active, it will be removed from the cluster
+
+	// plugin scheduler
+	// The scheduling pass is triggered every $PLUGIN_SCHEDULER_TICKER_INTERVAL,
+	// but each plugin is rescheduled at most once per $PLUGIN_SCHEDULER_INTERVAL:
+	// only plugins that have not been scheduled within $PLUGIN_SCHEDULER_INTERVAL are updated.
+	// Plugins that are no longer active are removed from the cluster.
+	PLUGIN_SCHEDULER_TICKER_INTERVAL = time.Second * 3  // interval at which the scheduling pass is triggered
+	PLUGIN_SCHEDULER_INTERVAL        = time.Second * 10 // minimum interval between two schedules of the same plugin
+	PLUGIN_DEACTIVATED_TIMEOUT       = time.Second * 30 // once a plugin is no longer active, it will be removed from the cluster
 )
 
 // lifetime of the cluster
@@ -33,10 +54,16 @@ func (c *Cluster) clusterLifetime() {
 	plugin_scheduler_ticker := time.NewTicker(PLUGIN_SCHEDULER_INTERVAL)
 	defer plugin_scheduler_ticker.Stop()
 
+	// vote for all ips and find the best one, prepare for later traffic scheduling
 	if err := c.voteIps(); err != nil {
 		log.Error("failed to vote the ips of the nodes: %s", err.Error())
 	}
 
+	// update this node's status and fetch the current node map
+	if err := c.updateNodeStatus(); err != nil {
+		log.Error("failed to update the status of the node: %s", err.Error())
+	}
+
 	for {
 		select {
 		case <-ticker_lock_master.C:
@@ -49,7 +76,6 @@ func (c *Cluster) clusterLifetime() {
 					log.Info("current node has become the master of the cluster")
 				} else {
 					c.i_am_master = false
-					log.Info("current node lost the master slot")
 				}
 			} else {
 				// update the master
@@ -63,8 +89,11 @@ func (c *Cluster) clusterLifetime() {
 			}
 		case <-master_gc_ticker.C:
 			if c.i_am_master {
-				if err := c.gcNodes(); err != nil {
-					log.Error("failed to gc the nodes has already deactivated: %s", err.Error())
+				if err := c.autoGCNodes(); err != nil {
+					log.Error("failed to gc nodes that have already deactivated: %s", err.Error())
+				}
+				if err := c.autoGCPlugins(); err != nil {
+					log.Error("failed to gc plugins that have already stopped: %s", err.Error())
 				}
 			}
 		case <-node_vote_ticker.C:
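
Taken together, the constants imply a simple contract: every node refreshes its status each 5s, is treated as disconnected after 10s without a ping, and the master sweeps for such nodes every 10s. A simplified standalone sketch of the ticker-multiplexing shape of clusterLifetime (intervals and handler bodies are placeholders):

package main

import (
	"fmt"
	"time"
)

func main() {
	stop := make(chan bool)
	masterGC := time.NewTicker(10 * time.Second)    // MASTER_GC_INTERVAL
	updateStatus := time.NewTicker(5 * time.Second) // UPDATE_NODE_STATUS_INTERVAL
	schedule := time.NewTicker(3 * time.Second)     // plugin scheduler tick
	defer masterGC.Stop()
	defer updateStatus.Stop()
	defer schedule.Stop()

	go func() { time.Sleep(30 * time.Second); close(stop) }()

	for {
		select {
		case <-masterGC.C:
			fmt.Println("master only: gc deactivated nodes and stale plugin states")
		case <-updateStatus.C:
			fmt.Println("every node: refresh LastPingAt and sync the node map")
		case <-schedule.C:
			fmt.Println("every node: update the state of locally running plugins")
		case <-stop:
			return
		}
	}
}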

+ 0 - 44
internal/cluster/lock.go

@@ -1,44 +0,0 @@
-package cluster
-
-import (
-	"strings"
-	"time"
-
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
-)
-
-const (
-	CLUSTER_STATE_TENANT_LOCK_PREFIX       = "cluster_state_tenant_lock"
-	CLUSTER_STATE_PLUGIN_LOCK_PREFIX       = "cluster_state_plugin_lock"
-	CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX = "cluster_update_node_status_lock"
-)
-
-func (c *Cluster) LockTenant(tenant_id string) error {
-	key := strings.Join([]string{CLUSTER_STATE_TENANT_LOCK_PREFIX, tenant_id}, ":")
-	return cache.Lock(key, time.Second*5, time.Second)
-}
-
-func (c *Cluster) UnlockTenant(tenant_id string) error {
-	key := strings.Join([]string{CLUSTER_STATE_TENANT_LOCK_PREFIX, tenant_id}, ":")
-	return cache.Unlock(key)
-}
-
-func (c *Cluster) LockPlugin(plugin_id string) error {
-	key := strings.Join([]string{CLUSTER_STATE_PLUGIN_LOCK_PREFIX, plugin_id}, ":")
-	return cache.Lock(key, time.Second*5, time.Second)
-}
-
-func (c *Cluster) UnlockPlugin(plugin_id string) error {
-	key := strings.Join([]string{CLUSTER_STATE_PLUGIN_LOCK_PREFIX, plugin_id}, ":")
-	return cache.Unlock(key)
-}
-
-func (c *Cluster) LockNodeStatus(node_id string) error {
-	key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, node_id}, ":")
-	return cache.Lock(key, time.Second*5, time.Second)
-}
-
-func (c *Cluster) UnlockNodeStatus(node_id string) error {
-	key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, node_id}, ":")
-	return cache.Unlock(key)
-}

+ 222 - 0
internal/cluster/node.go

@@ -0,0 +1,222 @@
+package cluster
+
+import (
+	"errors"
+	"net"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/network"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+)
+
+// update the status of the node
+func (c *Cluster) updateNodeStatus() error {
+	if err := c.LockNodeStatus(c.id); err != nil {
+		return err
+	}
+	defer c.UnlockNodeStatus(c.id)
+
+	// update the status of the node
+	node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, c.id)
+	if err != nil {
+		if err == cache.ErrNotFound {
+			// try to get ips configs
+			ips, err := network.FetchCurrentIps()
+			if err != nil {
+				return err
+			}
+			node_status = &node{
+				Ips: parser.Map(func(from net.IP) ip {
+					return ip{
+						Address: from.String(),
+						Votes:   []vote{},
+					}
+				}, ips),
+			}
+		} else {
+			return err
+		}
+	} else {
+		ips, err := network.FetchCurrentIps()
+		if err != nil {
+			return err
+		}
+		// add the ip if it does not exist yet
+		for _, _ip := range ips {
+			found := false
+			for _, node_ip := range node_status.Ips {
+				if node_ip.Address == _ip.String() {
+					found = true
+					break
+				}
+			}
+			if !found {
+				node_status.Ips = append(node_status.Ips, ip{
+					Address: _ip.String(),
+					Votes:   []vote{},
+				})
+			}
+		}
+	}
+
+	// refresh the last ping time
+	node_status.LastPingAt = time.Now().Unix()
+
+	// update the status of the node
+	if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, c.id, node_status); err != nil {
+		return err
+	}
+
+	// get all the nodes
+	nodes, err := c.GetNodes()
+	if err != nil {
+		return err
+	}
+
+	// replace the local nodes map with the freshly fetched one
+	c.node_lock.Lock()
+	defer c.node_lock.Unlock()
+
+	c.nodes.Clear() // clear once, outside the loop, so every fetched node is kept
+	for node_id, node := range nodes {
+		c.nodes.Store(node_id, node)
+	}
+
+	return nil
+}
+
+func (c *Cluster) GetNodes() (map[string]node, error) {
+	nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
+	if err != nil {
+		return nil, err
+	}
+
+	for node_id, node := range nodes {
+		// filter out the disconnected nodes
+		if !node.available() {
+			delete(nodes, node_id)
+		}
+	}
+
+	return nodes, nil
+}
+
+// FetchPluginAvailableNodes fetches the available nodes of the given plugin
+func (c *Cluster) FetchPluginAvailableNodes(hashed_plugin_id string) ([]string, error) {
+	states, err := cache.ScanMap[entities.PluginRuntimeState](
+		PLUGIN_STATE_MAP_KEY, c.getScanPluginsByIdKey(hashed_plugin_id),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	nodes := make([]string, 0)
+	for key := range states {
+		node_id, _, err := c.splitNodePluginJoin(key)
+		if err != nil {
+			continue
+		}
+		if c.nodes.Exits(node_id) {
+			nodes = append(nodes, node_id)
+		}
+	}
+
+	return nodes, nil
+}
+
+func (c *Cluster) IsMaster() bool {
+	return c.i_am_master
+}
+
+func (c *Cluster) IsNodeAlive(node_id string) bool {
+	node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)
+	if err != nil {
+		return false
+	}
+
+	return node_status.available()
+}
+
+// autoGCNodes garbage collects nodes that have already deactivated
+func (c *Cluster) autoGCNodes() error {
+	// skip if a gc pass is already running; CompareAndSwap also claims the flag
+	if !atomic.CompareAndSwapInt32(&c.is_in_auto_gc_nodes, 0, 1) {
+		return nil
+	}
+	defer atomic.StoreInt32(&c.is_in_auto_gc_nodes, 0)
+
+	var total_errors error
+	add_error := func(err error) {
+		if err != nil {
+			if total_errors == nil {
+				total_errors = err
+			} else {
+				total_errors = errors.Join(total_errors, err)
+			}
+		}
+	}
+
+	// get all nodes status
+	nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
+	if err == cache.ErrNotFound {
+		return nil
+	}
+
+	for node_id, node_status := range nodes {
+		// delete the node if it is disconnected
+		if !node_status.available() {
+			// gc the node
+			if err := c.gcNode(node_id); err != nil {
+				add_error(err)
+				continue
+			}
+		}
+	}
+
+	return total_errors
+}
+
+// remove the resource associated with the node
+func (c *Cluster) gcNode(node_id string) error {
+	// remove all plugins associated with the node
+	if err := c.forceGCNodePlugins(node_id); err != nil {
+		return err
+	}
+
+	// remove the node from the cluster
+	c.node_lock.Lock()
+	c.nodes.Delete(node_id)
+	c.node_lock.Unlock()
+
+	if err := c.LockNodeStatus(node_id); err != nil {
+		return err
+	}
+	defer c.UnlockNodeStatus(node_id)
+
+	err := cache.DelMapField(CLUSTER_STATUS_HASH_MAP_KEY, node_id)
+	if err != nil {
+		return err
+	} else {
+		log.Info("node %s has been removed from the cluster due to being disconnected", node_id)
+	}
+
+	return nil
+}
+
+const (
+	CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX = "cluster-update-node-status-lock"
+)
+
+func (c *Cluster) LockNodeStatus(node_id string) error {
+	key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, node_id}, ":")
+	return cache.Lock(key, time.Second*5, time.Second)
+}
+
+func (c *Cluster) UnlockNodeStatus(node_id string) error {
+	key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, node_id}, ":")
+	return cache.Unlock(key)
+}
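
FetchPluginAvailableNodes is what later traffic scheduling can build on: it returns the ids of live nodes that currently report a state for the given plugin. A hypothetical caller, not part of this commit, might look like the following (the first-node selection policy is purely illustrative):

package routing

import (
	"errors"

	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
)

// pickNodeForPlugin routes a request for a plugin to one of the nodes running it.
// Hypothetical helper; picking the first node is only an illustration.
func pickNodeForPlugin(c *cluster.Cluster, hashedPluginID string) (string, error) {
	nodeIDs, err := c.FetchPluginAvailableNodes(hashedPluginID)
	if err != nil {
		return "", err
	}
	if len(nodeIDs) == 0 {
		return "", errors.New("no available node for plugin")
	}
	return nodeIDs[0], nil
}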

+ 239 - 0
internal/cluster/plugin.go

@@ -0,0 +1,239 @@
+package cluster
+
+import (
+	"errors"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+)
+
+type pluginLifeTime struct {
+	lifetime          entities.PluginRuntimeTimeLifeInterface
+	last_scheduled_at time.Time
+}
+
+type pluginState struct {
+	entities.PluginRuntimeState
+	Identity string `json:"identity"`
+}
+
+// RegisterPlugin registers a plugin to the cluster and starts scheduling it
+func (c *Cluster) RegisterPlugin(lifetime entities.PluginRuntimeTimeLifeInterface) error {
+	identity, err := lifetime.Identity()
+	if err != nil {
+		return err
+	}
+
+	if c.plugins.Exits(identity) {
+		return errors.New("plugin has been registered")
+	}
+
+	l := &pluginLifeTime{
+		lifetime: lifetime,
+	}
+
+	lifetime.OnStop(func() {
+		c.plugin_lock.Lock()
+		c.plugins.Delete(identity)
+		// remove plugin state
+		c.doPluginStateUpdate(l)
+		c.plugin_lock.Unlock()
+	})
+
+	c.plugin_lock.Lock()
+	if !lifetime.Stopped() {
+		c.plugins.Store(identity, l)
+
+		// do plugin state update immediately
+		err = c.doPluginStateUpdate(l)
+		if err != nil {
+			c.plugin_lock.Unlock()
+			return err
+		}
+	}
+	c.plugin_lock.Unlock()
+
+	log.Info("start to schedule plugin %s", identity)
+
+	return nil
+}
+
+const (
+	PLUGIN_STATE_MAP_KEY = "plugin_state"
+)
+
+func (c *Cluster) getPluginStateKey(node_id string, plugin_id string) string {
+	return node_id + ":" + plugin_id
+}
+
+func (c *Cluster) getScanPluginsByNodeKey(node_id string) string {
+	return node_id + ":*"
+}
+
+func (c *Cluster) getScanPluginsByIdKey(plugin_id string) string {
+	return "*:" + plugin_id
+}
+
+// schedulePlugins walks through the locally registered plugins and updates their states
+// in the plugin state map. A plugin that has exited is normally removed automatically;
+// if it is not, the master node will garbage collect it.
+func (c *Cluster) schedulePlugins() error {
+	c.plugins.Range(func(key string, value *pluginLifeTime) bool {
+		if time.Since(value.last_scheduled_at) < PLUGIN_SCHEDULER_INTERVAL {
+			return true
+		}
+		// do plugin state update
+		err := c.doPluginStateUpdate(value)
+		if err != nil {
+			log.Error("failed to update plugin state: %s", err.Error())
+		}
+
+		return true
+	})
+
+	return nil
+}
+
+// doPluginStateUpdate updates the plugin state and schedules the plugin
+func (c *Cluster) doPluginStateUpdate(lifetime *pluginLifeTime) error {
+	state := lifetime.lifetime.RuntimeState()
+	hash_identity, err := lifetime.lifetime.HashedIdentity()
+	if err != nil {
+		return err
+	}
+
+	identity, err := lifetime.lifetime.Identity()
+	if err != nil {
+		return err
+	}
+
+	schedule_state := &pluginState{
+		Identity:           identity,
+		PluginRuntimeState: state,
+	}
+
+	state_key := c.getPluginStateKey(c.id, hash_identity)
+
+	// check if the plugin has been removed
+	if !c.plugins.Exits(identity) {
+		// remove state
+		err = c.removePluginState(c.id, hash_identity)
+		if err != nil {
+			return err
+		}
+	} else {
+		// update plugin state
+		now := time.Now()
+		schedule_state.ScheduledAt = &now
+		err = cache.SetMapOneField(PLUGIN_STATE_MAP_KEY, state_key, schedule_state)
+		if err != nil {
+			return err
+		}
+		lifetime.lifetime.UpdateScheduledAt(*schedule_state.ScheduledAt)
+	}
+
+	lifetime.last_scheduled_at = time.Now()
+
+	return nil
+}
+
+func (c *Cluster) removePluginState(node_id string, hashed_identity string) error {
+	err := cache.DelMapField(PLUGIN_STATE_MAP_KEY, c.getPluginStateKey(node_id, hashed_identity))
+	if err != nil {
+		return err
+	}
+
+	log.Info("plugin %s has been removed from node %s", hashed_identity, c.id)
+
+	return nil
+}
+
+// forceGCNodePlugins will force garbage collect all the plugins on the node
+func (c *Cluster) forceGCNodePlugins(node_id string) error {
+	return cache.ScanMapAsync[pluginState](
+		PLUGIN_STATE_MAP_KEY,
+		c.getScanPluginsByNodeKey(node_id),
+		func(m map[string]pluginState) error {
+			for _, plugin_state := range m {
+				if err := c.forceGCNodePlugin(node_id, plugin_state.Identity); err != nil {
+					return err
+				}
+			}
+			return nil
+		},
+	)
+}
+
+// forceGCNodePlugin will force garbage collect the plugin on the node
+func (c *Cluster) forceGCNodePlugin(node_id string, plugin_id string) error {
+	if node_id == c.id {
+		c.plugin_lock.Lock()
+		c.plugins.Delete(plugin_id)
+		c.plugin_lock.Unlock()
+	}
+
+	if err := c.removePluginState(node_id, entities.HashedIdentity(plugin_id)); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// forceGCPluginByNodePluginJoin will force garbage collect the plugin by node_plugin_join
+func (c *Cluster) forceGCPluginByNodePluginJoin(node_plugin_join string) error {
+	return cache.DelMapField(PLUGIN_STATE_MAP_KEY, node_plugin_join)
+}
+
+func (c *Cluster) isPluginActive(state *pluginState) bool {
+	return state != nil && state.ScheduledAt != nil && time.Since(*state.ScheduledAt) < 60*time.Second
+}
+
+func (c *Cluster) splitNodePluginJoin(node_plugin_join string) (node_id string, plugin_hashed_id string, err error) {
+	split := strings.Split(node_plugin_join, ":")
+	if len(split) != 2 {
+		return "", "", errors.New("invalid node_plugin_join")
+	}
+
+	return split[0], split[1], nil
+}
+
+// autoGCPlugins will automatically garbage collect the plugins that are no longer active
+func (c *Cluster) autoGCPlugins() error {
+	// skip if already in auto gc
+	if !atomic.CompareAndSwapInt32(&c.is_in_auto_gc_plugins, 0, 1) {
+		return nil
+	}
+	defer atomic.StoreInt32(&c.is_in_auto_gc_plugins, 0)
+
+	return cache.ScanMapAsync[pluginState](
+		PLUGIN_STATE_MAP_KEY,
+		"*",
+		func(m map[string]pluginState) error {
+			for node_plugin_join, plugin_state := range m {
+				if !c.isPluginActive(&plugin_state) {
+					node_id, _, err := c.splitNodePluginJoin(node_plugin_join)
+					if err != nil {
+						return err
+					}
+
+					// force gc the plugin
+					if err := c.forceGCNodePlugin(node_id, plugin_state.Identity); err != nil {
+						return err
+					}
+
+					// gc once more by the raw key: the hash of the plugin's identity may not
+					// match the hashed id embedded in node_plugin_join, so delete the entry
+					// by node_plugin_join as well to be safe
+					if err := c.forceGCPluginByNodePluginJoin(node_plugin_join); err != nil {
+						return err
+					}
+				}
+			}
+			return nil
+		},
+	)
+}
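
Every entry in the plugin state map is keyed as node_id:hashed_plugin_id, which is what getPluginStateKey builds and splitNodePluginJoin takes apart; this is how autoGCPlugins recovers the owning node from the key alone. A standalone sketch of the round trip with made-up ids:

package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitNodePluginJoin mirrors the helper above: exactly two ":"-separated parts.
func splitNodePluginJoin(key string) (string, string, error) {
	parts := strings.Split(key, ":")
	if len(parts) != 2 {
		return "", "", errors.New("invalid node_plugin_join")
	}
	return parts[0], parts[1], nil
}

func main() {
	key := "node-a" + ":" + "3f2c" // getPluginStateKey(node_id, hashed_plugin_id)
	nodeID, hashedID, err := splitNodePluginJoin(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(nodeID, hashedID) // node-a 3f2c
}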

+ 1 - 79
internal/cluster/preemptive.go

@@ -2,12 +2,8 @@ package cluster
 
 import (
 	"errors"
-	"net"
-	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/network"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 )
 
 // Plugin daemon will preemptively try to lock the slot to be the master of the cluster
@@ -31,7 +27,7 @@ import (
 // A node will be removed from the cluster if it is no longer active
 
 const (
-	CLUSTER_STATUS_HASH_MAP_KEY = "cluster-status-hash-map"
+	CLUSTER_STATUS_HASH_MAP_KEY = "cluster-nodes-status-hash-map"
 	PREEMPTION_LOCK_KEY         = "cluster-master-preemption-lock"
 )
 
@@ -69,77 +65,3 @@ func (c *Cluster) updateMaster() error {
 
 	return nil
 }
-
-// update the status of the node
-func (c *Cluster) updateNodeStatus() error {
-	if err := c.LockNodeStatus(c.id); err != nil {
-		return err
-	}
-	defer c.UnlockNodeStatus(c.id)
-
-	// update the status of the node
-	node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, c.id)
-	if err != nil {
-		if err == cache.ErrNotFound {
-			// try to get ips configs
-			ips, err := network.FetchCurrentIps()
-			if err != nil {
-				return err
-			}
-			node_status = &node{
-				Ips: parser.Map(func(from net.IP) ip {
-					return ip{
-						Address: from.String(),
-						Votes:   []vote{},
-					}
-				}, ips),
-			}
-		} else {
-			return err
-		}
-	} else {
-		ips, err := network.FetchCurrentIps()
-		if err != nil {
-			return err
-		}
-		// add new ip if not exist
-		for _, _ip := range ips {
-			found := false
-			for _, node_ip := range node_status.Ips {
-				if node_ip.Address == _ip.String() {
-					found = true
-					break
-				}
-			}
-			if !found {
-				node_status.Ips = append(node_status.Ips, ip{
-					Address: _ip.String(),
-					Votes:   []vote{},
-				})
-			}
-		}
-	}
-
-	// refresh the last ping time
-	node_status.LastPingAt = time.Now().Unix()
-
-	// update the status of the node
-	if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, c.id, node_status); err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func (c *Cluster) IsMaster() bool {
-	return c.i_am_master
-}
-
-func (c *Cluster) IsNodeAlive(node_id string) bool {
-	node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)
-	if err != nil {
-		return false
-	}
-
-	return time.Since(time.Unix(node_status.LastPingAt, 0)) < NODE_DISCONNECTED_TIMEOUT
-}

+ 0 - 152
internal/cluster/state.go

@@ -1,152 +0,0 @@
-package cluster
-
-import (
-	"sync/atomic"
-	"time"
-
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
-)
-
-// RegisterPlugin registers a plugin to the cluster, and start to be scheduled
-func (c *Cluster) RegisterPlugin(lifetime entities.PluginRuntimeTimeLifeInterface) error {
-	identity, err := lifetime.Identity()
-	if err != nil {
-		return err
-	}
-
-	done := make(chan bool)
-	closed := new(int32)
-	close := func() {
-		if atomic.CompareAndSwapInt32(closed, 0, 1) {
-			close(done)
-		}
-	}
-
-	l := &pluginLifeTime{
-		lifetime: lifetime,
-	}
-
-	lifetime.OnStop(func() {
-		c.plugin_lock.Lock()
-		c.plugins.Delete(identity)
-		// remove plugin state
-		c.doPluginStateUpdate(l)
-		c.plugin_lock.Unlock()
-		close()
-	})
-
-	c.plugin_lock.Lock()
-	if !lifetime.Stopped() {
-		c.plugins.Store(identity, l)
-
-		// do plugin state update immediately
-		err = c.doPluginStateUpdate(l)
-		if err != nil {
-			close()
-			c.plugin_lock.Unlock()
-			return err
-		}
-	} else {
-		close()
-	}
-	c.plugin_lock.Unlock()
-
-	log.Info("start to schedule plugin %s", identity)
-
-	return nil
-}
-
-const (
-	PLUGIN_STATE_MAP_KEY = "plugin_state"
-)
-
-func (c *Cluster) getPluginStateKey(node_id string, plugin_id string) string {
-	return node_id + ":" + plugin_id
-}
-
-func (c *Cluster) getScanPluginsByNodeKey(node_id string) string {
-	return node_id + ":*"
-}
-
-func (c *Cluster) getScanPluginsByIdKey(plugin_id string) string {
-	return "*:" + plugin_id
-}
-
-func (c *Cluster) FetchPluginAvailableNodes(hashed_plugin_id string) ([]string, error) {
-	states, err := cache.ScanMap[entities.PluginRuntimeState](PLUGIN_STATE_MAP_KEY, c.getScanPluginsByIdKey(hashed_plugin_id))
-	if err != nil {
-		return nil, err
-	}
-
-	nodes := make([]string, 0)
-	for key := range states {
-		// split key into node_id and plugin_id
-		if len(key) < len(hashed_plugin_id)+1 {
-			log.Error("unexpected plugin state key: %s", key)
-			continue
-		}
-		node_id := key[:len(key)-len(hashed_plugin_id)-1]
-		nodes = append(nodes, node_id)
-	}
-
-	return nodes, nil
-}
-
-// SchedulePlugin schedules a plugin to the cluster
-// it will walk through the plugin state map and update all the states
-// as for the plugin has exited, normally, it will be removed automatically
-// but once a plugin is not removed, it will be gc by the master node
-func (c *Cluster) schedulePlugins() error {
-	c.plugins.Range(func(key string, value *pluginLifeTime) bool {
-		// do plugin state update
-		err := c.doPluginStateUpdate(value)
-		if err != nil {
-			log.Error("failed to update plugin state: %s", err.Error())
-		}
-
-		return true
-	})
-
-	return nil
-}
-
-// doPluginUpdate updates the plugin state and schedule the plugin
-func (c *Cluster) doPluginStateUpdate(lifetime *pluginLifeTime) error {
-	state := lifetime.lifetime.RuntimeState()
-	hash_identity, err := lifetime.lifetime.HashedIdentity()
-	if err != nil {
-		return err
-	}
-
-	identity, err := lifetime.lifetime.Identity()
-	if err != nil {
-		return err
-	}
-
-	state_key := c.getPluginStateKey(c.id, hash_identity)
-
-	// check if the plugin has been removed
-	if !c.plugins.Exits(identity) {
-		// remove state
-		err = c.removePluginState(hash_identity)
-		if err != nil {
-			return err
-		}
-	} else {
-		// update plugin state
-		state.ScheduledAt = &[]time.Time{time.Now()}[0]
-		lifetime.lifetime.UpdateState(state)
-		err = cache.SetMapOneField(PLUGIN_STATE_MAP_KEY, state_key, state)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (c *Cluster) removePluginState(hashed_identity string) error {
-	return cache.DelMapField(PLUGIN_STATE_MAP_KEY, c.getPluginStateKey(c.id, hashed_identity))
-}

+ 8 - 6
internal/core/plugin_manager/lifetime.go

@@ -9,12 +9,14 @@ import (
 )
 
 func (p *PluginManager) lifetime(config *app.Config, r entities.PluginRuntimeInterface) {
-	start_failed_times := 0
 	configuration := r.Configuration()
 
 	log.Info("new plugin logged in: %s", configuration.Identity())
 	defer log.Info("plugin %s has exited", configuration.Identity())
 
+	// stop plugin when the plugin reaches the end of its lifetime
+	defer r.Stop()
+
 	// add plugin to cluster
 	err := p.cluster.RegisterPlugin(r)
 	if err != nil {
@@ -22,6 +24,8 @@ func (p *PluginManager) lifetime(config *app.Config, r entities.PluginRuntimeInt
 		return
 	}
 
+	start_failed_times := 0
+
 	// remove lifetime state after plugin if it has been stopped
 	defer r.TriggerStop()
 
@@ -34,7 +38,7 @@ func (p *PluginManager) lifetime(config *app.Config, r entities.PluginRuntimeInt
 					"init environment failed 3 times, plugin %s has been stopped",
 					configuration.Identity(),
 				)
-				r.Stop()
+				break
 			}
 			start_failed_times++
 			continue
@@ -53,7 +57,7 @@ func (p *PluginManager) lifetime(config *app.Config, r entities.PluginRuntimeInt
 					"start plugin failed 3 times, plugin %s has been stopped",
 					configuration.Identity(),
 				)
-				r.Stop()
+				break
 			}
 
 			start_failed_times++
@@ -70,8 +74,6 @@ func (p *PluginManager) lifetime(config *app.Config, r entities.PluginRuntimeInt
 		time.Sleep(5 * time.Second)
 
 		// add restart times
-		state := r.RuntimeState()
-		state.Restarts++
-		r.UpdateState(state)
+		r.AddRestarts()
 	}
 }
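
The reworked loop replaces the in-place r.Stop() calls with break and relies on the deferred r.Stop() added at the top, so every exit path shuts the plugin down exactly once. A simplified standalone sketch of the failure-counting restart loop (stub functions stand in for the real runtime):

package main

import (
	"errors"
	"fmt"
	"time"
)

func startPlugin() error { return errors.New("boom") } // stub: always fails

func main() {
	defer fmt.Println("stop plugin") // plays the role of the deferred r.Stop()

	failedTimes := 0
	for {
		if err := startPlugin(); err != nil {
			if failedTimes == 3 {
				fmt.Println("start plugin failed 3 times, giving up")
				break // the deferred stop handles shutdown
			}
			failedTimes++
			continue
		}
		failedTimes = 0
		time.Sleep(5 * time.Second) // wait before restarting after a clean exit
	}
}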

+ 7 - 7
internal/core/plugin_manager/local_manager/run.go

@@ -25,7 +25,7 @@ func (r *LocalPluginRuntime) gc() {
 
 func (r *LocalPluginRuntime) init() {
 	r.w = make(chan bool)
-	r.State.Status = entities.PLUGIN_RUNTIME_STATUS_LAUNCHING
+	r.SetLaunching()
 }
 
 func (r *LocalPluginRuntime) Type() entities.PluginRuntimeType {
@@ -44,27 +44,27 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	// get writer
 	stdin, err := e.StdinPipe()
 	if err != nil {
-		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+		r.SetRestarting()
 		return fmt.Errorf("get stdin pipe failed: %s", err.Error())
 	}
 	defer stdin.Close()
 
 	stdout, err := e.StdoutPipe()
 	if err != nil {
-		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+		r.SetRestarting()
 		return fmt.Errorf("get stdout pipe failed: %s", err.Error())
 	}
 	defer stdout.Close()
 
 	stderr, err := e.StderrPipe()
 	if err != nil {
-		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+		r.SetRestarting()
 		return fmt.Errorf("get stderr pipe failed: %s", err.Error())
 	}
 	defer stderr.Close()
 
 	if err := e.Start(); err != nil {
-		r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+		r.SetRestarting()
 		return err
 	}
 
@@ -76,7 +76,7 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 		// wait for plugin to exit
 		err = e.Wait()
 		if err != nil {
-			r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
+			r.SetRestarting()
 			log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
 		}
 
@@ -115,7 +115,7 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	wg.Wait()
 
 	// plugin has exited
-	r.State.Status = entities.PLUGIN_RUNTIME_STATUS_PENDING
+	r.SetPending()
 	return nil
 }
 

+ 3 - 0
internal/core/plugin_manager/remote_manager/hooks.go

@@ -159,6 +159,9 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		// registration transferred
 		runtime.registration_transferred = true
 
+		runtime.InitState()
+		runtime.SetActiveAt(time.Now())
+
 		// publish runtime to watcher
 		s.response.Write(runtime)
 	} else {

+ 0 - 1
internal/core/plugin_manager/watcher.go

@@ -107,7 +107,6 @@ func loadNewPlugins(root_path string) <-chan entities.PluginRuntime {
 						RelativePath: path.Join(root_path, plugin.Name()),
 						ActiveAt:     nil,
 						Verified:     err == nil,
-						Identity:     configuration.Identity(),
 					},
 				}
 			}

+ 1 - 0
internal/server/server.go

@@ -12,6 +12,7 @@ import (
 func (a *App) Run(config *app.Config) {
 	a.cluster = cluster.NewCluster(config)
 	plugin_manager.InitGlobalPluginManager(a.cluster)
+	a.plugin_manager = plugin_manager.GetGlobalPluginManager()
 
 	// init routine pool
 	routine.InitPool(config.RoutinePoolSize)

+ 58 - 4
internal/types/entities/runtime.go

@@ -47,13 +47,28 @@ type (
 		// returns the runtime state of the plugin
 		RuntimeState() PluginRuntimeState
 		// Update the runtime state of the plugin
-		UpdateState(state PluginRuntimeState)
+		UpdateScheduledAt(t time.Time)
 		// returns the checksum of the plugin
 		Checksum() string
 		// wait for the plugin to stop
 		Wait() (<-chan bool, error)
 		// returns the runtime type of the plugin
 		Type() PluginRuntimeType
+
+		// set the plugin to active
+		SetActive()
+		// set the plugin to launching
+		SetLaunching()
+		// set the plugin to restarting
+		SetRestarting()
+		// set the plugin to pending
+		SetPending()
+		// set the active time of the plugin
+		SetActiveAt(t time.Time)
+		// set the scheduled time of the plugin
+		SetScheduledAt(t time.Time)
+		// add restarts to the plugin
+		AddRestarts()
 	}
 
 	PluginRuntimeSessionIOInterface interface {
@@ -92,8 +107,48 @@ func (r *PluginRuntime) RuntimeState() PluginRuntimeState {
 	return r.State
 }
 
-func (r *PluginRuntime) UpdateState(state PluginRuntimeState) {
-	r.State = state
+func (r *PluginRuntime) UpdateScheduledAt(t time.Time) {
+	r.State.ScheduledAt = &t
+}
+
+func (r *PluginRuntime) InitState() {
+	r.State = PluginRuntimeState{
+		Restarts:    0,
+		Status:      PLUGIN_RUNTIME_STATUS_PENDING,
+		ActiveAt:    nil,
+		StoppedAt:   nil,
+		Verified:    false,
+		ScheduledAt: nil,
+		Logs:        []string{},
+	}
+}
+
+func (r *PluginRuntime) SetActive() {
+	r.State.Status = PLUGIN_RUNTIME_STATUS_ACTIVE
+}
+
+func (r *PluginRuntime) SetLaunching() {
+	r.State.Status = PLUGIN_RUNTIME_STATUS_LAUNCHING
+}
+
+func (r *PluginRuntime) SetRestarting() {
+	r.State.Status = PLUGIN_RUNTIME_STATUS_RESTARTING
+}
+
+func (r *PluginRuntime) SetPending() {
+	r.State.Status = PLUGIN_RUNTIME_STATUS_PENDING
+}
+
+func (r *PluginRuntime) SetActiveAt(t time.Time) {
+	r.State.ActiveAt = &t
+}
+
+func (r *PluginRuntime) SetScheduledAt(t time.Time) {
+	r.State.ScheduledAt = &t
+}
+
+func (r *PluginRuntime) AddRestarts() {
+	r.State.Restarts++
 }
 
 func (r *PluginRuntime) Checksum() string {
@@ -123,7 +178,6 @@ const (
 )
 
 type PluginRuntimeState struct {
-	Identity     string     `json:"identity"`
 	Restarts     int        `json:"restarts"`
 	Status       string     `json:"status"`
 	RelativePath string     `json:"relative_path"`
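
UpdateState is gone; callers now go through narrow setters, which lets the cluster scheduler touch only ScheduledAt instead of overwriting the whole state struct. A hypothetical usage sketch of the new methods (assuming a zero-value entities.PluginRuntime is acceptable for illustration):

package main

import (
	"time"

	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
)

func main() {
	var r entities.PluginRuntime

	r.InitState()             // fresh state, status pending
	r.SetLaunching()          // launching the plugin process
	r.SetActiveAt(time.Now()) // record when it came up
	r.SetActive()             // now serving

	// on a crash/restart cycle:
	r.SetRestarting()
	r.AddRestarts()

	// the cluster scheduler only touches the scheduling timestamp:
	r.UpdateScheduledAt(time.Now())
}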

+ 0 - 1
internal/types/entities/runtime_test.go

@@ -7,7 +7,6 @@ import (
 
 func TestRuntimeStateHash(t *testing.T) {
 	state := PluginRuntimeState{
-		Identity:     "test",
 		Restarts:     0,
 		Status:       PLUGIN_RUNTIME_STATUS_PENDING,
 		RelativePath: "aaa",