cluster.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package cluster
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
  9. )
  10. type Cluster struct {
  11. // id is the unique id of the cluster
  12. id string
  13. // i_am_master is the flag to indicate whether the current node is the master node
  14. iAmMaster bool
  15. // main http port of the current node
  16. port uint16
  17. // plugins stores all the plugin life time of the current node
  18. plugins mapping.Map[string, *pluginLifeTime]
  19. pluginLock sync.RWMutex
  20. manager *plugin_manager.PluginManager
  21. // nodes stores all the nodes of the cluster
  22. nodes mapping.Map[string, node]
  23. // signals for waiting for the cluster to stop
  24. stopChan chan bool
  25. stopped int32
  26. isInAutoGcNodes int32
  27. isInAutoGcPlugins int32
  28. // channels to notify cluster event
  29. notifyBecomeMasterChan chan bool
  30. notifyMasterGcChan chan bool
  31. notifyMasterGcCompletedChan chan bool
  32. notifyVotingChan chan bool
  33. notifyVotingCompletedChan chan bool
  34. notifyPluginScheduleChan chan bool
  35. notifyPluginScheduleCompletedChan chan bool
  36. notifyNodeUpdateChan chan bool
  37. notifyNodeUpdateCompletedChan chan bool
  38. notifyClusterStoppedChan chan bool
  39. }
  40. func NewCluster(config *app.Config, plugin_manager *plugin_manager.PluginManager) *Cluster {
  41. return &Cluster{
  42. id: uuid.New().String(),
  43. port: uint16(config.ServerPort),
  44. stopChan: make(chan bool),
  45. manager: plugin_manager,
  46. notifyBecomeMasterChan: make(chan bool),
  47. notifyMasterGcChan: make(chan bool),
  48. notifyMasterGcCompletedChan: make(chan bool),
  49. notifyVotingChan: make(chan bool),
  50. notifyVotingCompletedChan: make(chan bool),
  51. notifyPluginScheduleChan: make(chan bool),
  52. notifyPluginScheduleCompletedChan: make(chan bool),
  53. notifyNodeUpdateChan: make(chan bool),
  54. notifyNodeUpdateCompletedChan: make(chan bool),
  55. notifyClusterStoppedChan: make(chan bool),
  56. }
  57. }
  58. func (c *Cluster) Launch() {
  59. go c.clusterLifetime()
  60. }
  61. func (c *Cluster) Close() error {
  62. if atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
  63. close(c.stopChan)
  64. }
  65. return nil
  66. }
  67. func (c *Cluster) ID() string {
  68. return c.id
  69. }
  70. // trigger for master event
  71. func (c *Cluster) notifyBecomeMaster() {
  72. if atomic.LoadInt32(&c.stopped) == 1 {
  73. return
  74. }
  75. select {
  76. case c.notifyBecomeMasterChan <- true:
  77. default:
  78. }
  79. }
  80. // receive the master event
  81. func (c *Cluster) NotifyBecomeMaster() <-chan bool {
  82. if atomic.LoadInt32(&c.stopped) == 1 {
  83. return nil
  84. }
  85. return c.notifyBecomeMasterChan
  86. }
  87. // trigger for master gc event
  88. func (c *Cluster) notifyMasterGC() {
  89. if atomic.LoadInt32(&c.stopped) == 1 {
  90. return
  91. }
  92. select {
  93. case c.notifyMasterGcChan <- true:
  94. default:
  95. }
  96. }
  97. // trigger for master gc completed event
  98. func (c *Cluster) notifyMasterGCCompleted() {
  99. if atomic.LoadInt32(&c.stopped) == 1 {
  100. return
  101. }
  102. select {
  103. case c.notifyMasterGcCompletedChan <- true:
  104. default:
  105. }
  106. }
  107. // trigger for voting event
  108. func (c *Cluster) notifyVoting() {
  109. if atomic.LoadInt32(&c.stopped) == 1 {
  110. return
  111. }
  112. select {
  113. case c.notifyVotingChan <- true:
  114. default:
  115. }
  116. }
  117. // trigger for voting completed event
  118. func (c *Cluster) notifyVotingCompleted() {
  119. if atomic.LoadInt32(&c.stopped) == 1 {
  120. return
  121. }
  122. select {
  123. case c.notifyVotingCompletedChan <- true:
  124. default:
  125. }
  126. }
  127. // trigger for plugin schedule event
  128. func (c *Cluster) notifyPluginSchedule() {
  129. if atomic.LoadInt32(&c.stopped) == 1 {
  130. return
  131. }
  132. select {
  133. case c.notifyPluginScheduleChan <- true:
  134. default:
  135. }
  136. }
  137. // trigger for plugin schedule completed event
  138. func (c *Cluster) notifyPluginScheduleCompleted() {
  139. if atomic.LoadInt32(&c.stopped) == 1 {
  140. return
  141. }
  142. select {
  143. case c.notifyPluginScheduleCompletedChan <- true:
  144. default:
  145. }
  146. }
  147. // trigger for node update event
  148. func (c *Cluster) notifyNodeUpdate() {
  149. if atomic.LoadInt32(&c.stopped) == 1 {
  150. return
  151. }
  152. select {
  153. case c.notifyNodeUpdateChan <- true:
  154. default:
  155. }
  156. }
  157. // trigger for node update completed event
  158. func (c *Cluster) notifyNodeUpdateCompleted() {
  159. if atomic.LoadInt32(&c.stopped) == 1 {
  160. return
  161. }
  162. select {
  163. case c.notifyNodeUpdateCompletedChan <- true:
  164. default:
  165. }
  166. }
  167. // trigger for cluster stopped event
  168. func (c *Cluster) notifyClusterStopped() {
  169. select {
  170. case c.notifyClusterStoppedChan <- true:
  171. default:
  172. }
  173. }
  174. // receive the master gc event
  175. func (c *Cluster) NotifyMasterGC() <-chan bool {
  176. if atomic.LoadInt32(&c.stopped) == 1 {
  177. return nil
  178. }
  179. return c.notifyMasterGcChan
  180. }
  181. // receive the master gc completed event
  182. func (c *Cluster) NotifyMasterGCCompleted() <-chan bool {
  183. if atomic.LoadInt32(&c.stopped) == 1 {
  184. return nil
  185. }
  186. return c.notifyMasterGcCompletedChan
  187. }
  188. // receive the voting event
  189. func (c *Cluster) NotifyVoting() <-chan bool {
  190. if atomic.LoadInt32(&c.stopped) == 1 {
  191. return nil
  192. }
  193. return c.notifyVotingChan
  194. }
  195. // receive the voting completed event
  196. func (c *Cluster) NotifyVotingCompleted() <-chan bool {
  197. if atomic.LoadInt32(&c.stopped) == 1 {
  198. return nil
  199. }
  200. return c.notifyVotingCompletedChan
  201. }
  202. // receive the plugin schedule event
  203. func (c *Cluster) NotifyPluginSchedule() <-chan bool {
  204. if atomic.LoadInt32(&c.stopped) == 1 {
  205. return nil
  206. }
  207. return c.notifyPluginScheduleChan
  208. }
  209. // receive the plugin schedule completed event
  210. func (c *Cluster) NotifyPluginScheduleCompleted() <-chan bool {
  211. if atomic.LoadInt32(&c.stopped) == 1 {
  212. return nil
  213. }
  214. return c.notifyPluginScheduleCompletedChan
  215. }
  216. // receive the node update event
  217. func (c *Cluster) NotifyNodeUpdate() <-chan bool {
  218. if atomic.LoadInt32(&c.stopped) == 1 {
  219. return nil
  220. }
  221. return c.notifyNodeUpdateChan
  222. }
  223. // receive the node update completed event
  224. func (c *Cluster) NotifyNodeUpdateCompleted() <-chan bool {
  225. if atomic.LoadInt32(&c.stopped) == 1 {
  226. return nil
  227. }
  228. return c.notifyNodeUpdateCompletedChan
  229. }
  230. // receive the cluster stopped event
  231. func (c *Cluster) NotifyClusterStopped() <-chan bool {
  232. return c.notifyClusterStoppedChan
  233. }