浏览代码

feat: plugin scheduler

Yeuoly 1 年之前
父节点
当前提交
f1f67cb30e

+ 23 - 2
internal/cluster/init.go

@@ -2,24 +2,45 @@ package cluster
 
 import (
 	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 )
 
+type PluginLifeTime struct {
+	lifetime          entities.PluginRuntimeTimeLifeInterface
+	last_scheduled_at time.Time
+}
+
 type Cluster struct {
+	i_am_master bool
+
 	port uint16
 
 	plugins     map[string]*PluginLifeTime
 	plugin_lock sync.Mutex
+
+	stop_chan chan bool
+	stopped   *int32
 }
 
 func NewCluster(config *app.Config) *Cluster {
 	return &Cluster{
-		port:    uint16(config.ServerPort),
-		plugins: make(map[string]*PluginLifeTime),
+		port:      uint16(config.ServerPort),
+		plugins:   make(map[string]*PluginLifeTime),
+		stop_chan: make(chan bool),
+		stopped:   new(int32),
 	}
 }
 
 func (c *Cluster) Launch(config *app.Config) {
 	go c.clusterLifetime()
 }
+
+func (c *Cluster) Close() {
+	if atomic.CompareAndSwapInt32(c.stopped, 0, 1) {
+		close(c.stop_chan)
+	}
+}

+ 82 - 0
internal/cluster/lifetime.go

@@ -0,0 +1,82 @@
+package cluster
+
+import (
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+)
+
+const (
+	MASTER_LOCKING_INTERVAL     = time.Millisecond * 500 // interval to try to lock the slot to be the master
+	MASTER_LOCK_EXPIRED_TIME    = time.Second * 5        // expired time of master key
+	MASTER_GC_INTERVAL          = time.Second * 10       // interval to garbage-collect nodes that have already been deactivated
+	NODE_VOTE_INTERVAL          = time.Second * 30       // interval to vote the ips of the nodes
+	UPDATE_NODE_STATUS_INTERVAL = time.Second * 5        // interval to update the status of the node
+	NODE_DISCONNECTED_TIMEOUT   = time.Second * 10       // once a node is no longer active, it will be removed from the cluster
+	PLUGIN_SCHEDULER_INTERVAL   = time.Second * 3        // interval to schedule the plugins
+)
+
+// lifetime of the cluster
+func (c *Cluster) clusterLifetime() {
+	ticker_lock_master := time.NewTicker(MASTER_LOCKING_INTERVAL)
+	defer ticker_lock_master.Stop()
+
+	ticker_update_node_status := time.NewTicker(UPDATE_NODE_STATUS_INTERVAL)
+	defer ticker_update_node_status.Stop()
+
+	master_gc_ticker := time.NewTicker(MASTER_GC_INTERVAL)
+	defer master_gc_ticker.Stop()
+
+	node_vote_ticker := time.NewTicker(NODE_VOTE_INTERVAL)
+	defer node_vote_ticker.Stop()
+
+	plugin_scheduler_ticker := time.NewTicker(PLUGIN_SCHEDULER_INTERVAL)
+	defer plugin_scheduler_ticker.Stop()
+
+	if err := c.voteIps(); err != nil {
+		log.Error("failed to vote the ips of the nodes: %s", err.Error())
+	}
+
+	for {
+		select {
+		case <-ticker_lock_master.C:
+			if !c.i_am_master {
+				// try lock the slot
+				if success, err := c.lockMaster(); err != nil {
+					log.Error("failed to lock the slot to be the master of the cluster: %s", err.Error())
+				} else if success {
+					c.i_am_master = true
+					log.Info("current node has become the master of the cluster")
+				} else {
+					c.i_am_master = false
+					log.Info("current node lost the master slot")
+				}
+			} else {
+				// update the master
+				if err := c.updateMaster(); err != nil {
+					log.Error("failed to update the master: %s", err.Error())
+				}
+			}
+		case <-ticker_update_node_status.C:
+			if err := c.updateNodeStatus(); err != nil {
+				log.Error("failed to update the status of the node: %s", err.Error())
+			}
+		case <-master_gc_ticker.C:
+			if c.i_am_master {
+				if err := c.gcNodes(); err != nil {
+					log.Error("failed to gc the nodes has already deactivated: %s", err.Error())
+				}
+			}
+		case <-node_vote_ticker.C:
+			if err := c.voteIps(); err != nil {
+				log.Error("failed to vote the ips of the nodes: %s", err.Error())
+			}
+		case <-plugin_scheduler_ticker.C:
+			if err := c.schedulePlugins(); err != nil {
+				log.Error("failed to schedule the plugins: %s", err.Error())
+			}
+		case <-c.stop_chan:
+			return
+		}
+	}
+}

+ 1 - 71
internal/cluster/preemptive.go

@@ -7,7 +7,6 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/cluster/cluster_id"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/network"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 )
@@ -32,80 +31,11 @@ import (
 //
 // A node will be removed from the cluster if it is no longer active
 
-var (
-	i_am_master = false
-)
-
 const (
 	CLUSTER_STATUS_HASH_MAP_KEY = "cluster-status-hash-map"
 	PREEMPTION_LOCK_KEY         = "cluster-master-preemption-lock"
 )
 
-const (
-	MASTER_LOCKING_INTERVAL     = time.Millisecond * 500 // interval to try to lock the slot to be the master
-	MASTER_LOCK_EXPIRED_TIME    = time.Second * 5        // expired time of master key
-	MASTER_GC_INTERVAL          = time.Second * 10       // interval to do garbage collection of nodes has already deactivated
-	NODE_VOTE_INTERVAL          = time.Second * 30       // interval to vote the ips of the nodes
-	UPDATE_NODE_STATUS_INTERVAL = time.Second * 5        // interval to update the status of the node
-	NODE_DISCONNECTED_TIMEOUT   = time.Second * 10       // once a node is no longer active, it will be removed from the cluster
-)
-
-// lifetime of the cluster
-func (c *Cluster) clusterLifetime() {
-	ticker_lock_master := time.NewTicker(MASTER_LOCKING_INTERVAL)
-	defer ticker_lock_master.Stop()
-
-	ticker_update_node_status := time.NewTicker(UPDATE_NODE_STATUS_INTERVAL)
-	defer ticker_update_node_status.Stop()
-
-	master_gc_ticker := time.NewTicker(MASTER_GC_INTERVAL)
-	defer master_gc_ticker.Stop()
-
-	node_vote_ticker := time.NewTicker(NODE_VOTE_INTERVAL)
-	defer node_vote_ticker.Stop()
-
-	if err := c.voteIps(); err != nil {
-		log.Error("failed to vote the ips of the nodes: %s", err.Error())
-	}
-
-	for {
-		select {
-		case <-ticker_lock_master.C:
-			if !i_am_master {
-				// try lock the slot
-				if success, err := c.lockMaster(); err != nil {
-					log.Error("failed to lock the slot to be the master of the cluster: %s", err.Error())
-				} else if success {
-					i_am_master = true
-					log.Info("current node has become the master of the cluster")
-				} else {
-					i_am_master = false
-					log.Info("current node lost the master slot")
-				}
-			} else {
-				// update the master
-				if err := c.updateMaster(); err != nil {
-					log.Error("failed to update the master: %s", err.Error())
-				}
-			}
-		case <-ticker_update_node_status.C:
-			if err := c.updateNodeStatus(); err != nil {
-				log.Error("failed to update the status of the node: %s", err.Error())
-			}
-		case <-master_gc_ticker.C:
-			if i_am_master {
-				if err := c.gcNodes(); err != nil {
-					log.Error("failed to gc the nodes has already deactivated: %s", err.Error())
-				}
-			}
-		case <-node_vote_ticker.C:
-			if err := c.voteIps(); err != nil {
-				log.Error("failed to vote the ips of the nodes: %s", err.Error())
-			}
-		}
-	}
-}
-
 // try lock the slot to be the master of the cluster
 // returns:
 //   - bool: true if the slot is locked by the node
@@ -203,7 +133,7 @@ func (c *Cluster) updateNodeStatus() error {
 }
 
 func (c *Cluster) IsMaster() bool {
-	return i_am_master
+	return c.i_am_master
 }
 
 func (c *Cluster) IsNodeAlive(node_id string) bool {

+ 20 - 5
internal/cluster/state.go

@@ -1,14 +1,12 @@
 package cluster
 
 import (
+	"sync/atomic"
+
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
 
-type PluginLifeTime struct {
-	lifetime entities.PluginRuntimeTimeLifeInterface
-}
-
 // RegisterPlugin registers a plugin to the cluster, and start to be scheduled
 func (c *Cluster) RegisterPlugin(lifetime entities.PluginRuntimeTimeLifeInterface) error {
 	identity, err := lifetime.Identity()
@@ -16,10 +14,19 @@ func (c *Cluster) RegisterPlugin(lifetime entities.PluginRuntimeTimeLifeInterfac
 		return err
 	}
 
+	done := make(chan bool)
+	closed := new(int32)
+	close := func() {
+		if atomic.CompareAndSwapInt32(closed, 0, 1) {
+			close(done)
+		}
+	}
+
 	lifetime.OnStop(func() {
 		c.plugin_lock.Lock()
 		delete(c.plugins, identity)
 		c.plugin_lock.Unlock()
+		close()
 	})
 
 	c.plugin_lock.Lock()
@@ -27,6 +34,8 @@ func (c *Cluster) RegisterPlugin(lifetime entities.PluginRuntimeTimeLifeInterfac
 		c.plugins[identity] = &PluginLifeTime{
 			lifetime: lifetime,
 		}
+	} else {
+		close()
 	}
 	c.plugin_lock.Unlock()
 
@@ -35,6 +44,12 @@ func (c *Cluster) RegisterPlugin(lifetime entities.PluginRuntimeTimeLifeInterfac
 	return nil
 }
 
-func (c *Cluster) SchedulePlugin(lifetime entities.PluginRuntimeTimeLifeInterface) error {
+// schedulePlugins schedules the plugins to the cluster
+func (c *Cluster) schedulePlugins() error {
+	return nil
+}
+
+// doPluginStateUpdate updates the plugin state and schedules the plugin
+func (c *Cluster) doPluginStateUpdate(lifetime entities.PluginRuntimeTimeLifeInterface) error {
 	return nil
 }

+ 3 - 0
internal/core/plugin_manager/lifetime.go

@@ -68,5 +68,8 @@ func (p *PluginManager) lifetime(config *app.Config, r entities.PluginRuntimeInt
 
 		// restart plugin in 5s
 		time.Sleep(5 * time.Second)
+
+		// add restart times
+		r.RuntimeState().Restarts++
 	}
 }

+ 14 - 0
internal/types/entities/runtime.go

@@ -1,9 +1,14 @@
 package entities
 
 import (
+	"bytes"
+	"crypto/sha256"
+	"encoding/binary"
+	"encoding/hex"
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 )
 
 type (
@@ -28,6 +33,7 @@ type (
 		OnStop(func())
 		TriggerStop()
 		RuntimeState() *PluginRuntimeState
+		Checksum() string
 		Wait() (<-chan bool, error)
 		Type() PluginRuntimeType
 	}
@@ -58,6 +64,14 @@ func (r *PluginRuntime) RuntimeState() *PluginRuntimeState {
 	return &r.State
 }
 
+func (r *PluginRuntime) Checksum() string {
+	buf := bytes.Buffer{}
+	binary.Write(&buf, binary.BigEndian, parser.MarshalJsonBytes(r.Config))
+	hash := sha256.New()
+	hash.Write(buf.Bytes())
+	return hex.EncodeToString(hash.Sum(nil))
+}
+
 func (r *PluginRuntime) OnStop(f func()) {
 	r.onStopped = append(r.onStopped, f)
 }