webhook.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package service
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "sync/atomic"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  14. )
  15. func Webhook(ctx *gin.Context, webhook *models.Webhook, path string) {
  16. req := ctx.Request
  17. var buffer bytes.Buffer
  18. err := req.Write(&buffer)
  19. if err != nil {
  20. ctx.JSON(500, gin.H{"error": err.Error()})
  21. }
  22. // fetch plugin
  23. manager := plugin_manager.GetGlobalPluginManager()
  24. runtime := manager.Get(webhook.PluginID)
  25. if runtime == nil {
  26. ctx.JSON(404, gin.H{"error": "plugin not found"})
  27. return
  28. }
  29. session := session_manager.NewSession(webhook.TenantID, "", webhook.PluginID)
  30. defer session.Close()
  31. session.BindRuntime(runtime)
  32. status_code, headers, response, err := plugin_daemon.InvokeWebhook(session, &requests.RequestInvokeWebhook{
  33. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  34. })
  35. if err != nil {
  36. ctx.JSON(500, gin.H{"error": err.Error()})
  37. return
  38. }
  39. defer response.Close()
  40. done := make(chan bool)
  41. closed := new(int32)
  42. ctx.Status(status_code)
  43. for k, v := range *headers {
  44. if len(v) > 0 {
  45. ctx.Writer.Header().Set(k, v[0])
  46. }
  47. }
  48. close := func() {
  49. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  50. close(done)
  51. }
  52. }
  53. defer close()
  54. routine.Submit(func() {
  55. defer close()
  56. for response.Next() {
  57. chunk, err := response.Read()
  58. if err != nil {
  59. ctx.JSON(500, gin.H{"error": err.Error()})
  60. return
  61. }
  62. ctx.Writer.Write(chunk)
  63. }
  64. })
  65. select {
  66. case <-ctx.Writer.CloseNotify():
  67. case <-done:
  68. case <-time.After(30 * time.Second):
  69. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  70. }
  71. }