// cluster.go

  1. package cluster
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. "github.com/google/uuid"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
  10. )
  11. type Cluster struct {
  12. // id is the unique id of the cluster
  13. id string
  14. // i_am_master is the flag to indicate whether the current node is the master node
  15. iAmMaster bool
  16. // main http port of the current node
  17. port uint16
  18. // plugins stores all the plugin life time of the current node
  19. plugins mapping.Map[string, *pluginLifeTime]
  20. pluginLock sync.RWMutex
  21. manager *plugin_manager.PluginManager
  22. // nodes stores all the nodes of the cluster
  23. nodes mapping.Map[string, node]
  24. // signals for waiting for the cluster to stop
  25. stopChan chan bool
  26. stopped int32
  27. isInAutoGcNodes int32
  28. isInAutoGcPlugins int32
  29. // channels to notify cluster event
  30. notifyBecomeMasterChan chan bool
  31. notifyMasterGcChan chan bool
  32. notifyMasterGcCompletedChan chan bool
  33. notifyVotingChan chan bool
  34. notifyVotingCompletedChan chan bool
  35. notifyPluginScheduleChan chan bool
  36. notifyPluginScheduleCompletedChan chan bool
  37. notifyNodeUpdateChan chan bool
  38. notifyNodeUpdateCompletedChan chan bool
  39. notifyClusterStoppedChan chan bool
  40. showLog bool
  41. masterGcInterval time.Duration
  42. masterLockingInterval time.Duration
  43. masterLockExpiredTime time.Duration
  44. nodeVoteInterval time.Duration
  45. nodeDisconnectedTimeout time.Duration
  46. updateNodeStatusInterval time.Duration
  47. pluginSchedulerInterval time.Duration
  48. pluginSchedulerTickerInterval time.Duration
  49. pluginDeactivatedTimeout time.Duration
  50. }
  51. func NewCluster(config *app.Config, plugin_manager *plugin_manager.PluginManager) *Cluster {
  52. return &Cluster{
  53. id: uuid.New().String(),
  54. port: uint16(config.ServerPort),
  55. stopChan: make(chan bool),
  56. showLog: config.DisplayClusterLog,
  57. masterGcInterval: MASTER_GC_INTERVAL,
  58. masterLockingInterval: MASTER_LOCKING_INTERVAL,
  59. masterLockExpiredTime: MASTER_LOCK_EXPIRED_TIME,
  60. nodeVoteInterval: NODE_VOTE_INTERVAL,
  61. nodeDisconnectedTimeout: NODE_DISCONNECTED_TIMEOUT,
  62. updateNodeStatusInterval: UPDATE_NODE_STATUS_INTERVAL,
  63. pluginSchedulerInterval: PLUGIN_SCHEDULER_INTERVAL,
  64. pluginSchedulerTickerInterval: PLUGIN_SCHEDULER_TICKER_INTERVAL,
  65. pluginDeactivatedTimeout: PLUGIN_DEACTIVATED_TIMEOUT,
  66. manager: plugin_manager,
  67. notifyBecomeMasterChan: make(chan bool),
  68. notifyMasterGcChan: make(chan bool),
  69. notifyMasterGcCompletedChan: make(chan bool),
  70. notifyVotingChan: make(chan bool),
  71. notifyVotingCompletedChan: make(chan bool),
  72. notifyPluginScheduleChan: make(chan bool),
  73. notifyPluginScheduleCompletedChan: make(chan bool),
  74. notifyNodeUpdateChan: make(chan bool),
  75. notifyNodeUpdateCompletedChan: make(chan bool),
  76. notifyClusterStoppedChan: make(chan bool),
  77. }
  78. }
  79. func (c *Cluster) Launch() {
  80. go c.clusterLifetime()
  81. }
  82. func (c *Cluster) Close() error {
  83. if atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
  84. close(c.stopChan)
  85. }
  86. return nil
  87. }
  88. func (c *Cluster) ID() string {
  89. return c.id
  90. }
  91. // trigger for master event
  92. func (c *Cluster) notifyBecomeMaster() {
  93. if atomic.LoadInt32(&c.stopped) == 1 {
  94. return
  95. }
  96. select {
  97. case c.notifyBecomeMasterChan <- true:
  98. default:
  99. }
  100. }
  101. // receive the master event
  102. func (c *Cluster) NotifyBecomeMaster() <-chan bool {
  103. if atomic.LoadInt32(&c.stopped) == 1 {
  104. return nil
  105. }
  106. return c.notifyBecomeMasterChan
  107. }
  108. // trigger for master gc event
  109. func (c *Cluster) notifyMasterGC() {
  110. if atomic.LoadInt32(&c.stopped) == 1 {
  111. return
  112. }
  113. select {
  114. case c.notifyMasterGcChan <- true:
  115. default:
  116. }
  117. }
  118. // trigger for master gc completed event
  119. func (c *Cluster) notifyMasterGCCompleted() {
  120. if atomic.LoadInt32(&c.stopped) == 1 {
  121. return
  122. }
  123. select {
  124. case c.notifyMasterGcCompletedChan <- true:
  125. default:
  126. }
  127. }
  128. // trigger for voting event
  129. func (c *Cluster) notifyVoting() {
  130. if atomic.LoadInt32(&c.stopped) == 1 {
  131. return
  132. }
  133. select {
  134. case c.notifyVotingChan <- true:
  135. default:
  136. }
  137. }
  138. // trigger for voting completed event
  139. func (c *Cluster) notifyVotingCompleted() {
  140. if atomic.LoadInt32(&c.stopped) == 1 {
  141. return
  142. }
  143. select {
  144. case c.notifyVotingCompletedChan <- true:
  145. default:
  146. }
  147. }
  148. // trigger for plugin schedule event
  149. func (c *Cluster) notifyPluginSchedule() {
  150. if atomic.LoadInt32(&c.stopped) == 1 {
  151. return
  152. }
  153. select {
  154. case c.notifyPluginScheduleChan <- true:
  155. default:
  156. }
  157. }
  158. // trigger for plugin schedule completed event
  159. func (c *Cluster) notifyPluginScheduleCompleted() {
  160. if atomic.LoadInt32(&c.stopped) == 1 {
  161. return
  162. }
  163. select {
  164. case c.notifyPluginScheduleCompletedChan <- true:
  165. default:
  166. }
  167. }
  168. // trigger for node update event
  169. func (c *Cluster) notifyNodeUpdate() {
  170. if atomic.LoadInt32(&c.stopped) == 1 {
  171. return
  172. }
  173. select {
  174. case c.notifyNodeUpdateChan <- true:
  175. default:
  176. }
  177. }
  178. // trigger for node update completed event
  179. func (c *Cluster) notifyNodeUpdateCompleted() {
  180. if atomic.LoadInt32(&c.stopped) == 1 {
  181. return
  182. }
  183. select {
  184. case c.notifyNodeUpdateCompletedChan <- true:
  185. default:
  186. }
  187. }
  188. // trigger for cluster stopped event
  189. func (c *Cluster) notifyClusterStopped() {
  190. select {
  191. case c.notifyClusterStoppedChan <- true:
  192. default:
  193. }
  194. }
  195. // receive the master gc event
  196. func (c *Cluster) NotifyMasterGC() <-chan bool {
  197. if atomic.LoadInt32(&c.stopped) == 1 {
  198. return nil
  199. }
  200. return c.notifyMasterGcChan
  201. }
  202. // receive the master gc completed event
  203. func (c *Cluster) NotifyMasterGCCompleted() <-chan bool {
  204. if atomic.LoadInt32(&c.stopped) == 1 {
  205. return nil
  206. }
  207. return c.notifyMasterGcCompletedChan
  208. }
  209. // receive the voting event
  210. func (c *Cluster) NotifyVoting() <-chan bool {
  211. if atomic.LoadInt32(&c.stopped) == 1 {
  212. return nil
  213. }
  214. return c.notifyVotingChan
  215. }
  216. // receive the voting completed event
  217. func (c *Cluster) NotifyVotingCompleted() <-chan bool {
  218. if atomic.LoadInt32(&c.stopped) == 1 {
  219. return nil
  220. }
  221. return c.notifyVotingCompletedChan
  222. }
  223. // receive the plugin schedule event
  224. func (c *Cluster) NotifyPluginSchedule() <-chan bool {
  225. if atomic.LoadInt32(&c.stopped) == 1 {
  226. return nil
  227. }
  228. return c.notifyPluginScheduleChan
  229. }
  230. // receive the plugin schedule completed event
  231. func (c *Cluster) NotifyPluginScheduleCompleted() <-chan bool {
  232. if atomic.LoadInt32(&c.stopped) == 1 {
  233. return nil
  234. }
  235. return c.notifyPluginScheduleCompletedChan
  236. }
  237. // receive the node update event
  238. func (c *Cluster) NotifyNodeUpdate() <-chan bool {
  239. if atomic.LoadInt32(&c.stopped) == 1 {
  240. return nil
  241. }
  242. return c.notifyNodeUpdateChan
  243. }
  244. // receive the node update completed event
  245. func (c *Cluster) NotifyNodeUpdateCompleted() <-chan bool {
  246. if atomic.LoadInt32(&c.stopped) == 1 {
  247. return nil
  248. }
  249. return c.notifyNodeUpdateCompletedChan
  250. }
  251. // receive the cluster stopped event
  252. func (c *Cluster) NotifyClusterStopped() <-chan bool {
  253. return c.notifyClusterStoppedChan
  254. }