lifetime.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package plugin_manager
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. )
  9. func (p *PluginManager) AddPluginRegisterHandler(handler func(r plugin_entities.PluginLifetime) error) {
  10. p.pluginRegisters = append(p.pluginRegisters, handler)
  11. }
  12. func (p *PluginManager) fullDuplexLifecycle(
  13. r plugin_entities.PluginFullDuplexLifetime,
  14. launchedChan chan bool,
  15. errChan chan error,
  16. ) {
  17. configuration := r.Configuration()
  18. log.Info("new plugin logged in: %s", configuration.Identity())
  19. defer log.Info("plugin %s has exited", configuration.Identity())
  20. // cleanup plugin runtime state and working directory
  21. defer r.Cleanup()
  22. // stop plugin when the plugin reaches the end of its lifetime
  23. defer r.Stop()
  24. // register plugin
  25. for _, reg := range p.pluginRegisters {
  26. err := reg(r)
  27. if err != nil {
  28. log.Error("add plugin to cluster failed: %s", err.Error())
  29. return
  30. }
  31. }
  32. // remove lifetime state after plugin if it has been stopped
  33. defer r.TriggerStop()
  34. // try to init environment until succeed
  35. failedTimes := 0
  36. // only notify launched once
  37. once := sync.Once{}
  38. for !r.Stopped() {
  39. // notify launched if failed too many times
  40. if failedTimes > 3 {
  41. once.Do(func() {
  42. if errChan != nil {
  43. errChan <- fmt.Errorf(
  44. "init environment for plugin %s failed too many times, "+
  45. "you should consider the package is corrupted or your network is unstable",
  46. configuration.Identity(),
  47. )
  48. close(errChan)
  49. }
  50. if launchedChan != nil {
  51. close(launchedChan)
  52. }
  53. })
  54. }
  55. log.Info("init environment for plugin %s", configuration.Identity())
  56. if err := r.InitEnvironment(); err != nil {
  57. if r.Stopped() {
  58. // plugin has been stopped, exit
  59. break
  60. }
  61. log.Error("init environment failed: %s, retry in 30s", err.Error())
  62. time.Sleep(30 * time.Second)
  63. failedTimes++
  64. continue
  65. }
  66. break
  67. }
  68. // notify launched
  69. once.Do(func() {
  70. if launchedChan != nil {
  71. close(launchedChan)
  72. }
  73. if errChan != nil {
  74. close(errChan)
  75. }
  76. })
  77. // init environment successfully
  78. // once succeed, we consider the plugin is installed successfully
  79. for !r.Stopped() {
  80. // start plugin
  81. if err := r.StartPlugin(); err != nil {
  82. if r.Stopped() {
  83. // plugin has been stopped, exit
  84. break
  85. }
  86. }
  87. // wait for plugin to stop normally
  88. c, err := r.Wait()
  89. if err == nil {
  90. <-c
  91. }
  92. // restart plugin in 5s
  93. time.Sleep(5 * time.Second)
  94. // add restart times
  95. r.AddRestarts()
  96. }
  97. }