state.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package cluster
  2. import (
  3. "sync/atomic"
  4. "time"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. )
  9. // RegisterPlugin registers a plugin to the cluster, and start to be scheduled
  10. func (c *Cluster) RegisterPlugin(lifetime entities.PluginRuntimeTimeLifeInterface) error {
  11. identity, err := lifetime.Identity()
  12. if err != nil {
  13. return err
  14. }
  15. done := make(chan bool)
  16. closed := new(int32)
  17. close := func() {
  18. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  19. close(done)
  20. }
  21. }
  22. l := &pluginLifeTime{
  23. lifetime: lifetime,
  24. }
  25. lifetime.OnStop(func() {
  26. c.plugin_lock.Lock()
  27. c.plugins.Delete(identity)
  28. // remove plugin state
  29. c.doPluginStateUpdate(l)
  30. c.plugin_lock.Unlock()
  31. close()
  32. })
  33. c.plugin_lock.Lock()
  34. if !lifetime.Stopped() {
  35. c.plugins.Store(identity, l)
  36. // do plugin state update immediately
  37. err = c.doPluginStateUpdate(l)
  38. if err != nil {
  39. close()
  40. c.plugin_lock.Unlock()
  41. return err
  42. }
  43. } else {
  44. close()
  45. }
  46. c.plugin_lock.Unlock()
  47. log.Info("start to schedule plugin %s", identity)
  48. return nil
  49. }
  50. const (
  51. PLUGIN_STATE_MAP_KEY = "plugin_state"
  52. )
  53. func (c *Cluster) getPluginStateKey(node_id string, plugin_id string) string {
  54. return node_id + ":" + plugin_id
  55. }
  56. func (c *Cluster) getScanPluginsByNodeKey(node_id string) string {
  57. return node_id + ":*"
  58. }
  59. func (c *Cluster) getScanPluginsByIdKey(plugin_id string) string {
  60. return "*:" + plugin_id
  61. }
  62. func (c *Cluster) FetchPluginAvailableNodes(hashed_plugin_id string) ([]string, error) {
  63. states, err := cache.ScanMap[entities.PluginRuntimeState](PLUGIN_STATE_MAP_KEY, c.getScanPluginsByIdKey(hashed_plugin_id))
  64. if err != nil {
  65. return nil, err
  66. }
  67. nodes := make([]string, 0)
  68. for key := range states {
  69. // split key into node_id and plugin_id
  70. if len(key) < len(hashed_plugin_id)+1 {
  71. log.Error("unexpected plugin state key: %s", key)
  72. continue
  73. }
  74. node_id := key[:len(key)-len(hashed_plugin_id)-1]
  75. nodes = append(nodes, node_id)
  76. }
  77. return nodes, nil
  78. }
  79. // SchedulePlugin schedules a plugin to the cluster
  80. // it will walk through the plugin state map and update all the states
  81. // as for the plugin has exited, normally, it will be removed automatically
  82. // but once a plugin is not removed, it will be gc by the master node
  83. func (c *Cluster) schedulePlugins() error {
  84. c.plugins.Range(func(key string, value *pluginLifeTime) bool {
  85. // do plugin state update
  86. err := c.doPluginStateUpdate(value)
  87. if err != nil {
  88. log.Error("failed to update plugin state: %s", err.Error())
  89. }
  90. return true
  91. })
  92. return nil
  93. }
  94. // doPluginUpdate updates the plugin state and schedule the plugin
  95. func (c *Cluster) doPluginStateUpdate(lifetime *pluginLifeTime) error {
  96. state := lifetime.lifetime.RuntimeState()
  97. hash_identity, err := lifetime.lifetime.HashedIdentity()
  98. if err != nil {
  99. return err
  100. }
  101. identity, err := lifetime.lifetime.Identity()
  102. if err != nil {
  103. return err
  104. }
  105. state_key := c.getPluginStateKey(c.id, hash_identity)
  106. // check if the plugin has been removed
  107. if !c.plugins.Exits(identity) {
  108. // remove state
  109. err = c.removePluginState(hash_identity)
  110. if err != nil {
  111. return err
  112. }
  113. } else {
  114. // update plugin state
  115. state.ScheduledAt = &[]time.Time{time.Now()}[0]
  116. lifetime.lifetime.UpdateState(state)
  117. err = cache.SetMapOneField(PLUGIN_STATE_MAP_KEY, state_key, state)
  118. if err != nil {
  119. return err
  120. }
  121. }
  122. return nil
  123. }
  124. func (c *Cluster) removePluginState(hashed_identity string) error {
  125. return cache.DelMapField(PLUGIN_STATE_MAP_KEY, c.getPluginStateKey(c.id, hashed_identity))
  126. }