endpoint_service.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package plugin_daemon
  2. import (
  3. "encoding/hex"
  4. "net/http"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/endpoint_entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  10. )
  11. func InvokeEndpoint(
  12. session *session_manager.Session,
  13. request *requests.RequestInvokeEndpoint,
  14. ) (
  15. int, *http.Header, *stream.StreamResponse[[]byte], error,
  16. ) {
  17. resp, err := genericInvokePlugin[requests.RequestInvokeEndpoint, endpoint_entities.EndpointResponseChunk](
  18. session,
  19. request,
  20. 128,
  21. )
  22. if err != nil {
  23. return http.StatusInternalServerError, nil, nil, err
  24. }
  25. status_code := http.StatusContinue
  26. headers := &http.Header{}
  27. response := stream.NewStreamResponse[[]byte](128)
  28. response.OnClose(func() {
  29. // add close callback, ensure resources are released
  30. resp.Close()
  31. })
  32. for resp.Next() {
  33. result, err := resp.Read()
  34. if err != nil {
  35. resp.Close()
  36. return http.StatusInternalServerError, nil, nil, err
  37. }
  38. if result.Status != nil {
  39. status_code = int(*result.Status)
  40. }
  41. if result.Headers != nil {
  42. for k, v := range result.Headers {
  43. headers.Add(k, v)
  44. }
  45. }
  46. if result.Result != nil {
  47. dehexed, err := hex.DecodeString(*result.Result)
  48. if err != nil {
  49. resp.Close()
  50. return http.StatusInternalServerError, nil, nil, err
  51. }
  52. response.Write(dehexed)
  53. routine.Submit(func() {
  54. defer response.Close()
  55. for resp.Next() {
  56. chunk, err := resp.Read()
  57. if err != nil {
  58. return
  59. }
  60. dehexed, err := hex.DecodeString(*chunk.Result)
  61. if err != nil {
  62. return
  63. }
  64. response.Write(dehexed)
  65. }
  66. })
  67. break
  68. }
  69. }
  70. return status_code, headers, response, nil
  71. }