output_capture.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package runner
  2. import (
  3. "fmt"
  4. "io"
  5. "os/exec"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/langgenius/dify-sandbox/internal/utils/log"
  10. )
  11. type OutputCaptureRunner struct {
  12. stdout chan []byte
  13. stderr chan []byte
  14. done chan bool
  15. timeout time.Duration
  16. after_exit_hook func()
  17. }
  18. func NewOutputCaptureRunner() *OutputCaptureRunner {
  19. return &OutputCaptureRunner{
  20. stdout: make(chan []byte),
  21. stderr: make(chan []byte),
  22. done: make(chan bool),
  23. }
  24. }
  25. func (s *OutputCaptureRunner) WriteError(data []byte) {
  26. if s.stderr != nil {
  27. s.stderr <- data
  28. }
  29. }
  30. func (s *OutputCaptureRunner) WriteOutput(data []byte) {
  31. if s.stdout != nil {
  32. s.stdout <- data
  33. }
  34. }
  35. func (s *OutputCaptureRunner) SetAfterExitHook(hook func()) {
  36. s.after_exit_hook = hook
  37. }
  38. func (s *OutputCaptureRunner) SetTimeout(timeout time.Duration) {
  39. s.timeout = timeout
  40. }
  41. func (s *OutputCaptureRunner) CaptureOutput(cmd *exec.Cmd) error {
  42. // start a timer for the timeout
  43. timeout := s.timeout
  44. if timeout == 0 {
  45. timeout = 5 * time.Second
  46. }
  47. timer := time.AfterFunc(timeout, func() {
  48. if cmd != nil && cmd.Process != nil {
  49. // write the error
  50. s.WriteError([]byte("error: timeout\n"))
  51. // send a signal to the process
  52. cmd.Process.Kill()
  53. }
  54. })
  55. // create a pipe for the stdout
  56. stdout_reader, err := cmd.StdoutPipe()
  57. if err != nil {
  58. return err
  59. }
  60. // create a pipe for the stderr
  61. stderr_reader, err := cmd.StderrPipe()
  62. if err != nil {
  63. stdout_reader.Close()
  64. return err
  65. }
  66. // start the process
  67. err = cmd.Start()
  68. if err != nil {
  69. stdout_reader.Close()
  70. stderr_reader.Close()
  71. return err
  72. }
  73. wg := sync.WaitGroup{}
  74. wg.Add(2)
  75. written := 0
  76. // read the output
  77. go func() {
  78. defer wg.Done()
  79. for {
  80. buf := make([]byte, 1024)
  81. n, err := stdout_reader.Read(buf)
  82. // exit if EOF
  83. if err != nil {
  84. if err == io.EOF {
  85. break
  86. } else {
  87. s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
  88. break
  89. }
  90. }
  91. written += n
  92. s.WriteOutput(buf[:n])
  93. }
  94. }()
  95. // read the error
  96. go func() {
  97. buf := make([]byte, 1024)
  98. defer wg.Done()
  99. for {
  100. n, err := stderr_reader.Read(buf)
  101. // exit if EOF
  102. if err != nil {
  103. if err == io.EOF {
  104. break
  105. } else {
  106. s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
  107. break
  108. }
  109. }
  110. s.WriteError(buf[:n])
  111. }
  112. }()
  113. // wait for the process to finish
  114. go func() {
  115. // wait for the stdout and stderr to finish
  116. wg.Wait()
  117. // wait for the process to finish
  118. status, err := cmd.Process.Wait()
  119. if err != nil {
  120. log.Error("process finished with status: %v", status.String())
  121. s.WriteError([]byte(fmt.Sprintf("error: %v\n", err)))
  122. } else if status.ExitCode() != 0 {
  123. exit_string := status.String()
  124. if strings.Contains(exit_string, "bad system call") {
  125. s.WriteError([]byte("error: operation not permitted\n"))
  126. } else {
  127. s.WriteError([]byte(fmt.Sprintf("error: %v\n", exit_string)))
  128. }
  129. }
  130. if s.after_exit_hook != nil {
  131. s.after_exit_hook()
  132. }
  133. // stop the timer
  134. timer.Stop()
  135. s.done <- true
  136. }()
  137. return nil
  138. }
  139. func (s *OutputCaptureRunner) GetStdout() chan []byte {
  140. return s.stdout
  141. }
  142. func (s *OutputCaptureRunner) GetStderr() chan []byte {
  143. return s.stderr
  144. }
  145. func (s *OutputCaptureRunner) GetDone() chan bool {
  146. return s.done
  147. }