cluster.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package cluster
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
  8. )
  9. type Cluster struct {
  10. // id is the unique id of the cluster
  11. id string
  12. // i_am_master is the flag to indicate whether the current node is the master node
  13. i_am_master bool
  14. // port is the health check port of the cluster
  15. port uint16
  16. // plugins stores all the plugin life time of the current node
  17. plugins mapping.Map[string, *pluginLifeTime]
  18. plugin_lock sync.RWMutex
  19. // nodes stores all the nodes of the cluster
  20. nodes mapping.Map[string, node]
  21. // signals for waiting for the cluster to stop
  22. stop_chan chan bool
  23. stopped int32
  24. is_in_auto_gc_nodes int32
  25. is_in_auto_gc_plugins int32
  26. // channels to notify cluster event
  27. notify_become_master_chan chan bool
  28. notify_master_gc_chan chan bool
  29. notify_master_gc_completed_chan chan bool
  30. notify_voting_chan chan bool
  31. notify_voting_completed_chan chan bool
  32. notify_plugin_schedule_chan chan bool
  33. notify_plugin_schedule_completed_chan chan bool
  34. notify_node_update_chan chan bool
  35. notify_node_update_completed_chan chan bool
  36. notify_cluster_stopped_chan chan bool
  37. }
  38. func NewCluster(config *app.Config) *Cluster {
  39. return &Cluster{
  40. id: uuid.New().String(),
  41. port: uint16(config.ServerPort),
  42. stop_chan: make(chan bool),
  43. notify_become_master_chan: make(chan bool),
  44. notify_master_gc_chan: make(chan bool),
  45. notify_master_gc_completed_chan: make(chan bool),
  46. notify_voting_chan: make(chan bool),
  47. notify_voting_completed_chan: make(chan bool),
  48. notify_plugin_schedule_chan: make(chan bool),
  49. notify_plugin_schedule_completed_chan: make(chan bool),
  50. notify_node_update_chan: make(chan bool),
  51. notify_node_update_completed_chan: make(chan bool),
  52. notify_cluster_stopped_chan: make(chan bool),
  53. }
  54. }
  55. func (c *Cluster) Launch() {
  56. go c.clusterLifetime()
  57. }
  58. func (c *Cluster) Close() error {
  59. if atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
  60. close(c.stop_chan)
  61. }
  62. return nil
  63. }
  64. // trigger for master event
  65. func (c *Cluster) notifyBecomeMaster() {
  66. if atomic.LoadInt32(&c.stopped) == 1 {
  67. return
  68. }
  69. select {
  70. case c.notify_become_master_chan <- true:
  71. default:
  72. }
  73. }
  74. // receive the master event
  75. func (c *Cluster) NotifyBecomeMaster() <-chan bool {
  76. if atomic.LoadInt32(&c.stopped) == 1 {
  77. return nil
  78. }
  79. return c.notify_become_master_chan
  80. }
  81. // trigger for master gc event
  82. func (c *Cluster) notifyMasterGC() {
  83. if atomic.LoadInt32(&c.stopped) == 1 {
  84. return
  85. }
  86. select {
  87. case c.notify_master_gc_chan <- true:
  88. default:
  89. }
  90. }
  91. // trigger for master gc completed event
  92. func (c *Cluster) notifyMasterGCCompleted() {
  93. if atomic.LoadInt32(&c.stopped) == 1 {
  94. return
  95. }
  96. select {
  97. case c.notify_master_gc_completed_chan <- true:
  98. default:
  99. }
  100. }
  101. // trigger for voting event
  102. func (c *Cluster) notifyVoting() {
  103. if atomic.LoadInt32(&c.stopped) == 1 {
  104. return
  105. }
  106. select {
  107. case c.notify_voting_chan <- true:
  108. default:
  109. }
  110. }
  111. // trigger for voting completed event
  112. func (c *Cluster) notifyVotingCompleted() {
  113. if atomic.LoadInt32(&c.stopped) == 1 {
  114. return
  115. }
  116. select {
  117. case c.notify_voting_completed_chan <- true:
  118. default:
  119. }
  120. }
  121. // trigger for plugin schedule event
  122. func (c *Cluster) notifyPluginSchedule() {
  123. if atomic.LoadInt32(&c.stopped) == 1 {
  124. return
  125. }
  126. select {
  127. case c.notify_plugin_schedule_chan <- true:
  128. default:
  129. }
  130. }
  131. // trigger for plugin schedule completed event
  132. func (c *Cluster) notifyPluginScheduleCompleted() {
  133. if atomic.LoadInt32(&c.stopped) == 1 {
  134. return
  135. }
  136. select {
  137. case c.notify_plugin_schedule_completed_chan <- true:
  138. default:
  139. }
  140. }
  141. // trigger for node update event
  142. func (c *Cluster) notifyNodeUpdate() {
  143. if atomic.LoadInt32(&c.stopped) == 1 {
  144. return
  145. }
  146. select {
  147. case c.notify_node_update_chan <- true:
  148. default:
  149. }
  150. }
  151. // trigger for node update completed event
  152. func (c *Cluster) notifyNodeUpdateCompleted() {
  153. if atomic.LoadInt32(&c.stopped) == 1 {
  154. return
  155. }
  156. select {
  157. case c.notify_node_update_completed_chan <- true:
  158. default:
  159. }
  160. }
  161. // trigger for cluster stopped event
  162. func (c *Cluster) notifyClusterStopped() {
  163. select {
  164. case c.notify_cluster_stopped_chan <- true:
  165. default:
  166. }
  167. }
  168. // receive the master gc event
  169. func (c *Cluster) NotifyMasterGC() <-chan bool {
  170. if atomic.LoadInt32(&c.stopped) == 1 {
  171. return nil
  172. }
  173. return c.notify_master_gc_chan
  174. }
  175. // receive the master gc completed event
  176. func (c *Cluster) NotifyMasterGCCompleted() <-chan bool {
  177. if atomic.LoadInt32(&c.stopped) == 1 {
  178. return nil
  179. }
  180. return c.notify_master_gc_completed_chan
  181. }
  182. // receive the voting event
  183. func (c *Cluster) NotifyVoting() <-chan bool {
  184. if atomic.LoadInt32(&c.stopped) == 1 {
  185. return nil
  186. }
  187. return c.notify_voting_chan
  188. }
  189. // receive the voting completed event
  190. func (c *Cluster) NotifyVotingCompleted() <-chan bool {
  191. if atomic.LoadInt32(&c.stopped) == 1 {
  192. return nil
  193. }
  194. return c.notify_voting_completed_chan
  195. }
  196. // receive the plugin schedule event
  197. func (c *Cluster) NotifyPluginSchedule() <-chan bool {
  198. if atomic.LoadInt32(&c.stopped) == 1 {
  199. return nil
  200. }
  201. return c.notify_plugin_schedule_chan
  202. }
  203. // receive the plugin schedule completed event
  204. func (c *Cluster) NotifyPluginScheduleCompleted() <-chan bool {
  205. if atomic.LoadInt32(&c.stopped) == 1 {
  206. return nil
  207. }
  208. return c.notify_plugin_schedule_completed_chan
  209. }
  210. // receive the node update event
  211. func (c *Cluster) NotifyNodeUpdate() <-chan bool {
  212. if atomic.LoadInt32(&c.stopped) == 1 {
  213. return nil
  214. }
  215. return c.notify_node_update_chan
  216. }
  217. // receive the node update completed event
  218. func (c *Cluster) NotifyNodeUpdateCompleted() <-chan bool {
  219. if atomic.LoadInt32(&c.stopped) == 1 {
  220. return nil
  221. }
  222. return c.notify_node_update_completed_chan
  223. }
  224. // receive the cluster stopped event
  225. func (c *Cluster) NotifyClusterStopped() <-chan bool {
  226. return c.notify_cluster_stopped_chan
  227. }