runtime_lifetime.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package plugin_manager
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  7. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  8. )
  9. func (p *PluginManager) AddPluginRegisterHandler(handler func(r plugin_entities.PluginLifetime) error) {
  10. p.pluginRegisters = append(p.pluginRegisters, handler)
  11. }
  12. func (p *PluginManager) fullDuplexLifecycle(
  13. r plugin_entities.PluginFullDuplexLifetime,
  14. launchedChan chan bool,
  15. errChan chan error,
  16. ) {
  17. // stop plugin when the plugin reaches the end of its lifetime
  18. defer r.Stop()
  19. // cleanup plugin runtime state and working directory
  20. defer r.Cleanup()
  21. // remove lifetime state after plugin if it has been stopped
  22. defer r.TriggerStop()
  23. // register plugin
  24. for _, reg := range p.pluginRegisters {
  25. err := reg(r)
  26. if err != nil {
  27. log.Error("add plugin to cluster failed: %s", err.Error())
  28. return
  29. }
  30. }
  31. configuration := r.Configuration()
  32. log.Info("new plugin logged in: %s", configuration.Identity())
  33. defer func() {
  34. log.Info("plugin %s has exited", configuration.Identity())
  35. }()
  36. // try to init environment until succeed
  37. failedTimes := 0
  38. // only notify launched once
  39. once := sync.Once{}
  40. for !r.Stopped() {
  41. // notify launched if failed too many times
  42. if failedTimes > 3 {
  43. once.Do(func() {
  44. if errChan != nil {
  45. errChan <- fmt.Errorf(
  46. "init environment for plugin %s failed too many times, "+
  47. "you should consider the package is corrupted or your network is unstable",
  48. configuration.Identity(),
  49. )
  50. close(errChan)
  51. }
  52. if launchedChan != nil {
  53. close(launchedChan)
  54. }
  55. })
  56. }
  57. log.Info("init environment for plugin %s", configuration.Identity())
  58. if err := r.InitEnvironment(); err != nil {
  59. if r.Stopped() {
  60. // plugin has been stopped, exit
  61. break
  62. }
  63. log.Error("init environment failed: %s, retry in 30s", err.Error())
  64. time.Sleep(30 * time.Second)
  65. failedTimes++
  66. continue
  67. }
  68. break
  69. }
  70. // notify launched
  71. once.Do(func() {
  72. if launchedChan != nil {
  73. close(launchedChan)
  74. }
  75. if errChan != nil {
  76. close(errChan)
  77. }
  78. })
  79. // init environment successfully
  80. // once succeed, we consider the plugin is installed successfully
  81. for !r.Stopped() {
  82. // start plugin
  83. if err := r.StartPlugin(); err != nil {
  84. if r.Stopped() {
  85. // plugin has been stopped, exit
  86. break
  87. }
  88. }
  89. // wait for plugin to stop normally
  90. c, err := r.Wait()
  91. if err == nil {
  92. <-c
  93. }
  94. // restart plugin in 5s
  95. time.Sleep(5 * time.Second)
  96. // add restart times
  97. r.AddRestarts()
  98. }
  99. }