lifetime_manager.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package plugin_manager
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  11. )
  // Cache keys used for cluster-wide plugin lifetime bookkeeping.
  const (
  	// KEY_PLUGIN_LIFETIME_STATE names the cache map in which every instance
  	// stores its own lifetime collection, one map field per instance id.
  	KEY_PLUGIN_LIFETIME_STATE = "lifetime_state"

  	// KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK names the cache lock taken while
  	// the collections of other instances are inspected and garbage-collected.
  	KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK = "lifetime_state_modify_lock"
  )
  16. type PluginLifeTime struct {
  17. Identity string `json:"identity"`
  18. Restarts int `json:"restarts"`
  19. Status string `json:"status"`
  20. Config entities.PluginConfiguration `json:"configuration"`
  21. }
  22. type pluginLifeCollection struct {
  23. Collection map[string]PluginLifeTime `json:"state"`
  24. ID string `json:"id"`
  25. LastCheckAt time.Time `json:"last_check_at"`
  26. }
  27. func (p pluginLifeCollection) MarshalBinary() ([]byte, error) {
  28. return parser.MarshalJsonBytes(p), nil
  29. }
  30. var (
  31. instanceId = uuid.New().String()
  32. pluginLifetimeStateLock = sync.RWMutex{}
  33. pluginLifetimeCollection = pluginLifeCollection{
  34. Collection: map[string]PluginLifeTime{},
  35. ID: instanceId,
  36. }
  37. )
  38. func startLifeTimeManager(config *app.Config) {
  39. go func() {
  40. // do check immediately
  41. doClusterLifetimeCheck(config.LifetimeCollectionGCInterval)
  42. duration := time.Duration(config.LifetimeCollectionHeartbeatInterval) * time.Second
  43. for range time.NewTicker(duration).C {
  44. doClusterLifetimeCheck(config.LifetimeCollectionGCInterval)
  45. }
  46. }()
  47. }
  48. func doClusterLifetimeCheck(heartbeat_interval int) {
  49. // check and update self lifetime state
  50. if err := updateCurrentInstanceLifetimeCollection(); err != nil {
  51. log.Error("update current instance lifetime state failed: %s", err.Error())
  52. return
  53. }
  54. // lock cluster and do cluster lifetime check
  55. if cache.Lock(KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK, time.Second*10, time.Second*10) != nil {
  56. log.Error("update lifetime state failed: lock failed")
  57. return
  58. }
  59. defer cache.Unlock(KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK)
  60. cluster_lifetime_collections, err := fetchClusterPluginLifetimeCollections()
  61. if err != nil {
  62. log.Error("fetch cluster plugin lifetime state failed: %s", err.Error())
  63. return
  64. }
  65. for cluster_id, state := range cluster_lifetime_collections {
  66. if state.ID == instanceId {
  67. continue
  68. }
  69. // skip if last check has been done in $LIFETIME_COLLECTION_CG_INTERVAL
  70. cg_interval := time.Duration(heartbeat_interval) * time.Second
  71. if time.Since(state.LastCheckAt) < cg_interval {
  72. continue
  73. }
  74. // if last check has not been done in $LIFETIME_COLLECTION_CG_INTERVAL * 2, delete it
  75. if time.Since(state.LastCheckAt) > cg_interval*2 {
  76. if err := cache.DelMapField(KEY_PLUGIN_LIFETIME_STATE, cluster_id); err != nil {
  77. log.Error("delete cluster plugin lifetime state failed: %s", err.Error())
  78. } else {
  79. log.Info("delete cluster plugin lifetime state due to no longer active: %s", cluster_id)
  80. }
  81. }
  82. }
  83. }
  84. func newLifetimeFromRuntimeState(state entities.PluginRuntimeInterface) PluginLifeTime {
  85. s := state.RuntimeState()
  86. c := state.Configuration()
  87. return PluginLifeTime{
  88. Identity: c.Identity(),
  89. Restarts: s.Restarts,
  90. Status: s.Status,
  91. Config: *c,
  92. }
  93. }
  94. func addLifetimeState(state entities.PluginRuntimeInterface) {
  95. pluginLifetimeStateLock.Lock()
  96. defer pluginLifetimeStateLock.Unlock()
  97. pluginLifetimeCollection.Collection[state.Configuration().Identity()] = newLifetimeFromRuntimeState(state)
  98. }
  99. func deleteLifetimeState(state entities.PluginRuntimeInterface) {
  100. pluginLifetimeStateLock.Lock()
  101. defer pluginLifetimeStateLock.Unlock()
  102. delete(pluginLifetimeCollection.Collection, state.Configuration().Identity())
  103. }
  104. func updateCurrentInstanceLifetimeCollection() error {
  105. pluginLifetimeStateLock.Lock()
  106. defer pluginLifetimeStateLock.Unlock()
  107. pluginLifetimeCollection.LastCheckAt = time.Now()
  108. m.Range(func(key, value interface{}) bool {
  109. if v, ok := value.(entities.PluginRuntimeInterface); ok {
  110. pluginLifetimeCollection.Collection[v.Configuration().Identity()] = newLifetimeFromRuntimeState(v)
  111. }
  112. return true
  113. })
  114. return cache.SetMapOneField(KEY_PLUGIN_LIFETIME_STATE, instanceId, pluginLifetimeCollection)
  115. }
  116. func fetchClusterPluginLifetimeCollections() (map[string]pluginLifeCollection, error) {
  117. return cache.GetMap[pluginLifeCollection](KEY_PLUGIN_LIFETIME_STATE)
  118. }