python.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package python
  2. import (
  3. "context"
  4. _ "embed"
  5. "fmt"
  6. "io"
  7. "os"
  8. "os/exec"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/google/uuid"
  14. "github.com/langgenius/dify-sandbox/internal/core/runner"
  15. "github.com/langgenius/dify-sandbox/internal/static"
  16. "github.com/langgenius/dify-sandbox/internal/utils/log"
  17. )
  18. type PythonRunner struct {
  19. runner.Runner
  20. runner.SeccompRunner
  21. }
  22. //go:embed prescript.py
  23. var python_sandbox_fs []byte
  24. //go:embed python.so
  25. var python_lib []byte
  26. func init() {
  27. // check if libpython.so exists
  28. log.Info("initializing python runner environment...")
  29. if _, err := os.Stat("/tmp/sandbox-python/python.so"); os.IsNotExist(err) {
  30. err := os.MkdirAll("/tmp/sandbox-python", 0755)
  31. if err != nil {
  32. log.Panic("failed to create /tmp/sandbox-python")
  33. }
  34. err = os.WriteFile("/tmp/sandbox-python/python.so", python_lib, 0755)
  35. if err != nil {
  36. log.Panic("failed to write /tmp/sandbox-python/python.so")
  37. }
  38. }
  39. }
  40. func (p *PythonRunner) Run(code string, timeout time.Duration, stdin []byte) (chan []byte, chan []byte, chan bool, error) {
  41. // create a tmp dir and copy the python script
  42. temp_code_name := strings.ReplaceAll(uuid.New().String(), "-", "_")
  43. temp_code_name = strings.ReplaceAll(temp_code_name, "/", ".")
  44. temp_code_path := fmt.Sprintf("/tmp/code/%s.py", temp_code_name)
  45. err := os.MkdirAll("/tmp/code", 0755)
  46. if err != nil {
  47. return nil, nil, nil, err
  48. }
  49. err = os.WriteFile(temp_code_path, []byte(code), 0755)
  50. if err != nil {
  51. return nil, nil, nil, err
  52. }
  53. stdout := make(chan []byte)
  54. stderr := make(chan []byte)
  55. done_chan := make(chan bool)
  56. err = p.WithTempDir([]string{
  57. temp_code_path,
  58. "/tmp/sandbox-python/python.so",
  59. }, func(root_path string) error {
  60. // create a new process
  61. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  62. cmd := exec.CommandContext(ctx,
  63. "/usr/bin/python3",
  64. "-c",
  65. string(python_sandbox_fs),
  66. temp_code_path,
  67. strconv.Itoa(static.SANDBOX_USER_UID),
  68. strconv.Itoa(static.SANDBOX_GROUP_ID),
  69. )
  70. cmd.Env = []string{}
  71. // create a pipe for the stdout
  72. stdout_reader, err := cmd.StdoutPipe()
  73. if err != nil {
  74. cancel()
  75. return err
  76. }
  77. // create a pipe for the stderr
  78. stderr_reader, err := cmd.StderrPipe()
  79. if err != nil {
  80. cancel()
  81. return err
  82. }
  83. // start the process
  84. err = cmd.Start()
  85. if err != nil {
  86. cancel()
  87. return err
  88. }
  89. wg := sync.WaitGroup{}
  90. wg.Add(2)
  91. // read the output
  92. go func() {
  93. buf := make([]byte, 1024)
  94. defer wg.Done()
  95. for {
  96. n, err := stdout_reader.Read(buf)
  97. // exit if EOF
  98. if err != nil {
  99. if err == io.EOF {
  100. break
  101. } else {
  102. stderr <- []byte(fmt.Sprintf("error: %v\n", err))
  103. break
  104. }
  105. }
  106. stdout <- buf[:n]
  107. }
  108. }()
  109. // read the error
  110. go func() {
  111. buf := make([]byte, 1024)
  112. defer wg.Done()
  113. for {
  114. n, err := stderr_reader.Read(buf)
  115. // exit if EOF
  116. if err != nil {
  117. if err == io.EOF {
  118. break
  119. } else {
  120. stderr <- []byte(fmt.Sprintf("error: %v\n", err))
  121. break
  122. }
  123. }
  124. stderr <- buf[:n]
  125. }
  126. }()
  127. // wait for the process to finish
  128. go func() {
  129. status, err := cmd.Process.Wait()
  130. if err != nil {
  131. log.Error("process finished with status: %v", status.String())
  132. stderr <- []byte(fmt.Sprintf("error: %v\n", err))
  133. } else if status.ExitCode() != 0 {
  134. exit_string := status.String()
  135. if strings.Contains(exit_string, "bad system call (core dumped)") {
  136. stderr <- []byte("error: operation not permitted\n")
  137. } else {
  138. stderr <- []byte(fmt.Sprintf("exit code: %v\n", status.ExitCode()))
  139. }
  140. }
  141. // wait for the stdout and stderr to finish
  142. wg.Wait()
  143. stderr_reader.Close()
  144. stdout_reader.Close()
  145. os.Remove(temp_code_path)
  146. os.RemoveAll(root_path)
  147. os.Remove(root_path)
  148. cancel()
  149. done_chan <- true
  150. }()
  151. return nil
  152. })
  153. if err != nil {
  154. return nil, nil, nil, err
  155. }
  156. return stdout, stderr, done_chan, nil
  157. }