runner.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package service
import (
	"sync"
	"sync/atomic"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)
  11. // baseSSEService is a helper function to handle SSE service
  12. // it accepts a generator function that returns a stream response to gin context
  13. func baseSSEService[R any](
  14. generator func() (*stream.Stream[R], error),
  15. ctx *gin.Context,
  16. max_timeout_seconds int,
  17. ) {
  18. writer := ctx.Writer
  19. writer.WriteHeader(200)
  20. writer.Header().Set("Content-Type", "text/event-stream")
  21. done := make(chan bool)
  22. done_closed := new(int32)
  23. closed := new(int32)
  24. write_data := func(data interface{}) {
  25. if atomic.LoadInt32(closed) == 1 {
  26. return
  27. }
  28. writer.Write([]byte("data: "))
  29. writer.Write(parser.MarshalJsonBytes(data))
  30. writer.Write([]byte("\n\n"))
  31. writer.Flush()
  32. }
  33. plugin_daemon_response, err := generator()
  34. if err != nil {
  35. write_data(entities.NewErrorResponse(-500, err.Error()))
  36. close(done)
  37. return
  38. }
  39. routine.Submit(func() {
  40. for plugin_daemon_response.Next() {
  41. chunk, err := plugin_daemon_response.Read()
  42. if err != nil {
  43. write_data(entities.NewErrorResponse(-500, err.Error()))
  44. break
  45. }
  46. write_data(entities.NewSuccessResponse(chunk))
  47. }
  48. if atomic.CompareAndSwapInt32(done_closed, 0, 1) {
  49. close(done)
  50. }
  51. })
  52. timer := time.NewTimer(time.Duration(max_timeout_seconds) * time.Second)
  53. defer timer.Stop()
  54. defer func() {
  55. atomic.StoreInt32(closed, 1)
  56. }()
  57. select {
  58. case <-writer.CloseNotify():
  59. plugin_daemon_response.Close()
  60. return
  61. case <-done:
  62. return
  63. case <-timer.C:
  64. write_data(entities.NewErrorResponse(-500, "killed by timeout"))
  65. if atomic.CompareAndSwapInt32(done_closed, 0, 1) {
  66. close(done)
  67. }
  68. return
  69. }
  70. }