package cluster import ( "errors" "github.com/langgenius/dify-plugin-daemon/internal/utils/cache" ) // Plugin daemon will preemptively try to lock the slot to be the master of the cluster // and keep update current status of the whole cluster // once the master is no longer active, one of the slave will try to lock the slot again // and become the new master // // Once a node becomes master, It will take responsibility to gc the nodes has already deactivated // and all nodes should to maintenance their own status // // State: // - hashmap[cluster-status] // - node-id: // - list[ip]: // - address: string // - vote: int // - last_ping_at: int64 // - preemption-lock: node-id // - node-status-upgrade-status // // A node will be removed from the cluster if it is no longer active const ( CLUSTER_STATUS_HASH_MAP_KEY = "cluster-nodes-status-hash-map" PREEMPTION_LOCK_KEY = "cluster-master-preemption-lock" ) // try lock the slot to be the master of the cluster // returns: // - bool: true if the slot is locked by the node // - error: error if any func (c *Cluster) lockMaster() (bool, error) { var final_error error for i := 0; i < 3; i++ { if success, err := cache.SetNX(PREEMPTION_LOCK_KEY, c.id, MASTER_LOCK_EXPIRED_TIME); err != nil { // try again if final_error == nil { final_error = err } else { final_error = errors.Join(final_error, err) } } else if !success { return false, nil } else { return true, nil } } return false, final_error } // update master func (c *Cluster) updateMaster() error { // update expired time of master key if _, err := cache.Expire(PREEMPTION_LOCK_KEY, MASTER_LOCK_EXPIRED_TIME); err != nil { return err } return nil }