123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- // Package local_runtime handles the local plugin runtime management
- package local_runtime
- import (
- "errors"
- "fmt"
- "os"
- "os/exec"
- "sync"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
- "github.com/langgenius/dify-plugin-daemon/pkg/entities/constants"
- "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
- )
- // gc performs garbage collection for the LocalPluginRuntime
- func (r *LocalPluginRuntime) gc() {
- if r.ioIdentity != "" {
- removeStdioHandler(r.ioIdentity)
- }
- if r.waitChan != nil {
- close(r.waitChan)
- r.waitChan = nil
- }
- }
- // Type returns the runtime type of the plugin
- func (r *LocalPluginRuntime) Type() plugin_entities.PluginRuntimeType {
- return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
- }
- // getCmd prepares the exec.Cmd for the plugin based on its language
- func (r *LocalPluginRuntime) getCmd() (*exec.Cmd, error) {
- if r.Config.Meta.Runner.Language == constants.Python {
- cmd := exec.Command(r.pythonInterpreterPath, "-m", r.Config.Meta.Runner.Entrypoint)
- cmd.Dir = r.State.WorkingPath
- if r.proxyHttps != "" {
- cmd.Env = append(cmd.Env, fmt.Sprintf("HTTPS_PROXY=%s", r.proxyHttps))
- }
- if r.proxyHttp != "" {
- cmd.Env = append(cmd.Env, fmt.Sprintf("HTTP_PROXY=%s", r.proxyHttp))
- }
- return cmd, nil
- }
- return nil, fmt.Errorf("unsupported language: %s", r.Config.Meta.Runner.Language)
- }
- // StartPlugin starts the plugin and manages its lifecycle
- func (r *LocalPluginRuntime) StartPlugin() error {
- defer log.Info("plugin %s stopped", r.Config.Identity())
- defer func() {
- r.waitChanLock.Lock()
- for _, c := range r.waitStoppedChan {
- select {
- case c <- true:
- default:
- }
- }
- r.waitChanLock.Unlock()
- }()
- if r.isNotFirstStart {
- r.SetRestarting()
- } else {
- r.SetLaunching()
- r.isNotFirstStart = true
- }
- // reset wait chan
- r.waitChan = make(chan bool)
- // reset wait launched chan
- // start plugin
- e, err := r.getCmd()
- if err != nil {
- return err
- }
- e.Dir = r.State.WorkingPath
- // add env INSTALL_METHOD=local
- e.Env = append(e.Env, "INSTALL_METHOD=local", "PATH="+os.Getenv("PATH"))
- // get writer
- stdin, err := e.StdinPipe()
- if err != nil {
- return fmt.Errorf("get stdin pipe failed: %s", err.Error())
- }
- defer stdin.Close()
- // get stdout
- stdout, err := e.StdoutPipe()
- if err != nil {
- return fmt.Errorf("get stdout pipe failed: %s", err.Error())
- }
- defer stdout.Close()
- // get stderr
- stderr, err := e.StderrPipe()
- if err != nil {
- return fmt.Errorf("get stderr pipe failed: %s", err.Error())
- }
- defer stderr.Close()
- if err := e.Start(); err != nil {
- return fmt.Errorf("start plugin failed: %s", err.Error())
- }
- var stdio *stdioHolder
- defer func() {
- // wait for plugin to exit
- originalErr := e.Wait()
- if originalErr != nil {
- // get stdio
- var err error
- if stdio != nil {
- stdioErr := stdio.Error()
- if stdioErr != nil {
- err = errors.Join(originalErr, stdioErr)
- } else {
- err = originalErr
- }
- } else {
- err = originalErr
- }
- if err != nil {
- log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
- } else {
- log.Error("plugin %s exited with unknown error", r.Config.Identity())
- }
- }
- r.gc()
- }()
- // ensure the plugin process is killed after the plugin exits
- defer e.Process.Kill()
- log.Info("plugin %s started", r.Config.Identity())
- // setup stdio
- stdio = registerStdioHandler(r.Config.Identity(), stdin, stdout, stderr)
- r.ioIdentity = stdio.GetID()
- defer stdio.Stop()
- wg := sync.WaitGroup{}
- wg.Add(2)
- // listen to plugin stdout
- routine.Submit(map[string]string{
- "module": "plugin_manager",
- "type": "local",
- "function": "StartStdout",
- }, func() {
- defer wg.Done()
- stdio.StartStdout(func() {})
- })
- // listen to plugin stderr
- routine.Submit(map[string]string{
- "module": "plugin_manager",
- "type": "local",
- "function": "StartStderr",
- }, func() {
- defer wg.Done()
- stdio.StartStderr()
- })
- // send started event
- r.waitChanLock.Lock()
- for _, c := range r.waitStartedChan {
- select {
- case c <- true:
- default:
- }
- }
- r.waitChanLock.Unlock()
- // wait for plugin to exit
- err = stdio.Wait()
- if err != nil {
- return errors.Join(err, stdio.Error())
- }
- wg.Wait()
- // plugin has exited
- return nil
- }
- // Wait returns a channel that will be closed when the plugin stops
- func (r *LocalPluginRuntime) Wait() (<-chan bool, error) {
- if r.waitChan == nil {
- return nil, errors.New("plugin not started")
- }
- return r.waitChan, nil
- }
- // WaitStarted returns a channel that will receive true when the plugin starts
- func (r *LocalPluginRuntime) WaitStarted() <-chan bool {
- c := make(chan bool)
- r.waitChanLock.Lock()
- r.waitStartedChan = append(r.waitStartedChan, c)
- r.waitChanLock.Unlock()
- return c
- }
- // WaitStopped returns a channel that will receive true when the plugin stops
- func (r *LocalPluginRuntime) WaitStopped() <-chan bool {
- c := make(chan bool)
- r.waitChanLock.Lock()
- r.waitStoppedChan = append(r.waitStoppedChan, c)
- r.waitChanLock.Unlock()
- return c
- }
- // Stop stops the plugin
- func (r *LocalPluginRuntime) Stop() {
- // inherit from PluginRuntime
- r.PluginRuntime.Stop()
- // get stdio
- stdio := getStdioHandler(r.ioIdentity)
- if stdio != nil {
- stdio.Stop()
- }
- }
|