浏览代码

fix: endless restarting of local runtime

Yeuoly 8 月之前
父节点
当前提交
b6899ca462
共有 2 个文件被更改,包括 28 次插入24 次删除
  1. 23 21
      internal/core/plugin_manager/local_manager/run.go
  2. 5 3
      internal/core/plugin_manager/local_manager/type.go

+ 23 - 21
internal/core/plugin_manager/local_manager/run.go

@@ -20,9 +20,9 @@ func (r *LocalPluginRuntime) gc() {
 		removeStdioHandler(r.io_identity)
 	}
 
-	if r.wait_chan != nil {
-		close(r.wait_chan)
-		r.wait_chan = nil
+	if r.waitChan != nil {
+		close(r.waitChan)
+		r.waitChan = nil
 	}
 }
 
@@ -46,18 +46,25 @@ func (r *LocalPluginRuntime) getCmd() (*exec.Cmd, error) {
 func (r *LocalPluginRuntime) StartPlugin() error {
 	defer log.Info("plugin %s stopped", r.Config.Identity())
 	defer func() {
-		r.wait_chan_lock.Lock()
-		for _, c := range r.wait_stopped_chan {
+		r.waitChanLock.Lock()
+		for _, c := range r.waitStoppedChan {
 			select {
 			case c <- true:
 			default:
 			}
 		}
-		r.wait_chan_lock.Unlock()
+		r.waitChanLock.Unlock()
 	}()
 
+	if r.isNotFirstStart {
+		r.SetRestarting()
+	} else {
+		r.SetLaunching()
+		r.isNotFirstStart = true
+	}
+
 	// reset wait chan
-	r.wait_chan = make(chan bool)
+	r.waitChan = make(chan bool)
 	// reset wait launched chan
 
 	// start plugin
@@ -73,7 +80,6 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	// get writer
 	stdin, err := e.StdinPipe()
 	if err != nil {
-		r.SetRestarting()
 		return fmt.Errorf("get stdin pipe failed: %s", err.Error())
 	}
 	defer stdin.Close()
@@ -81,7 +87,6 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	// get stdout
 	stdout, err := e.StdoutPipe()
 	if err != nil {
-		r.SetRestarting()
 		return fmt.Errorf("get stdout pipe failed: %s", err.Error())
 	}
 	defer stdout.Close()
@@ -89,13 +94,11 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	// get stderr
 	stderr, err := e.StderrPipe()
 	if err != nil {
-		r.SetRestarting()
 		return fmt.Errorf("get stderr pipe failed: %s", err.Error())
 	}
 	defer stderr.Close()
 
 	if err := e.Start(); err != nil {
-		r.SetRestarting()
 		return fmt.Errorf("start plugin failed: %s", err.Error())
 	}
 
@@ -103,7 +106,6 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 		// wait for plugin to exit
 		err = e.Wait()
 		if err != nil {
-			r.SetRestarting()
 			log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
 		}
 
@@ -136,14 +138,14 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	})
 
 	// send started event
-	r.wait_chan_lock.Lock()
+	r.waitChanLock.Lock()
 	for _, c := range r.wait_started_chan {
 		select {
 		case c <- true:
 		default:
 		}
 	}
-	r.wait_chan_lock.Unlock()
+	r.waitChanLock.Unlock()
 
 	// wait for plugin to exit
 	err = stdio.Wait()
@@ -159,27 +161,27 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 
 // Wait returns a channel that will be closed when the plugin stops
 func (r *LocalPluginRuntime) Wait() (<-chan bool, error) {
-	if r.wait_chan == nil {
+	if r.waitChan == nil {
 		return nil, errors.New("plugin not started")
 	}
-	return r.wait_chan, nil
+	return r.waitChan, nil
 }
 
 // WaitStarted returns a channel that will receive true when the plugin starts
 func (r *LocalPluginRuntime) WaitStarted() <-chan bool {
 	c := make(chan bool)
-	r.wait_chan_lock.Lock()
+	r.waitChanLock.Lock()
 	r.wait_started_chan = append(r.wait_started_chan, c)
-	r.wait_chan_lock.Unlock()
+	r.waitChanLock.Unlock()
 	return c
 }
 
 // WaitStopped returns a channel that will receive true when the plugin stops
 func (r *LocalPluginRuntime) WaitStopped() <-chan bool {
 	c := make(chan bool)
-	r.wait_chan_lock.Lock()
-	r.wait_stopped_chan = append(r.wait_stopped_chan, c)
-	r.wait_chan_lock.Unlock()
+	r.waitChanLock.Lock()
+	r.waitStoppedChan = append(r.waitStoppedChan, c)
+	r.waitChanLock.Unlock()
 	return c
 }
 

+ 5 - 3
internal/core/plugin_manager/local_manager/type.go

@@ -11,7 +11,7 @@ type LocalPluginRuntime struct {
 	positive_manager.PositivePluginRuntime
 	plugin_entities.PluginRuntime
 
-	wait_chan   chan bool
+	waitChan    chan bool
 	io_identity string
 
 	// python interpreter path, currently only support python
@@ -21,9 +21,11 @@ type LocalPluginRuntime struct {
 	// by using its venv module
 	default_python_interpreter_path string
 
-	wait_chan_lock    sync.Mutex
+	waitChanLock      sync.Mutex
 	wait_started_chan []chan bool
-	wait_stopped_chan []chan bool
+	waitStoppedChan   []chan bool
+
+	isNotFirstStart bool
 }
 
 func NewLocalPluginRuntime(