output_capture.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package runner
  2. import (
  3. "fmt"
  4. "io"
  5. "os/exec"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/langgenius/dify-sandbox/internal/utils/log"
  10. )
  // OutputCaptureRunner streams the output of a child process to its consumer
  // through channels and signals when the process has finished.
  type OutputCaptureRunner struct {
  	// stdout carries the chunks handed to WriteOutput.
  	stdout chan []byte
  	// stderr carries the chunks handed to WriteError; CaptureOutput also
  	// uses it for its own "error: ..." diagnostics.
  	stderr chan []byte
  	// done receives true once CaptureOutput has finished with the process.
  	done chan bool
  	// timeout bounds the process run time; zero makes CaptureOutput fall
  	// back to five seconds.
  	timeout time.Duration
  	// after_exit_hook, when non-nil, is called by CaptureOutput after the
  	// process has exited and before done is signalled.
  	after_exit_hook func()
  }
  18. func NewOutputCaptureRunner() *OutputCaptureRunner {
  19. return &OutputCaptureRunner{
  20. stdout: make(chan []byte, 42),
  21. stderr: make(chan []byte, 42),
  22. done: make(chan bool, 1),
  23. }
  24. }
  25. func (s *OutputCaptureRunner) WriteError(data []byte) {
  26. if s.stderr != nil {
  27. s.stderr <- data
  28. }
  29. }
  30. func (s *OutputCaptureRunner) WriteOutput(data []byte) {
  31. if s.stdout != nil {
  32. s.stdout <- data
  33. }
  34. }
  35. func (s *OutputCaptureRunner) SetAfterExitHook(hook func()) {
  36. s.after_exit_hook = hook
  37. }
  38. func (s *OutputCaptureRunner) SetTimeout(timeout time.Duration) {
  39. s.timeout = timeout
  40. }
  41. func (s *OutputCaptureRunner) CaptureOutput(cmd *exec.Cmd) error {
  42. // start a timer for the timeout
  43. timeout := s.timeout
  44. if timeout == 0 {
  45. timeout = 5 * time.Second
  46. }
  47. timer := time.NewTimer(timeout)
  48. go func() {
  49. <-timer.C
  50. if cmd != nil && cmd.Process != nil {
  51. // write the error
  52. s.WriteError([]byte("error: timeout\n"))
  53. // send a signal to the process
  54. cmd.Process.Kill()
  55. }
  56. }()
  57. // create a pipe for the stdout
  58. stdout_reader, err := cmd.StdoutPipe()
  59. if err != nil {
  60. return err
  61. }
  62. // create a pipe for the stderr
  63. stderr_reader, err := cmd.StderrPipe()
  64. if err != nil {
  65. stdout_reader.Close()
  66. return err
  67. }
  68. // start the process
  69. err = cmd.Start()
  70. if err != nil {
  71. stdout_reader.Close()
  72. stderr_reader.Close()
  73. return err
  74. }
  75. wg := sync.WaitGroup{}
  76. wg.Add(2)
  77. // read the output
  78. go func() {
  79. defer wg.Done()
  80. for {
  81. buf := make([]byte, 1024)
  82. n, err := stdout_reader.Read(buf)
  83. // exit if EOF
  84. if err != nil {
  85. if err == io.EOF {
  86. break
  87. } else {
  88. s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
  89. break
  90. }
  91. }
  92. s.WriteOutput(buf[:n])
  93. }
  94. }()
  95. // read the error
  96. go func() {
  97. buf := make([]byte, 1024)
  98. defer wg.Done()
  99. for {
  100. n, err := stderr_reader.Read(buf)
  101. // exit if EOF
  102. if err != nil {
  103. if err == io.EOF {
  104. break
  105. } else {
  106. s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
  107. break
  108. }
  109. }
  110. s.WriteError(buf[:n])
  111. }
  112. }()
  113. // wait for the process to finish
  114. go func() {
  115. status, err := cmd.Process.Wait()
  116. if err != nil {
  117. log.Error("process finished with status: %v", status.String())
  118. s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
  119. } else if status.ExitCode() != 0 {
  120. exit_string := status.String()
  121. if strings.Contains(exit_string, "bad system call") {
  122. s.WriteError([]byte("error: operation not permitted\n"))
  123. } else {
  124. s.WriteError([]byte(fmt.Sprintf("error: %v\n", exit_string)))
  125. }
  126. }
  127. // wait for the stdout and stderr to finish
  128. wg.Wait()
  129. stderr_reader.Close()
  130. stdout_reader.Close()
  131. if s.after_exit_hook != nil {
  132. s.after_exit_hook()
  133. }
  134. // stop the timer
  135. timer.Stop()
  136. s.done <- true
  137. }()
  138. return nil
  139. }
  140. func (s *OutputCaptureRunner) GetStdout() chan []byte {
  141. return s.stdout
  142. }
  143. func (s *OutputCaptureRunner) GetStderr() chan []byte {
  144. return s.stderr
  145. }
  146. func (s *OutputCaptureRunner) GetDone() chan bool {
  147. return s.done
  148. }