run.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. // Package local_runtime handles the local plugin runtime management
  2. package local_runtime
  3. import (
  4. "errors"
  5. "fmt"
  6. "os"
  7. "os/exec"
  8. "sync"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  11. "github.com/langgenius/dify-plugin-daemon/pkg/entities/constants"
  12. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  13. )
  14. // gc performs garbage collection for the LocalPluginRuntime
  15. func (r *LocalPluginRuntime) gc() {
  16. if r.ioIdentity != "" {
  17. removeStdioHandler(r.ioIdentity)
  18. }
  19. if r.waitChan != nil {
  20. close(r.waitChan)
  21. r.waitChan = nil
  22. }
  23. }
  24. // Type returns the runtime type of the plugin
  25. func (r *LocalPluginRuntime) Type() plugin_entities.PluginRuntimeType {
  26. return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  27. }
  28. // getCmd prepares the exec.Cmd for the plugin based on its language
  29. func (r *LocalPluginRuntime) getCmd() (*exec.Cmd, error) {
  30. if r.Config.Meta.Runner.Language == constants.Python {
  31. cmd := exec.Command(r.pythonInterpreterPath, "-m", r.Config.Meta.Runner.Entrypoint)
  32. cmd.Dir = r.State.WorkingPath
  33. return cmd, nil
  34. }
  35. return nil, fmt.Errorf("unsupported language: %s", r.Config.Meta.Runner.Language)
  36. }
  37. // StartPlugin starts the plugin and manages its lifecycle
  38. func (r *LocalPluginRuntime) StartPlugin() error {
  39. defer log.Info("plugin %s stopped", r.Config.Identity())
  40. defer func() {
  41. r.waitChanLock.Lock()
  42. for _, c := range r.waitStoppedChan {
  43. select {
  44. case c <- true:
  45. default:
  46. }
  47. }
  48. r.waitChanLock.Unlock()
  49. }()
  50. if r.isNotFirstStart {
  51. r.SetRestarting()
  52. } else {
  53. r.SetLaunching()
  54. r.isNotFirstStart = true
  55. }
  56. // reset wait chan
  57. r.waitChan = make(chan bool)
  58. // reset wait launched chan
  59. // start plugin
  60. e, err := r.getCmd()
  61. if err != nil {
  62. return err
  63. }
  64. e.Dir = r.State.WorkingPath
  65. // add env INSTALL_METHOD=local
  66. e.Env = append(e.Env, "INSTALL_METHOD=local", "PATH="+os.Getenv("PATH"))
  67. // get writer
  68. stdin, err := e.StdinPipe()
  69. if err != nil {
  70. return fmt.Errorf("get stdin pipe failed: %s", err.Error())
  71. }
  72. defer stdin.Close()
  73. // get stdout
  74. stdout, err := e.StdoutPipe()
  75. if err != nil {
  76. return fmt.Errorf("get stdout pipe failed: %s", err.Error())
  77. }
  78. defer stdout.Close()
  79. // get stderr
  80. stderr, err := e.StderrPipe()
  81. if err != nil {
  82. return fmt.Errorf("get stderr pipe failed: %s", err.Error())
  83. }
  84. defer stderr.Close()
  85. if err := e.Start(); err != nil {
  86. return fmt.Errorf("start plugin failed: %s", err.Error())
  87. }
  88. var stdio *stdioHolder
  89. defer func() {
  90. // wait for plugin to exit
  91. err = e.Wait()
  92. if err != nil {
  93. // get stdio
  94. var err error
  95. if stdio != nil {
  96. err = stdio.Error()
  97. }
  98. err = errors.Join(err, err)
  99. log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
  100. }
  101. r.gc()
  102. }()
  103. // ensure the plugin process is killed after the plugin exits
  104. defer e.Process.Kill()
  105. log.Info("plugin %s started", r.Config.Identity())
  106. // setup stdio
  107. stdio = registerStdioHandler(r.Config.Identity(), stdin, stdout, stderr)
  108. r.ioIdentity = stdio.GetID()
  109. defer stdio.Stop()
  110. wg := sync.WaitGroup{}
  111. wg.Add(2)
  112. // listen to plugin stdout
  113. routine.Submit(map[string]string{
  114. "module": "plugin_manager",
  115. "type": "local",
  116. "function": "StartStdout",
  117. }, func() {
  118. defer wg.Done()
  119. stdio.StartStdout(func() {})
  120. })
  121. // listen to plugin stderr
  122. routine.Submit(map[string]string{
  123. "module": "plugin_manager",
  124. "type": "local",
  125. "function": "StartStderr",
  126. }, func() {
  127. defer wg.Done()
  128. stdio.StartStderr()
  129. })
  130. // send started event
  131. r.waitChanLock.Lock()
  132. for _, c := range r.waitStartedChan {
  133. select {
  134. case c <- true:
  135. default:
  136. }
  137. }
  138. r.waitChanLock.Unlock()
  139. // wait for plugin to exit
  140. err = stdio.Wait()
  141. if err != nil {
  142. return errors.Join(err, stdio.Error())
  143. }
  144. wg.Wait()
  145. // plugin has exited
  146. return nil
  147. }
  148. // Wait returns a channel that will be closed when the plugin stops
  149. func (r *LocalPluginRuntime) Wait() (<-chan bool, error) {
  150. if r.waitChan == nil {
  151. return nil, errors.New("plugin not started")
  152. }
  153. return r.waitChan, nil
  154. }
  155. // WaitStarted returns a channel that will receive true when the plugin starts
  156. func (r *LocalPluginRuntime) WaitStarted() <-chan bool {
  157. c := make(chan bool)
  158. r.waitChanLock.Lock()
  159. r.waitStartedChan = append(r.waitStartedChan, c)
  160. r.waitChanLock.Unlock()
  161. return c
  162. }
  163. // WaitStopped returns a channel that will receive true when the plugin stops
  164. func (r *LocalPluginRuntime) WaitStopped() <-chan bool {
  165. c := make(chan bool)
  166. r.waitChanLock.Lock()
  167. r.waitStoppedChan = append(r.waitStoppedChan, c)
  168. r.waitChanLock.Unlock()
  169. return c
  170. }
  171. // Stop stops the plugin
  172. func (r *LocalPluginRuntime) Stop() {
  173. // inherit from PluginRuntime
  174. r.PluginRuntime.Stop()
  175. // get stdio
  176. stdio := getStdioHandler(r.ioIdentity)
  177. if stdio != nil {
  178. stdio.Stop()
  179. }
  180. }