output_capture.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package runner
  2. import (
  3. "fmt"
  4. "io"
  5. "os/exec"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/langgenius/dify-sandbox/internal/utils/log"
  10. )
  // OutputCaptureRunner bundles the channels, time limit and exit callback used
  // by CaptureOutput to stream the output of a command.
  type OutputCaptureRunner struct {
  	// stdout and stderr carry the chunks published through WriteOutput and
  	// WriteError respectively.
  	stdout, stderr chan []byte

  	// done receives true once CaptureOutput has finished with the process.
  	done chan bool

  	// timeout is the limit configured with SetTimeout; CaptureOutput falls
  	// back to five seconds when it is zero.
  	timeout time.Duration

  	// after_exit_hook, when non-nil, is invoked by CaptureOutput after the
  	// process has been waited on.
  	after_exit_hook func()
  }
  18. func NewOutputCaptureRunner() *OutputCaptureRunner {
  19. return &OutputCaptureRunner{
  20. stdout: make(chan []byte),
  21. stderr: make(chan []byte),
  22. done: make(chan bool),
  23. }
  24. }
  25. func (s *OutputCaptureRunner) WriteError(data []byte) {
  26. if s.stderr != nil {
  27. s.stderr <- data
  28. }
  29. }
  30. func (s *OutputCaptureRunner) WriteOutput(data []byte) {
  31. if s.stdout != nil {
  32. s.stdout <- data
  33. }
  34. }
  35. func (s *OutputCaptureRunner) SetAfterExitHook(hook func()) {
  36. s.after_exit_hook = hook
  37. }
  38. func (s *OutputCaptureRunner) SetTimeout(timeout time.Duration) {
  39. s.timeout = timeout
  40. }
  41. func (s *OutputCaptureRunner) CaptureOutput(cmd *exec.Cmd) error {
  42. // start a timer for the timeout
  43. timeout := s.timeout
  44. if timeout == 0 {
  45. timeout = 5 * time.Second
  46. }
  47. timer := time.NewTimer(timeout)
  48. go func() {
  49. <-timer.C
  50. if cmd != nil && cmd.Process != nil {
  51. // write the error
  52. s.WriteError([]byte("error: timeout\n"))
  53. // send a signal to the process
  54. cmd.Process.Kill()
  55. }
  56. }()
  57. // create a pipe for the stdout
  58. stdout_reader, err := cmd.StdoutPipe()
  59. if err != nil {
  60. return err
  61. }
  62. // create a pipe for the stderr
  63. stderr_reader, err := cmd.StderrPipe()
  64. if err != nil {
  65. stdout_reader.Close()
  66. return err
  67. }
  68. // start the process
  69. err = cmd.Start()
  70. if err != nil {
  71. stdout_reader.Close()
  72. stderr_reader.Close()
  73. return err
  74. }
  75. wg := sync.WaitGroup{}
  76. wg.Add(2)
  77. written := 0
  78. // read the output
  79. go func() {
  80. defer wg.Done()
  81. for {
  82. buf := make([]byte, 1024)
  83. n, err := stdout_reader.Read(buf)
  84. // exit if EOF
  85. if err != nil {
  86. if err == io.EOF {
  87. break
  88. } else {
  89. s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
  90. break
  91. }
  92. }
  93. written += n
  94. s.WriteOutput(buf[:n])
  95. }
  96. }()
  97. // read the error
  98. go func() {
  99. buf := make([]byte, 1024)
  100. defer wg.Done()
  101. for {
  102. n, err := stderr_reader.Read(buf)
  103. // exit if EOF
  104. if err != nil {
  105. if err == io.EOF {
  106. break
  107. } else {
  108. s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
  109. break
  110. }
  111. }
  112. s.WriteError(buf[:n])
  113. }
  114. }()
  115. // wait for the process to finish
  116. go func() {
  117. // wait for the stdout and stderr to finish
  118. wg.Wait()
  119. // wait for the process to finish
  120. status, err := cmd.Process.Wait()
  121. if err != nil {
  122. log.Error("process finished with status: %v", status.String())
  123. s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
  124. } else if status.ExitCode() != 0 {
  125. exit_string := status.String()
  126. if strings.Contains(exit_string, "bad system call") {
  127. s.WriteError([]byte("error: operation not permitted\n"))
  128. } else {
  129. s.WriteError([]byte(fmt.Sprintf("error: %v\n", exit_string)))
  130. }
  131. }
  132. if s.after_exit_hook != nil {
  133. s.after_exit_hook()
  134. }
  135. // stop the timer
  136. timer.Stop()
  137. s.done <- true
  138. }()
  139. return nil
  140. }
  141. func (s *OutputCaptureRunner) GetStdout() chan []byte {
  142. return s.stdout
  143. }
  144. func (s *OutputCaptureRunner) GetStderr() chan []byte {
  145. return s.stderr
  146. }
  147. func (s *OutputCaptureRunner) GetDone() chan bool {
  148. return s.done
  149. }