endpoint_service.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package plugin_daemon
  2. import (
  3. "encoding/hex"
  4. "net/http"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  8. "github.com/langgenius/dify-plugin-daemon/pkg/entities/endpoint_entities"
  9. "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests"
  10. )
  11. func InvokeEndpoint(
  12. session *session_manager.Session,
  13. request *requests.RequestInvokeEndpoint,
  14. ) (
  15. int, *http.Header, *stream.Stream[[]byte], error,
  16. ) {
  17. resp, err := GenericInvokePlugin[requests.RequestInvokeEndpoint, endpoint_entities.EndpointResponseChunk](
  18. session,
  19. request,
  20. 128,
  21. )
  22. if err != nil {
  23. return http.StatusInternalServerError, nil, nil, err
  24. }
  25. statusCode := http.StatusContinue
  26. headers := &http.Header{}
  27. response := stream.NewStream[[]byte](128)
  28. response.OnClose(func() {
  29. // add close callback, ensure resources are released
  30. resp.Close()
  31. })
  32. for resp.Next() {
  33. result, err := resp.Read()
  34. if err != nil {
  35. response.Close()
  36. return http.StatusInternalServerError, nil, nil, err
  37. }
  38. if result.Status != nil {
  39. statusCode = int(*result.Status)
  40. }
  41. if result.Headers != nil {
  42. for k, v := range result.Headers {
  43. headers.Add(k, v)
  44. }
  45. }
  46. if result.Result != nil {
  47. dehexed, err := hex.DecodeString(*result.Result)
  48. if err != nil {
  49. response.Close()
  50. return http.StatusInternalServerError, nil, nil, err
  51. }
  52. response.Write(dehexed)
  53. routine.Submit(map[string]string{
  54. "module": "plugin_daemon",
  55. "function": "InvokeEndpoint",
  56. "type": "body_write",
  57. }, func() {
  58. defer response.Close()
  59. for resp.Next() {
  60. chunk, err := resp.Read()
  61. if err != nil {
  62. response.WriteError(err)
  63. return
  64. }
  65. dehexed, err := hex.DecodeString(*chunk.Result)
  66. if err != nil {
  67. return
  68. }
  69. response.Write(dehexed)
  70. }
  71. })
  72. break
  73. }
  74. }
  75. if resp.IsClosed() {
  76. response.Close()
  77. }
  78. return statusCode, headers, response, nil
  79. }