endpoint.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "sync/atomic"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  14. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  15. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  16. )
  17. func Endpoint(ctx *gin.Context, endpoint *models.Endpoint, path string) {
  18. req := ctx.Request.Clone(context.Background())
  19. req.URL.Path = path
  20. var buffer bytes.Buffer
  21. err := req.Write(&buffer)
  22. if err != nil {
  23. ctx.JSON(500, gin.H{"error": err.Error()})
  24. }
  25. // fetch plugin
  26. manager := plugin_manager.GetGlobalPluginManager()
  27. runtime := manager.Get(endpoint.PluginID)
  28. if runtime == nil {
  29. ctx.JSON(404, gin.H{"error": "plugin not found"})
  30. return
  31. }
  32. session := session_manager.NewSession(
  33. endpoint.TenantID,
  34. "",
  35. endpoint.PluginID,
  36. ctx.GetString("cluster_id"),
  37. access_types.PLUGIN_ACCESS_TYPE_Endpoint,
  38. access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  39. runtime.Configuration(),
  40. )
  41. defer session.Close()
  42. session.BindRuntime(runtime)
  43. status_code, headers, response, err := plugin_daemon.InvokeEndpoint(session, &requests.RequestInvokeEndpoint{
  44. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  45. })
  46. if err != nil {
  47. ctx.JSON(500, gin.H{"error": err.Error()})
  48. return
  49. }
  50. defer response.Close()
  51. done := make(chan bool)
  52. closed := new(int32)
  53. ctx.Status(status_code)
  54. for k, v := range *headers {
  55. if len(v) > 0 {
  56. ctx.Writer.Header().Set(k, v[0])
  57. }
  58. }
  59. close := func() {
  60. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  61. close(done)
  62. }
  63. }
  64. defer close()
  65. routine.Submit(func() {
  66. defer close()
  67. for response.Next() {
  68. chunk, err := response.Read()
  69. if err != nil {
  70. ctx.JSON(500, gin.H{"error": err.Error()})
  71. return
  72. }
  73. ctx.Writer.Write(chunk)
  74. ctx.Writer.Flush()
  75. }
  76. })
  77. select {
  78. case <-ctx.Writer.CloseNotify():
  79. case <-done:
  80. case <-time.After(30 * time.Second):
  81. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  82. }
  83. }