endpoint.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "sync/atomic"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  15. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  16. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  17. )
  18. func Endpoint(
  19. ctx *gin.Context,
  20. endpoint *models.Endpoint,
  21. path string,
  22. ) {
  23. req := ctx.Request.Clone(context.Background())
  24. req.URL.Path = path
  25. var buffer bytes.Buffer
  26. err := req.Write(&buffer)
  27. if err != nil {
  28. ctx.JSON(500, gin.H{"error": err.Error()})
  29. }
  30. // fetch plugin
  31. manager := plugin_manager.GetGlobalPluginManager()
  32. runtime := manager.Get(endpoint.PluginID)
  33. if runtime == nil {
  34. ctx.JSON(404, gin.H{"error": "plugin not found"})
  35. return
  36. }
  37. persistence := persistence.GetPersistence()
  38. if persistence == nil {
  39. ctx.JSON(500, gin.H{"error": "persistence not found"})
  40. return
  41. }
  42. session := session_manager.NewSession(
  43. endpoint.TenantID,
  44. "",
  45. endpoint.PluginID,
  46. ctx.GetString("cluster_id"),
  47. access_types.PLUGIN_ACCESS_TYPE_Endpoint,
  48. access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  49. runtime.Configuration(),
  50. persistence,
  51. )
  52. defer session.Close()
  53. session.BindRuntime(runtime)
  54. status_code, headers, response, err := plugin_daemon.InvokeEndpoint(session, &requests.RequestInvokeEndpoint{
  55. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  56. })
  57. if err != nil {
  58. ctx.JSON(500, gin.H{"error": err.Error()})
  59. return
  60. }
  61. defer response.Close()
  62. done := make(chan bool)
  63. closed := new(int32)
  64. ctx.Status(status_code)
  65. for k, v := range *headers {
  66. if len(v) > 0 {
  67. ctx.Writer.Header().Set(k, v[0])
  68. }
  69. }
  70. close := func() {
  71. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  72. close(done)
  73. }
  74. }
  75. defer close()
  76. routine.Submit(func() {
  77. defer close()
  78. for response.Next() {
  79. chunk, err := response.Read()
  80. if err != nil {
  81. ctx.JSON(500, gin.H{"error": err.Error()})
  82. return
  83. }
  84. ctx.Writer.Write(chunk)
  85. ctx.Writer.Flush()
  86. }
  87. })
  88. select {
  89. case <-ctx.Writer.CloseNotify():
  90. case <-done:
  91. case <-time.After(30 * time.Second):
  92. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  93. }
  94. }