cluster.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package cluster
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
  9. )
  10. type Cluster struct {
  11. // id is the unique id of the cluster
  12. id string
  13. // i_am_master is the flag to indicate whether the current node is the master node
  14. i_am_master bool
  15. // main http port of the current node
  16. port uint16
  17. // plugins stores all the plugin life time of the current node
  18. plugins mapping.Map[string, *pluginLifeTime]
  19. plugin_lock sync.RWMutex
  20. manager *plugin_manager.PluginManager
  21. // nodes stores all the nodes of the cluster
  22. nodes mapping.Map[string, node]
  23. // signals for waiting for the cluster to stop
  24. stop_chan chan bool
  25. stopped int32
  26. is_in_auto_gc_nodes int32
  27. is_in_auto_gc_plugins int32
  28. // channels to notify cluster event
  29. notify_become_master_chan chan bool
  30. notify_master_gc_chan chan bool
  31. notify_master_gc_completed_chan chan bool
  32. notify_voting_chan chan bool
  33. notify_voting_completed_chan chan bool
  34. notify_plugin_schedule_chan chan bool
  35. notify_plugin_schedule_completed_chan chan bool
  36. notify_node_update_chan chan bool
  37. notify_node_update_completed_chan chan bool
  38. notify_cluster_stopped_chan chan bool
  39. }
  40. func NewCluster(config *app.Config, plugin_manager *plugin_manager.PluginManager) *Cluster {
  41. return &Cluster{
  42. id: uuid.New().String(),
  43. port: uint16(config.ServerPort),
  44. stop_chan: make(chan bool),
  45. manager: plugin_manager,
  46. notify_become_master_chan: make(chan bool),
  47. notify_master_gc_chan: make(chan bool),
  48. notify_master_gc_completed_chan: make(chan bool),
  49. notify_voting_chan: make(chan bool),
  50. notify_voting_completed_chan: make(chan bool),
  51. notify_plugin_schedule_chan: make(chan bool),
  52. notify_plugin_schedule_completed_chan: make(chan bool),
  53. notify_node_update_chan: make(chan bool),
  54. notify_node_update_completed_chan: make(chan bool),
  55. notify_cluster_stopped_chan: make(chan bool),
  56. }
  57. }
  58. func (c *Cluster) Launch() {
  59. go c.clusterLifetime()
  60. }
  61. func (c *Cluster) Close() error {
  62. if atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
  63. close(c.stop_chan)
  64. }
  65. return nil
  66. }
  67. func (c *Cluster) ID() string {
  68. return c.id
  69. }
  70. // trigger for master event
  71. func (c *Cluster) notifyBecomeMaster() {
  72. if atomic.LoadInt32(&c.stopped) == 1 {
  73. return
  74. }
  75. select {
  76. case c.notify_become_master_chan <- true:
  77. default:
  78. }
  79. }
  80. // receive the master event
  81. func (c *Cluster) NotifyBecomeMaster() <-chan bool {
  82. if atomic.LoadInt32(&c.stopped) == 1 {
  83. return nil
  84. }
  85. return c.notify_become_master_chan
  86. }
  87. // trigger for master gc event
  88. func (c *Cluster) notifyMasterGC() {
  89. if atomic.LoadInt32(&c.stopped) == 1 {
  90. return
  91. }
  92. select {
  93. case c.notify_master_gc_chan <- true:
  94. default:
  95. }
  96. }
  97. // trigger for master gc completed event
  98. func (c *Cluster) notifyMasterGCCompleted() {
  99. if atomic.LoadInt32(&c.stopped) == 1 {
  100. return
  101. }
  102. select {
  103. case c.notify_master_gc_completed_chan <- true:
  104. default:
  105. }
  106. }
  107. // trigger for voting event
  108. func (c *Cluster) notifyVoting() {
  109. if atomic.LoadInt32(&c.stopped) == 1 {
  110. return
  111. }
  112. select {
  113. case c.notify_voting_chan <- true:
  114. default:
  115. }
  116. }
  117. // trigger for voting completed event
  118. func (c *Cluster) notifyVotingCompleted() {
  119. if atomic.LoadInt32(&c.stopped) == 1 {
  120. return
  121. }
  122. select {
  123. case c.notify_voting_completed_chan <- true:
  124. default:
  125. }
  126. }
  127. // trigger for plugin schedule event
  128. func (c *Cluster) notifyPluginSchedule() {
  129. if atomic.LoadInt32(&c.stopped) == 1 {
  130. return
  131. }
  132. select {
  133. case c.notify_plugin_schedule_chan <- true:
  134. default:
  135. }
  136. }
  137. // trigger for plugin schedule completed event
  138. func (c *Cluster) notifyPluginScheduleCompleted() {
  139. if atomic.LoadInt32(&c.stopped) == 1 {
  140. return
  141. }
  142. select {
  143. case c.notify_plugin_schedule_completed_chan <- true:
  144. default:
  145. }
  146. }
  147. // trigger for node update event
  148. func (c *Cluster) notifyNodeUpdate() {
  149. if atomic.LoadInt32(&c.stopped) == 1 {
  150. return
  151. }
  152. select {
  153. case c.notify_node_update_chan <- true:
  154. default:
  155. }
  156. }
  157. // trigger for node update completed event
  158. func (c *Cluster) notifyNodeUpdateCompleted() {
  159. if atomic.LoadInt32(&c.stopped) == 1 {
  160. return
  161. }
  162. select {
  163. case c.notify_node_update_completed_chan <- true:
  164. default:
  165. }
  166. }
  167. // trigger for cluster stopped event
  168. func (c *Cluster) notifyClusterStopped() {
  169. select {
  170. case c.notify_cluster_stopped_chan <- true:
  171. default:
  172. }
  173. }
  174. // receive the master gc event
  175. func (c *Cluster) NotifyMasterGC() <-chan bool {
  176. if atomic.LoadInt32(&c.stopped) == 1 {
  177. return nil
  178. }
  179. return c.notify_master_gc_chan
  180. }
  181. // receive the master gc completed event
  182. func (c *Cluster) NotifyMasterGCCompleted() <-chan bool {
  183. if atomic.LoadInt32(&c.stopped) == 1 {
  184. return nil
  185. }
  186. return c.notify_master_gc_completed_chan
  187. }
  188. // receive the voting event
  189. func (c *Cluster) NotifyVoting() <-chan bool {
  190. if atomic.LoadInt32(&c.stopped) == 1 {
  191. return nil
  192. }
  193. return c.notify_voting_chan
  194. }
  195. // receive the voting completed event
  196. func (c *Cluster) NotifyVotingCompleted() <-chan bool {
  197. if atomic.LoadInt32(&c.stopped) == 1 {
  198. return nil
  199. }
  200. return c.notify_voting_completed_chan
  201. }
  202. // receive the plugin schedule event
  203. func (c *Cluster) NotifyPluginSchedule() <-chan bool {
  204. if atomic.LoadInt32(&c.stopped) == 1 {
  205. return nil
  206. }
  207. return c.notify_plugin_schedule_chan
  208. }
  209. // receive the plugin schedule completed event
  210. func (c *Cluster) NotifyPluginScheduleCompleted() <-chan bool {
  211. if atomic.LoadInt32(&c.stopped) == 1 {
  212. return nil
  213. }
  214. return c.notify_plugin_schedule_completed_chan
  215. }
  216. // receive the node update event
  217. func (c *Cluster) NotifyNodeUpdate() <-chan bool {
  218. if atomic.LoadInt32(&c.stopped) == 1 {
  219. return nil
  220. }
  221. return c.notify_node_update_chan
  222. }
  223. // receive the node update completed event
  224. func (c *Cluster) NotifyNodeUpdateCompleted() <-chan bool {
  225. if atomic.LoadInt32(&c.stopped) == 1 {
  226. return nil
  227. }
  228. return c.notify_node_update_completed_chan
  229. }
  230. // receive the cluster stopped event
  231. func (c *Cluster) NotifyClusterStopped() <-chan bool {
  232. return c.notify_cluster_stopped_chan
  233. }