cluster.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package cluster
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
  9. )
  10. type Cluster struct {
  11. // id is the unique id of the cluster
  12. id string
  13. // i_am_master is the flag to indicate whether the current node is the master node
  14. iAmMaster bool
  15. // main http port of the current node
  16. port uint16
  17. // plugins stores all the plugin life time of the current node
  18. plugins mapping.Map[string, *pluginLifeTime]
  19. pluginLock sync.RWMutex
  20. manager *plugin_manager.PluginManager
  21. // nodes stores all the nodes of the cluster
  22. nodes mapping.Map[string, node]
  23. // signals for waiting for the cluster to stop
  24. stopChan chan bool
  25. stopped int32
  26. isInAutoGcNodes int32
  27. isInAutoGcPlugins int32
  28. // channels to notify cluster event
  29. notifyBecomeMasterChan chan bool
  30. notifyMasterGcChan chan bool
  31. notifyMasterGcCompletedChan chan bool
  32. notifyVotingChan chan bool
  33. notifyVotingCompletedChan chan bool
  34. notifyPluginScheduleChan chan bool
  35. notifyPluginScheduleCompletedChan chan bool
  36. notifyNodeUpdateChan chan bool
  37. notifyNodeUpdateCompletedChan chan bool
  38. notifyClusterStoppedChan chan bool
  39. showLog bool
  40. }
  41. func NewCluster(config *app.Config, plugin_manager *plugin_manager.PluginManager) *Cluster {
  42. return &Cluster{
  43. id: uuid.New().String(),
  44. port: uint16(config.ServerPort),
  45. stopChan: make(chan bool),
  46. showLog: config.DisplayClusterLog,
  47. manager: plugin_manager,
  48. notifyBecomeMasterChan: make(chan bool),
  49. notifyMasterGcChan: make(chan bool),
  50. notifyMasterGcCompletedChan: make(chan bool),
  51. notifyVotingChan: make(chan bool),
  52. notifyVotingCompletedChan: make(chan bool),
  53. notifyPluginScheduleChan: make(chan bool),
  54. notifyPluginScheduleCompletedChan: make(chan bool),
  55. notifyNodeUpdateChan: make(chan bool),
  56. notifyNodeUpdateCompletedChan: make(chan bool),
  57. notifyClusterStoppedChan: make(chan bool),
  58. }
  59. }
  60. func (c *Cluster) Launch() {
  61. go c.clusterLifetime()
  62. }
  63. func (c *Cluster) Close() error {
  64. if atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
  65. close(c.stopChan)
  66. }
  67. return nil
  68. }
  69. func (c *Cluster) ID() string {
  70. return c.id
  71. }
  72. // trigger for master event
  73. func (c *Cluster) notifyBecomeMaster() {
  74. if atomic.LoadInt32(&c.stopped) == 1 {
  75. return
  76. }
  77. select {
  78. case c.notifyBecomeMasterChan <- true:
  79. default:
  80. }
  81. }
  82. // receive the master event
  83. func (c *Cluster) NotifyBecomeMaster() <-chan bool {
  84. if atomic.LoadInt32(&c.stopped) == 1 {
  85. return nil
  86. }
  87. return c.notifyBecomeMasterChan
  88. }
  89. // trigger for master gc event
  90. func (c *Cluster) notifyMasterGC() {
  91. if atomic.LoadInt32(&c.stopped) == 1 {
  92. return
  93. }
  94. select {
  95. case c.notifyMasterGcChan <- true:
  96. default:
  97. }
  98. }
  99. // trigger for master gc completed event
  100. func (c *Cluster) notifyMasterGCCompleted() {
  101. if atomic.LoadInt32(&c.stopped) == 1 {
  102. return
  103. }
  104. select {
  105. case c.notifyMasterGcCompletedChan <- true:
  106. default:
  107. }
  108. }
  109. // trigger for voting event
  110. func (c *Cluster) notifyVoting() {
  111. if atomic.LoadInt32(&c.stopped) == 1 {
  112. return
  113. }
  114. select {
  115. case c.notifyVotingChan <- true:
  116. default:
  117. }
  118. }
  119. // trigger for voting completed event
  120. func (c *Cluster) notifyVotingCompleted() {
  121. if atomic.LoadInt32(&c.stopped) == 1 {
  122. return
  123. }
  124. select {
  125. case c.notifyVotingCompletedChan <- true:
  126. default:
  127. }
  128. }
  129. // trigger for plugin schedule event
  130. func (c *Cluster) notifyPluginSchedule() {
  131. if atomic.LoadInt32(&c.stopped) == 1 {
  132. return
  133. }
  134. select {
  135. case c.notifyPluginScheduleChan <- true:
  136. default:
  137. }
  138. }
  139. // trigger for plugin schedule completed event
  140. func (c *Cluster) notifyPluginScheduleCompleted() {
  141. if atomic.LoadInt32(&c.stopped) == 1 {
  142. return
  143. }
  144. select {
  145. case c.notifyPluginScheduleCompletedChan <- true:
  146. default:
  147. }
  148. }
  149. // trigger for node update event
  150. func (c *Cluster) notifyNodeUpdate() {
  151. if atomic.LoadInt32(&c.stopped) == 1 {
  152. return
  153. }
  154. select {
  155. case c.notifyNodeUpdateChan <- true:
  156. default:
  157. }
  158. }
  159. // trigger for node update completed event
  160. func (c *Cluster) notifyNodeUpdateCompleted() {
  161. if atomic.LoadInt32(&c.stopped) == 1 {
  162. return
  163. }
  164. select {
  165. case c.notifyNodeUpdateCompletedChan <- true:
  166. default:
  167. }
  168. }
  169. // trigger for cluster stopped event
  170. func (c *Cluster) notifyClusterStopped() {
  171. select {
  172. case c.notifyClusterStoppedChan <- true:
  173. default:
  174. }
  175. }
  176. // receive the master gc event
  177. func (c *Cluster) NotifyMasterGC() <-chan bool {
  178. if atomic.LoadInt32(&c.stopped) == 1 {
  179. return nil
  180. }
  181. return c.notifyMasterGcChan
  182. }
  183. // receive the master gc completed event
  184. func (c *Cluster) NotifyMasterGCCompleted() <-chan bool {
  185. if atomic.LoadInt32(&c.stopped) == 1 {
  186. return nil
  187. }
  188. return c.notifyMasterGcCompletedChan
  189. }
  190. // receive the voting event
  191. func (c *Cluster) NotifyVoting() <-chan bool {
  192. if atomic.LoadInt32(&c.stopped) == 1 {
  193. return nil
  194. }
  195. return c.notifyVotingChan
  196. }
  197. // receive the voting completed event
  198. func (c *Cluster) NotifyVotingCompleted() <-chan bool {
  199. if atomic.LoadInt32(&c.stopped) == 1 {
  200. return nil
  201. }
  202. return c.notifyVotingCompletedChan
  203. }
  204. // receive the plugin schedule event
  205. func (c *Cluster) NotifyPluginSchedule() <-chan bool {
  206. if atomic.LoadInt32(&c.stopped) == 1 {
  207. return nil
  208. }
  209. return c.notifyPluginScheduleChan
  210. }
  211. // receive the plugin schedule completed event
  212. func (c *Cluster) NotifyPluginScheduleCompleted() <-chan bool {
  213. if atomic.LoadInt32(&c.stopped) == 1 {
  214. return nil
  215. }
  216. return c.notifyPluginScheduleCompletedChan
  217. }
  218. // receive the node update event
  219. func (c *Cluster) NotifyNodeUpdate() <-chan bool {
  220. if atomic.LoadInt32(&c.stopped) == 1 {
  221. return nil
  222. }
  223. return c.notifyNodeUpdateChan
  224. }
  225. // receive the node update completed event
  226. func (c *Cluster) NotifyNodeUpdateCompleted() <-chan bool {
  227. if atomic.LoadInt32(&c.stopped) == 1 {
  228. return nil
  229. }
  230. return c.notifyNodeUpdateCompletedChan
  231. }
  232. // receive the cluster stopped event
  233. func (c *Cluster) NotifyClusterStopped() <-chan bool {
  234. return c.notifyClusterStoppedChan
  235. }