endpoint_service.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package plugin_daemon
  2. import (
  3. "encoding/hex"
  4. "net/http"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/endpoint_entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  10. )
  11. func InvokeEndpoint(
  12. session *session_manager.Session,
  13. request *requests.RequestInvokeEndpoint,
  14. ) (
  15. int, *http.Header, *stream.Stream[[]byte], error,
  16. ) {
  17. resp, err := GenericInvokePlugin[requests.RequestInvokeEndpoint, endpoint_entities.EndpointResponseChunk](
  18. session,
  19. request,
  20. 128,
  21. )
  22. if err != nil {
  23. return http.StatusInternalServerError, nil, nil, err
  24. }
  25. statusCode := http.StatusContinue
  26. headers := &http.Header{}
  27. response := stream.NewStream[[]byte](128)
  28. response.OnClose(func() {
  29. // add close callback, ensure resources are released
  30. resp.Close()
  31. })
  32. for resp.Next() {
  33. result, err := resp.Read()
  34. if err != nil {
  35. response.Close()
  36. return http.StatusInternalServerError, nil, nil, err
  37. }
  38. if result.Status != nil {
  39. statusCode = int(*result.Status)
  40. }
  41. if result.Headers != nil {
  42. for k, v := range result.Headers {
  43. headers.Add(k, v)
  44. }
  45. }
  46. if result.Result != nil {
  47. dehexed, err := hex.DecodeString(*result.Result)
  48. if err != nil {
  49. response.Close()
  50. return http.StatusInternalServerError, nil, nil, err
  51. }
  52. response.Write(dehexed)
  53. routine.Submit(map[string]string{
  54. "module": "plugin_daemon",
  55. "function": "InvokeEndpoint",
  56. "type": "body_write",
  57. }, func() {
  58. defer response.Close()
  59. for resp.Next() {
  60. chunk, err := resp.Read()
  61. if err != nil {
  62. return
  63. }
  64. dehexed, err := hex.DecodeString(*chunk.Result)
  65. if err != nil {
  66. return
  67. }
  68. response.Write(dehexed)
  69. }
  70. })
  71. break
  72. }
  73. }
  74. if resp.IsClosed() {
  75. response.Close()
  76. }
  77. return statusCode, headers, response, nil
  78. }