webhook.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "sync/atomic"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  13. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  15. )
  16. func Webhook(ctx *gin.Context, webhook *models.Webhook, path string) {
  17. req := ctx.Request.Clone(context.Background())
  18. req.URL.Path = path
  19. var buffer bytes.Buffer
  20. err := req.Write(&buffer)
  21. if err != nil {
  22. ctx.JSON(500, gin.H{"error": err.Error()})
  23. }
  24. // fetch plugin
  25. manager := plugin_manager.GetGlobalPluginManager()
  26. runtime := manager.Get(webhook.PluginID)
  27. if runtime == nil {
  28. ctx.JSON(404, gin.H{"error": "plugin not found"})
  29. return
  30. }
  31. session := session_manager.NewSession(webhook.TenantID, "", webhook.PluginID, ctx.GetString("cluster_id"))
  32. defer session.Close()
  33. session.BindRuntime(runtime)
  34. status_code, headers, response, err := plugin_daemon.InvokeWebhook(session, &requests.RequestInvokeWebhook{
  35. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  36. })
  37. if err != nil {
  38. ctx.JSON(500, gin.H{"error": err.Error()})
  39. return
  40. }
  41. defer response.Close()
  42. done := make(chan bool)
  43. closed := new(int32)
  44. ctx.Status(status_code)
  45. for k, v := range *headers {
  46. if len(v) > 0 {
  47. ctx.Writer.Header().Set(k, v[0])
  48. }
  49. }
  50. close := func() {
  51. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  52. close(done)
  53. }
  54. }
  55. defer close()
  56. routine.Submit(func() {
  57. defer close()
  58. for response.Next() {
  59. chunk, err := response.Read()
  60. if err != nil {
  61. ctx.JSON(500, gin.H{"error": err.Error()})
  62. return
  63. }
  64. ctx.Writer.Write(chunk)
  65. ctx.Writer.Flush()
  66. }
  67. })
  68. select {
  69. case <-ctx.Writer.CloseNotify():
  70. case <-done:
  71. case <-time.After(30 * time.Second):
  72. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  73. }
  74. }