lifetime_manager.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package plugin_manager
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  12. )
  13. const (
  14. KEY_PLUGIN_LIFETIME_STATE = "lifetime_state"
  15. KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK = "lifetime_state_modify_lock"
  16. )
  17. type PluginLifeTime struct {
  18. Identity string `json:"identity"`
  19. Restarts int `json:"restarts"`
  20. Status string `json:"status"`
  21. Config plugin_entities.PluginDeclaration `json:"configuration"`
  22. }
  23. type pluginLifeCollection struct {
  24. Collection map[string]PluginLifeTime `json:"state"`
  25. ID string `json:"id"`
  26. LastCheckAt time.Time `json:"last_check_at"`
  27. }
  28. func (p pluginLifeCollection) MarshalBinary() ([]byte, error) {
  29. return parser.MarshalJsonBytes(p), nil
  30. }
  31. var (
  32. instanceId = uuid.New().String()
  33. pluginLifetimeStateLock = sync.RWMutex{}
  34. pluginLifetimeCollection = pluginLifeCollection{
  35. Collection: map[string]PluginLifeTime{},
  36. ID: instanceId,
  37. }
  38. )
  39. func startLifeTimeManager(config *app.Config) {
  40. go func() {
  41. // do check immediately
  42. doClusterLifetimeCheck(config.LifetimeCollectionGCInterval)
  43. duration := time.Duration(config.LifetimeCollectionHeartbeatInterval) * time.Second
  44. for range time.NewTicker(duration).C {
  45. doClusterLifetimeCheck(config.LifetimeCollectionGCInterval)
  46. }
  47. }()
  48. }
  49. func doClusterLifetimeCheck(heartbeat_interval int) {
  50. // check and update self lifetime state
  51. if err := updateCurrentInstanceLifetimeCollection(); err != nil {
  52. log.Error("update current instance lifetime state failed: %s", err.Error())
  53. return
  54. }
  55. // lock cluster and do cluster lifetime check
  56. if cache.Lock(KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK, time.Second*10, time.Second*10) != nil {
  57. log.Error("update lifetime state failed: lock failed")
  58. return
  59. }
  60. defer cache.Unlock(KEY_PLUGIN_LIFETIME_STATE_MODIFY_LOCK)
  61. cluster_lifetime_collections, err := fetchClusterPluginLifetimeCollections()
  62. if err != nil {
  63. log.Error("fetch cluster plugin lifetime state failed: %s", err.Error())
  64. return
  65. }
  66. for cluster_id, state := range cluster_lifetime_collections {
  67. if state.ID == instanceId {
  68. continue
  69. }
  70. // skip if last check has been done in $LIFETIME_COLLECTION_CG_INTERVAL
  71. cg_interval := time.Duration(heartbeat_interval) * time.Second
  72. if time.Since(state.LastCheckAt) < cg_interval {
  73. continue
  74. }
  75. // if last check has not been done in $LIFETIME_COLLECTION_CG_INTERVAL * 2, delete it
  76. if time.Since(state.LastCheckAt) > cg_interval*2 {
  77. if err := cache.DelMapField(KEY_PLUGIN_LIFETIME_STATE, cluster_id); err != nil {
  78. log.Error("delete cluster plugin lifetime state failed: %s", err.Error())
  79. } else {
  80. log.Info("delete cluster plugin lifetime state due to no longer active: %s", cluster_id)
  81. }
  82. }
  83. }
  84. }
  85. func newLifetimeFromRuntimeState(state entities.PluginRuntimeInterface) PluginLifeTime {
  86. s := state.RuntimeState()
  87. c := state.Configuration()
  88. return PluginLifeTime{
  89. Identity: c.Identity(),
  90. Restarts: s.Restarts,
  91. Status: s.Status,
  92. Config: *c,
  93. }
  94. }
  95. func addLifetimeState(state entities.PluginRuntimeInterface) {
  96. pluginLifetimeStateLock.Lock()
  97. defer pluginLifetimeStateLock.Unlock()
  98. pluginLifetimeCollection.Collection[state.Configuration().Identity()] = newLifetimeFromRuntimeState(state)
  99. }
  100. func deleteLifetimeState(state entities.PluginRuntimeInterface) {
  101. pluginLifetimeStateLock.Lock()
  102. defer pluginLifetimeStateLock.Unlock()
  103. delete(pluginLifetimeCollection.Collection, state.Configuration().Identity())
  104. }
  105. func updateCurrentInstanceLifetimeCollection() error {
  106. pluginLifetimeStateLock.Lock()
  107. defer pluginLifetimeStateLock.Unlock()
  108. pluginLifetimeCollection.LastCheckAt = time.Now()
  109. m.Range(func(key, value interface{}) bool {
  110. if v, ok := value.(entities.PluginRuntimeInterface); ok {
  111. pluginLifetimeCollection.Collection[v.Configuration().Identity()] = newLifetimeFromRuntimeState(v)
  112. }
  113. return true
  114. })
  115. return cache.SetMapOneField(KEY_PLUGIN_LIFETIME_STATE, instanceId, pluginLifetimeCollection)
  116. }
  117. func fetchClusterPluginLifetimeCollections() (map[string]pluginLifeCollection, error) {
  118. return cache.GetMap[pluginLifeCollection](KEY_PLUGIN_LIFETIME_STATE)
  119. }