endpoint.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package service
import (
	"bytes"
	"context"
	"encoding/hex"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
)
  17. func Endpoint(
  18. ctx *gin.Context,
  19. endpoint *models.Endpoint,
  20. path string,
  21. ) {
  22. req := ctx.Request.Clone(context.Background())
  23. req.URL.Path = path
  24. var buffer bytes.Buffer
  25. err := req.Write(&buffer)
  26. if err != nil {
  27. ctx.JSON(500, gin.H{"error": err.Error()})
  28. }
  29. // fetch plugin
  30. manager := plugin_manager.GetGlobalPluginManager()
  31. runtime := manager.Get(endpoint.PluginID)
  32. if runtime == nil {
  33. ctx.JSON(404, gin.H{"error": "plugin not found"})
  34. return
  35. }
  36. session := session_manager.NewSession(
  37. endpoint.TenantID,
  38. "",
  39. endpoint.PluginID,
  40. ctx.GetString("cluster_id"),
  41. access_types.PLUGIN_ACCESS_TYPE_Endpoint,
  42. access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  43. runtime.Configuration(),
  44. )
  45. defer session.Close()
  46. session.BindRuntime(runtime)
  47. status_code, headers, response, err := plugin_daemon.InvokeEndpoint(session, &requests.RequestInvokeEndpoint{
  48. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  49. })
  50. if err != nil {
  51. ctx.JSON(500, gin.H{"error": err.Error()})
  52. return
  53. }
  54. defer response.Close()
  55. done := make(chan bool)
  56. closed := new(int32)
  57. ctx.Status(status_code)
  58. for k, v := range *headers {
  59. if len(v) > 0 {
  60. ctx.Writer.Header().Set(k, v[0])
  61. }
  62. }
  63. close := func() {
  64. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  65. close(done)
  66. }
  67. }
  68. defer close()
  69. routine.Submit(func() {
  70. defer close()
  71. for response.Next() {
  72. chunk, err := response.Read()
  73. if err != nil {
  74. ctx.JSON(500, gin.H{"error": err.Error()})
  75. return
  76. }
  77. ctx.Writer.Write(chunk)
  78. ctx.Writer.Flush()
  79. }
  80. })
  81. select {
  82. case <-ctx.Writer.CloseNotify():
  83. case <-done:
  84. case <-time.After(30 * time.Second):
  85. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  86. }
  87. }