cluster.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package cluster
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
  8. )
  9. type Cluster struct {
  10. // id is the unique id of the cluster
  11. id string
  12. // i_am_master is the flag to indicate whether the current node is the master node
  13. i_am_master bool
  14. // main http port of the current node
  15. port uint16
  16. // plugins stores all the plugin life time of the current node
  17. plugins mapping.Map[string, *pluginLifeTime]
  18. plugin_lock sync.RWMutex
  19. // nodes stores all the nodes of the cluster
  20. nodes mapping.Map[string, node]
  21. // signals for waiting for the cluster to stop
  22. stop_chan chan bool
  23. stopped int32
  24. is_in_auto_gc_nodes int32
  25. is_in_auto_gc_plugins int32
  26. // channels to notify cluster event
  27. notify_become_master_chan chan bool
  28. notify_master_gc_chan chan bool
  29. notify_master_gc_completed_chan chan bool
  30. notify_voting_chan chan bool
  31. notify_voting_completed_chan chan bool
  32. notify_plugin_schedule_chan chan bool
  33. notify_plugin_schedule_completed_chan chan bool
  34. notify_node_update_chan chan bool
  35. notify_node_update_completed_chan chan bool
  36. notify_cluster_stopped_chan chan bool
  37. }
  38. func NewCluster(config *app.Config) *Cluster {
  39. return &Cluster{
  40. id: uuid.New().String(),
  41. port: uint16(config.ServerPort),
  42. stop_chan: make(chan bool),
  43. notify_become_master_chan: make(chan bool),
  44. notify_master_gc_chan: make(chan bool),
  45. notify_master_gc_completed_chan: make(chan bool),
  46. notify_voting_chan: make(chan bool),
  47. notify_voting_completed_chan: make(chan bool),
  48. notify_plugin_schedule_chan: make(chan bool),
  49. notify_plugin_schedule_completed_chan: make(chan bool),
  50. notify_node_update_chan: make(chan bool),
  51. notify_node_update_completed_chan: make(chan bool),
  52. notify_cluster_stopped_chan: make(chan bool),
  53. }
  54. }
  55. func (c *Cluster) Launch() {
  56. go c.clusterLifetime()
  57. }
  58. func (c *Cluster) Close() error {
  59. if atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
  60. close(c.stop_chan)
  61. }
  62. return nil
  63. }
  64. func (c *Cluster) ID() string {
  65. return c.id
  66. }
  67. // trigger for master event
  68. func (c *Cluster) notifyBecomeMaster() {
  69. if atomic.LoadInt32(&c.stopped) == 1 {
  70. return
  71. }
  72. select {
  73. case c.notify_become_master_chan <- true:
  74. default:
  75. }
  76. }
  77. // receive the master event
  78. func (c *Cluster) NotifyBecomeMaster() <-chan bool {
  79. if atomic.LoadInt32(&c.stopped) == 1 {
  80. return nil
  81. }
  82. return c.notify_become_master_chan
  83. }
  84. // trigger for master gc event
  85. func (c *Cluster) notifyMasterGC() {
  86. if atomic.LoadInt32(&c.stopped) == 1 {
  87. return
  88. }
  89. select {
  90. case c.notify_master_gc_chan <- true:
  91. default:
  92. }
  93. }
  94. // trigger for master gc completed event
  95. func (c *Cluster) notifyMasterGCCompleted() {
  96. if atomic.LoadInt32(&c.stopped) == 1 {
  97. return
  98. }
  99. select {
  100. case c.notify_master_gc_completed_chan <- true:
  101. default:
  102. }
  103. }
  104. // trigger for voting event
  105. func (c *Cluster) notifyVoting() {
  106. if atomic.LoadInt32(&c.stopped) == 1 {
  107. return
  108. }
  109. select {
  110. case c.notify_voting_chan <- true:
  111. default:
  112. }
  113. }
  114. // trigger for voting completed event
  115. func (c *Cluster) notifyVotingCompleted() {
  116. if atomic.LoadInt32(&c.stopped) == 1 {
  117. return
  118. }
  119. select {
  120. case c.notify_voting_completed_chan <- true:
  121. default:
  122. }
  123. }
  124. // trigger for plugin schedule event
  125. func (c *Cluster) notifyPluginSchedule() {
  126. if atomic.LoadInt32(&c.stopped) == 1 {
  127. return
  128. }
  129. select {
  130. case c.notify_plugin_schedule_chan <- true:
  131. default:
  132. }
  133. }
  134. // trigger for plugin schedule completed event
  135. func (c *Cluster) notifyPluginScheduleCompleted() {
  136. if atomic.LoadInt32(&c.stopped) == 1 {
  137. return
  138. }
  139. select {
  140. case c.notify_plugin_schedule_completed_chan <- true:
  141. default:
  142. }
  143. }
  144. // trigger for node update event
  145. func (c *Cluster) notifyNodeUpdate() {
  146. if atomic.LoadInt32(&c.stopped) == 1 {
  147. return
  148. }
  149. select {
  150. case c.notify_node_update_chan <- true:
  151. default:
  152. }
  153. }
  154. // trigger for node update completed event
  155. func (c *Cluster) notifyNodeUpdateCompleted() {
  156. if atomic.LoadInt32(&c.stopped) == 1 {
  157. return
  158. }
  159. select {
  160. case c.notify_node_update_completed_chan <- true:
  161. default:
  162. }
  163. }
  164. // trigger for cluster stopped event
  165. func (c *Cluster) notifyClusterStopped() {
  166. select {
  167. case c.notify_cluster_stopped_chan <- true:
  168. default:
  169. }
  170. }
  171. // receive the master gc event
  172. func (c *Cluster) NotifyMasterGC() <-chan bool {
  173. if atomic.LoadInt32(&c.stopped) == 1 {
  174. return nil
  175. }
  176. return c.notify_master_gc_chan
  177. }
  178. // receive the master gc completed event
  179. func (c *Cluster) NotifyMasterGCCompleted() <-chan bool {
  180. if atomic.LoadInt32(&c.stopped) == 1 {
  181. return nil
  182. }
  183. return c.notify_master_gc_completed_chan
  184. }
  185. // receive the voting event
  186. func (c *Cluster) NotifyVoting() <-chan bool {
  187. if atomic.LoadInt32(&c.stopped) == 1 {
  188. return nil
  189. }
  190. return c.notify_voting_chan
  191. }
  192. // receive the voting completed event
  193. func (c *Cluster) NotifyVotingCompleted() <-chan bool {
  194. if atomic.LoadInt32(&c.stopped) == 1 {
  195. return nil
  196. }
  197. return c.notify_voting_completed_chan
  198. }
  199. // receive the plugin schedule event
  200. func (c *Cluster) NotifyPluginSchedule() <-chan bool {
  201. if atomic.LoadInt32(&c.stopped) == 1 {
  202. return nil
  203. }
  204. return c.notify_plugin_schedule_chan
  205. }
  206. // receive the plugin schedule completed event
  207. func (c *Cluster) NotifyPluginScheduleCompleted() <-chan bool {
  208. if atomic.LoadInt32(&c.stopped) == 1 {
  209. return nil
  210. }
  211. return c.notify_plugin_schedule_completed_chan
  212. }
  213. // receive the node update event
  214. func (c *Cluster) NotifyNodeUpdate() <-chan bool {
  215. if atomic.LoadInt32(&c.stopped) == 1 {
  216. return nil
  217. }
  218. return c.notify_node_update_chan
  219. }
  220. // receive the node update completed event
  221. func (c *Cluster) NotifyNodeUpdateCompleted() <-chan bool {
  222. if atomic.LoadInt32(&c.stopped) == 1 {
  223. return nil
  224. }
  225. return c.notify_node_update_completed_chan
  226. }
  227. // receive the cluster stopped event
  228. func (c *Cluster) NotifyClusterStopped() <-chan bool {
  229. return c.notify_cluster_stopped_chan
  230. }