Przeglądaj źródła

fix: cluster scheduler controller

Yeuoly 8 miesięcy temu
rodzic
commit
350bfafe14

+ 24 - 4
internal/cluster/cluster.go

@@ -3,6 +3,7 @@ package cluster
 import (
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/google/uuid"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
@@ -49,14 +50,33 @@ type Cluster struct {
 	notifyClusterStoppedChan          chan bool
 
 	showLog bool
+
+	masterGcInterval              time.Duration
+	masterLockingInterval         time.Duration
+	masterLockExpiredTime         time.Duration
+	nodeVoteInterval              time.Duration
+	nodeDisconnectedTimeout       time.Duration
+	updateNodeStatusInterval      time.Duration
+	pluginSchedulerInterval       time.Duration
+	pluginSchedulerTickerInterval time.Duration
+	pluginDeactivatedTimeout      time.Duration
 }
 
 func NewCluster(config *app.Config, plugin_manager *plugin_manager.PluginManager) *Cluster {
 	return &Cluster{
-		id:       uuid.New().String(),
-		port:     uint16(config.ServerPort),
-		stopChan: make(chan bool),
-		showLog:  config.DisplayClusterLog,
+		id:                            uuid.New().String(),
+		port:                          uint16(config.ServerPort),
+		stopChan:                      make(chan bool),
+		showLog:                       config.DisplayClusterLog,
+		masterGcInterval:              MASTER_GC_INTERVAL,
+		masterLockingInterval:         MASTER_LOCKING_INTERVAL,
+		masterLockExpiredTime:         MASTER_LOCK_EXPIRED_TIME,
+		nodeVoteInterval:              NODE_VOTE_INTERVAL,
+		nodeDisconnectedTimeout:       NODE_DISCONNECTED_TIMEOUT,
+		updateNodeStatusInterval:      UPDATE_NODE_STATUS_INTERVAL,
+		pluginSchedulerInterval:       PLUGIN_SCHEDULER_INTERVAL,
+		pluginSchedulerTickerInterval: PLUGIN_SCHEDULER_TICKER_INTERVAL,
+		pluginDeactivatedTimeout:      PLUGIN_DEACTIVATED_TIMEOUT,
 
 		manager: plugin_manager,
 

+ 1 - 1
internal/cluster/clutser_test.go

@@ -141,7 +141,7 @@ func TestClusterSubstituteMaster(t *testing.T) {
 		return
 	}
 
-	time.Sleep(MASTER_LOCK_EXPIRED_TIME + time.Second)
+	time.Sleep(clusters[0].masterLockExpiredTime + time.Second)
 
 	hasMaster := false
 

+ 5 - 5
internal/cluster/lifetime.go

@@ -63,19 +63,19 @@ func (c *Cluster) clusterLifetime() {
 		close(c.notifyNodeUpdateCompletedChan)
 	}()
 
-	tickerLockMaster := time.NewTicker(MASTER_LOCKING_INTERVAL)
+	tickerLockMaster := time.NewTicker(c.masterLockingInterval)
 	defer tickerLockMaster.Stop()
 
-	tickerUpdateNodeStatus := time.NewTicker(UPDATE_NODE_STATUS_INTERVAL)
+	tickerUpdateNodeStatus := time.NewTicker(c.updateNodeStatusInterval)
 	defer tickerUpdateNodeStatus.Stop()
 
-	masterGcTicker := time.NewTicker(MASTER_GC_INTERVAL)
+	masterGcTicker := time.NewTicker(c.masterGcInterval)
 	defer masterGcTicker.Stop()
 
-	nodeVoteTicker := time.NewTicker(NODE_VOTE_INTERVAL)
+	nodeVoteTicker := time.NewTicker(c.nodeVoteInterval)
 	defer nodeVoteTicker.Stop()
 
-	pluginSchedulerTicker := time.NewTicker(PLUGIN_SCHEDULER_INTERVAL)
+	pluginSchedulerTicker := time.NewTicker(c.pluginSchedulerTickerInterval)
 	defer pluginSchedulerTicker.Stop()
 
 	// vote for all ips and find the best one, prepare for later traffic scheduling

+ 2 - 2
internal/cluster/plugin.go

@@ -97,7 +97,7 @@ func (c *Cluster) schedulePlugins() error {
 	defer c.notifyPluginScheduleCompleted()
 
 	c.plugins.Range(func(key string, value *pluginLifeTime) bool {
-		if time.Since(value.lastScheduledAt) < PLUGIN_SCHEDULER_INTERVAL {
+		if time.Since(value.lastScheduledAt) < c.pluginSchedulerInterval {
 			return true
 		}
 		if c.showLog {
@@ -228,7 +228,7 @@ func (c *Cluster) forceGCPluginByNodePluginJoin(node_plugin_join string) error {
 }
 
 func (c *Cluster) isPluginActive(state *pluginState) bool {
-	return state != nil && state.ScheduledAt != nil && time.Since(*state.ScheduledAt) < 60*time.Second
+	return state != nil && state.ScheduledAt != nil && time.Since(*state.ScheduledAt) < c.pluginDeactivatedTimeout
 }
 
 func (c *Cluster) splitNodePluginJoin(node_plugin_join string) (nodeId string, plugin_hashed_id string, err error) {

+ 10 - 1
internal/cluster/plugin_test.go

@@ -144,6 +144,15 @@ func TestPluginScheduleWhenMasterClusterShutdown(t *testing.T) {
 		return
 	}
 
+	// set master gc interval to 1 second
+	for _, node := range cluster {
+		node.masterGcInterval = time.Second * 1
+		node.pluginSchedulerInterval = time.Second * 1
+		node.pluginSchedulerTickerInterval = time.Second * 1
+		node.updateNodeStatusInterval = time.Second * 1
+		node.pluginDeactivatedTimeout = time.Second * 3
+	}
+
 	launchSimulationCluster(cluster)
 	defer closeSimulationCluster(cluster, t)
 
@@ -208,7 +217,7 @@ func TestPluginScheduleWhenMasterClusterShutdown(t *testing.T) {
 	hashedIdentity := plugin_entities.HashedIdentity(identity.String())
 
 	ticker := time.NewTicker(time.Second)
-	timeout := time.NewTimer(MASTER_GC_INTERVAL * 3)
+	timeout := time.NewTimer(cluster[masterIdx].masterGcInterval * 10)
 	done := false
 	for !done {
 		select {

+ 2 - 2
internal/cluster/preemptive.go

@@ -40,7 +40,7 @@ func (c *Cluster) lockMaster() (bool, error) {
 	var finalError error
 
 	for i := 0; i < 3; i++ {
-		if success, err := cache.SetNX(PREEMPTION_LOCK_KEY, c.id, MASTER_LOCK_EXPIRED_TIME); err != nil {
+		if success, err := cache.SetNX(PREEMPTION_LOCK_KEY, c.id, c.masterLockExpiredTime); err != nil {
 			// try again
 			if finalError == nil {
 				finalError = err
@@ -60,7 +60,7 @@ func (c *Cluster) lockMaster() (bool, error) {
 // update master
 func (c *Cluster) updateMaster() error {
 	// update expired time of master key
-	if _, err := cache.Expire(PREEMPTION_LOCK_KEY, MASTER_LOCK_EXPIRED_TIME); err != nil {
+	if _, err := cache.Expire(PREEMPTION_LOCK_KEY, c.masterLockExpiredTime); err != nil {
 		return err
 	}