endpoint.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "sync/atomic"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  15. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  16. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  17. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  18. )
  19. func Endpoint(
  20. ctx *gin.Context,
  21. endpoint *models.Endpoint,
  22. plugin_installation *models.PluginInstallation,
  23. path string,
  24. ) {
  25. req := ctx.Request.Clone(context.Background())
  26. req.URL.Path = path
  27. var buffer bytes.Buffer
  28. err := req.Write(&buffer)
  29. if err != nil {
  30. ctx.JSON(500, gin.H{"error": err.Error()})
  31. }
  32. // fetch plugin
  33. manager := plugin_manager.GetGlobalPluginManager()
  34. runtime := manager.Get(
  35. plugin_entities.PluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier),
  36. )
  37. if runtime == nil {
  38. ctx.JSON(404, gin.H{"error": "plugin not found"})
  39. return
  40. }
  41. // fetch endpoint declaration
  42. endpoint_declaration := runtime.Configuration().Endpoint
  43. if endpoint_declaration == nil {
  44. ctx.JSON(404, gin.H{"error": "endpoint declaration not found"})
  45. return
  46. }
  47. // decrypt settings
  48. settings, err := dify_invocation.InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  49. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  50. TenantId: endpoint.TenantID,
  51. UserId: "",
  52. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  53. },
  54. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  55. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  56. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  57. Identity: endpoint.ID,
  58. Data: endpoint.GetSettings(),
  59. Config: endpoint_declaration.Settings,
  60. },
  61. })
  62. if err != nil {
  63. ctx.JSON(500, gin.H{"error": "failed to decrypt data"})
  64. return
  65. }
  66. session := session_manager.NewSession(
  67. endpoint.TenantID,
  68. "",
  69. plugin_entities.PluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier),
  70. ctx.GetString("cluster_id"),
  71. access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  72. access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  73. runtime.Configuration(),
  74. )
  75. defer session.Close()
  76. session.BindRuntime(runtime)
  77. status_code, headers, response, err := plugin_daemon.InvokeEndpoint(
  78. session, &requests.RequestInvokeEndpoint{
  79. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  80. Settings: settings,
  81. },
  82. )
  83. if err != nil {
  84. ctx.JSON(500, gin.H{"error": err.Error()})
  85. return
  86. }
  87. defer response.Close()
  88. done := make(chan bool)
  89. closed := new(int32)
  90. ctx.Status(status_code)
  91. for k, v := range *headers {
  92. if len(v) > 0 {
  93. ctx.Writer.Header().Set(k, v[0])
  94. }
  95. }
  96. close := func() {
  97. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  98. close(done)
  99. }
  100. }
  101. defer close()
  102. routine.Submit(func() {
  103. defer close()
  104. for response.Next() {
  105. chunk, err := response.Read()
  106. if err != nil {
  107. ctx.JSON(500, gin.H{"error": err.Error()})
  108. return
  109. }
  110. ctx.Writer.Write(chunk)
  111. ctx.Writer.Flush()
  112. }
  113. })
  114. select {
  115. case <-ctx.Writer.CloseNotify():
  116. case <-done:
  117. case <-time.After(30 * time.Second):
  118. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  119. }
  120. }