| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 | 
							- package runner
 
- import (
 
- 	"fmt"
 
- 	"io"
 
- 	"os/exec"
 
- 	"strings"
 
- 	"sync"
 
- 	"time"
 
- 	"github.com/langgenius/dify-sandbox/internal/utils/log"
 
- )
 
- type OutputCaptureRunner struct {
 
- 	stdout chan []byte
 
- 	stderr chan []byte
 
- 	done   chan bool
 
- 	timeout time.Duration
 
- 	after_exit_hook func()
 
- }
 
- func NewOutputCaptureRunner() *OutputCaptureRunner {
 
- 	return &OutputCaptureRunner{
 
- 		stdout: make(chan []byte, 42),
 
- 		stderr: make(chan []byte, 42),
 
- 		done:   make(chan bool, 1),
 
- 	}
 
- }
 
- func (s *OutputCaptureRunner) WriteError(data []byte) {
 
- 	if s.stderr != nil {
 
- 		s.stderr <- data
 
- 	}
 
- }
 
- func (s *OutputCaptureRunner) WriteOutput(data []byte) {
 
- 	if s.stdout != nil {
 
- 		s.stdout <- data
 
- 	}
 
- }
 
- func (s *OutputCaptureRunner) SetAfterExitHook(hook func()) {
 
- 	s.after_exit_hook = hook
 
- }
 
- func (s *OutputCaptureRunner) SetTimeout(timeout time.Duration) {
 
- 	s.timeout = timeout
 
- }
 
- func (s *OutputCaptureRunner) CaptureOutput(cmd *exec.Cmd) error {
 
- 	// start a timer for the timeout
 
- 	timeout := s.timeout
 
- 	if timeout == 0 {
 
- 		timeout = 5 * time.Second
 
- 	}
 
- 	timer := time.NewTimer(timeout)
 
- 	go func() {
 
- 		<-timer.C
 
- 		if cmd != nil && cmd.Process != nil {
 
- 			// write the error
 
- 			s.WriteError([]byte("error: timeout\n"))
 
- 			// send a signal to the process
 
- 			cmd.Process.Kill()
 
- 		}
 
- 	}()
 
- 	// create a pipe for the stdout
 
- 	stdout_reader, err := cmd.StdoutPipe()
 
- 	if err != nil {
 
- 		return err
 
- 	}
 
- 	// create a pipe for the stderr
 
- 	stderr_reader, err := cmd.StderrPipe()
 
- 	if err != nil {
 
- 		stdout_reader.Close()
 
- 		return err
 
- 	}
 
- 	// start the process
 
- 	err = cmd.Start()
 
- 	if err != nil {
 
- 		stdout_reader.Close()
 
- 		stderr_reader.Close()
 
- 		return err
 
- 	}
 
- 	wg := sync.WaitGroup{}
 
- 	wg.Add(2)
 
- 	// read the output
 
- 	go func() {
 
- 		defer wg.Done()
 
- 		for {
 
- 			buf := make([]byte, 1024)
 
- 			n, err := stdout_reader.Read(buf)
 
- 			// exit if EOF
 
- 			if err != nil {
 
- 				if err == io.EOF {
 
- 					break
 
- 				} else {
 
- 					s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
 
- 					break
 
- 				}
 
- 			}
 
- 			s.WriteOutput(buf[:n])
 
- 		}
 
- 	}()
 
- 	// read the error
 
- 	go func() {
 
- 		buf := make([]byte, 1024)
 
- 		defer wg.Done()
 
- 		for {
 
- 			n, err := stderr_reader.Read(buf)
 
- 			// exit if EOF
 
- 			if err != nil {
 
- 				if err == io.EOF {
 
- 					break
 
- 				} else {
 
- 					s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
 
- 					break
 
- 				}
 
- 			}
 
- 			s.WriteError(buf[:n])
 
- 		}
 
- 	}()
 
- 	// wait for the process to finish
 
- 	go func() {
 
- 		status, err := cmd.Process.Wait()
 
- 		if err != nil {
 
- 			log.Error("process finished with status: %v", status.String())
 
- 			s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
 
- 		} else if status.ExitCode() != 0 {
 
- 			exit_string := status.String()
 
- 			if strings.Contains(exit_string, "bad system call") {
 
- 				s.WriteError([]byte("error: operation not permitted\n"))
 
- 			} else {
 
- 				s.WriteError([]byte(fmt.Sprintf("error: %v\n", exit_string)))
 
- 			}
 
- 		}
 
- 		// wait for the stdout and stderr to finish
 
- 		wg.Wait()
 
- 		stderr_reader.Close()
 
- 		stdout_reader.Close()
 
- 		if s.after_exit_hook != nil {
 
- 			s.after_exit_hook()
 
- 		}
 
- 		// stop the timer
 
- 		timer.Stop()
 
- 		s.done <- true
 
- 	}()
 
- 	return nil
 
- }
 
- func (s *OutputCaptureRunner) GetStdout() chan []byte {
 
- 	return s.stdout
 
- }
 
- func (s *OutputCaptureRunner) GetStderr() chan []byte {
 
- 	return s.stderr
 
- }
 
- func (s *OutputCaptureRunner) GetDone() chan bool {
 
- 	return s.done
 
- }
 
 
  |