| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 | package clusterimport (	"errors"	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache")// Plugin daemon will preemptively try to lock the slot to be the master of the cluster// and keep update current status of the whole cluster// once the master is no longer active, one of the slave will try to lock the slot again// and become the new master//// Once a node becomes master, It will take responsibility to gc the nodes has already deactivated// and all nodes should to maintenance their own status//// State://	- hashmap[cluster-status]//		- node_id://			- list[ip]://				- address: string//				- vote[]://					- node_id: string//					- voted_at: int64//					- failed: bool//			- last_ping_at: int64//	- preemption-lock: node_id//const (	CLUSTER_STATUS_HASH_MAP_KEY = "cluster-nodes-status-hash-map"	PREEMPTION_LOCK_KEY         = "cluster-master-preemption-lock")// try lock the slot to be the master of the cluster// returns://   - bool: true if the slot is locked by the node//   - error: error if anyfunc (c *Cluster) lockMaster() (bool, error) {	var final_error error	for i := 0; i < 3; i++ {		if success, err := cache.SetNX(PREEMPTION_LOCK_KEY, c.id, MASTER_LOCK_EXPIRED_TIME); err != nil {			// try again			if final_error == nil {				final_error = err			} else {				final_error = errors.Join(final_error, err)			}		} else if !success {			return false, nil		} else {			return true, nil		}	}	return false, final_error}// update masterfunc (c *Cluster) updateMaster() error {	// update expired time of master key	if _, err := cache.Expire(PREEMPTION_LOCK_KEY, MASTER_LOCK_EXPIRED_TIME); err != nil {		return err	}	return nil}
 |