webhook_service.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package plugin_daemon
  2. import (
  3. "encoding/hex"
  4. "net/http"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/webhook_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  11. )
  12. func InvokeWebhook(
  13. session *session_manager.Session,
  14. request *requests.RequestInvokeWebhook,
  15. ) (
  16. int, *http.Header, *stream.StreamResponse[[]byte], error,
  17. ) {
  18. resp, err := genericInvokePlugin[requests.RequestInvokeWebhook, webhook_entities.WebhookResponseChunk](
  19. session,
  20. request,
  21. 128,
  22. access_types.PLUGIN_ACCESS_TYPE_WEBHOOK,
  23. access_types.PLUGIN_ACCESS_ACTION_WEBHOOK,
  24. )
  25. if err != nil {
  26. return http.StatusInternalServerError, nil, nil, err
  27. }
  28. status_code := http.StatusContinue
  29. headers := &http.Header{}
  30. response := stream.NewStreamResponse[[]byte](128)
  31. response.OnClose(func() {
  32. // add close callback, ensure resources are released
  33. resp.Close()
  34. })
  35. for resp.Next() {
  36. result, err := resp.Read()
  37. if err != nil {
  38. resp.Close()
  39. return http.StatusInternalServerError, nil, nil, err
  40. }
  41. if result.Status != nil {
  42. status_code = int(*result.Status)
  43. }
  44. if result.Headers != nil {
  45. for k, v := range result.Headers {
  46. headers.Add(k, v)
  47. }
  48. }
  49. if result.Result != nil {
  50. dehexed, err := hex.DecodeString(*result.Result)
  51. if err != nil {
  52. resp.Close()
  53. return http.StatusInternalServerError, nil, nil, err
  54. }
  55. response.Write(dehexed)
  56. routine.Submit(func() {
  57. for resp.Next() {
  58. chunk, err := resp.Read()
  59. if err != nil {
  60. return
  61. }
  62. dehexed, err := hex.DecodeString(*chunk.Result)
  63. if err != nil {
  64. return
  65. }
  66. response.Write(dehexed)
  67. }
  68. })
  69. break
  70. }
  71. }
  72. return status_code, headers, response, nil
  73. }