preemptive.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package cluster
  2. import (
  3. "errors"
  4. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  5. )
// Each plugin daemon preemptively tries to lock the slot to become the master of the cluster
// and, as master, keeps the current status of the whole cluster updated.
// Once the master is no longer active, one of the slaves will try to lock the slot again
// and become the new master.
  10. //
// Once a node becomes master, it takes responsibility for garbage-collecting the nodes that
// have already been deactivated, and all nodes should maintain their own status.
  13. //
  14. // State:
  15. // - hashmap[cluster-status]
  16. // - node_id:
  17. // - list[ip]:
  18. // - address: string
  19. // - vote[]:
  20. // - node_id: string
  21. // - voted_at: int64
  22. // - failed: bool
  23. // - last_ping_at: int64
  24. // - preemption-lock: node_id
  25. //
  26. const (
  27. CLUSTER_STATUS_HASH_MAP_KEY = "cluster-nodes-status-hash-map"
  28. PREEMPTION_LOCK_KEY = "cluster-master-preemption-lock"
  29. )
  30. // try lock the slot to be the master of the cluster
  31. // returns:
  32. // - bool: true if the slot is locked by the node
  33. // - error: error if any
  34. func (c *Cluster) lockMaster() (bool, error) {
  35. var finalError error
  36. for i := 0; i < 3; i++ {
  37. if success, err := cache.SetNX(PREEMPTION_LOCK_KEY, c.id, c.masterLockExpiredTime); err != nil {
  38. // try again
  39. if finalError == nil {
  40. finalError = err
  41. } else {
  42. finalError = errors.Join(finalError, err)
  43. }
  44. } else if !success {
  45. return false, nil
  46. } else {
  47. return true, nil
  48. }
  49. }
  50. return false, finalError
  51. }
  52. // update master
  53. func (c *Cluster) updateMaster() error {
  54. // update expired time of master key
  55. if _, err := cache.Expire(PREEMPTION_LOCK_KEY, c.masterLockExpiredTime); err != nil {
  56. return err
  57. }
  58. return nil
  59. }