endpoint.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "sync/atomic"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/db"
  15. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  16. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  18. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  19. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  20. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  21. )
  22. func Endpoint(
  23. ctx *gin.Context,
  24. endpoint *models.Endpoint,
  25. plugin_installation *models.PluginInstallation,
  26. path string,
  27. ) {
  28. req := ctx.Request.Clone(context.Background())
  29. req.URL.Path = path
  30. var buffer bytes.Buffer
  31. err := req.Write(&buffer)
  32. if err != nil {
  33. ctx.JSON(500, gin.H{"error": err.Error()})
  34. }
  35. identifier, err := plugin_entities.NewPluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier)
  36. if err != nil {
  37. ctx.JSON(400, gin.H{"error": "Invalid plugin identifier, " + err.Error()})
  38. return
  39. }
  40. // fetch plugin
  41. manager := plugin_manager.Manager()
  42. runtime := manager.Get(identifier)
  43. if runtime == nil {
  44. ctx.JSON(404, gin.H{"error": "plugin not found"})
  45. return
  46. }
  47. // fetch endpoint declaration
  48. endpoint_declaration := runtime.Configuration().Endpoint
  49. if endpoint_declaration == nil {
  50. ctx.JSON(404, gin.H{"error": "endpoint declaration not found"})
  51. return
  52. }
  53. // decrypt settings
  54. settings, err := dify_invocation.InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  55. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  56. TenantId: endpoint.TenantID,
  57. UserId: "",
  58. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  59. },
  60. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  61. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  62. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  63. Identity: endpoint.ID,
  64. Data: endpoint.GetSettings(),
  65. Config: endpoint_declaration.Settings,
  66. },
  67. })
  68. if err != nil {
  69. ctx.JSON(500, gin.H{"error": "failed to decrypt data"})
  70. return
  71. }
  72. session := session_manager.NewSession(
  73. endpoint.TenantID,
  74. "",
  75. identifier,
  76. ctx.GetString("cluster_id"),
  77. access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  78. access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  79. runtime.Configuration(),
  80. )
  81. defer session.Close()
  82. session.BindRuntime(runtime)
  83. status_code, headers, response, err := plugin_daemon.InvokeEndpoint(
  84. session, &requests.RequestInvokeEndpoint{
  85. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  86. Settings: settings,
  87. },
  88. )
  89. if err != nil {
  90. ctx.JSON(500, gin.H{"error": err.Error()})
  91. return
  92. }
  93. defer response.Close()
  94. done := make(chan bool)
  95. closed := new(int32)
  96. ctx.Status(status_code)
  97. for k, v := range *headers {
  98. if len(v) > 0 {
  99. ctx.Writer.Header().Set(k, v[0])
  100. }
  101. }
  102. close := func() {
  103. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  104. close(done)
  105. }
  106. }
  107. defer close()
  108. routine.Submit(func() {
  109. defer close()
  110. for response.Next() {
  111. chunk, err := response.Read()
  112. if err != nil {
  113. ctx.JSON(500, gin.H{"error": err.Error()})
  114. return
  115. }
  116. ctx.Writer.Write(chunk)
  117. ctx.Writer.Flush()
  118. }
  119. })
  120. select {
  121. case <-ctx.Writer.CloseNotify():
  122. case <-done:
  123. case <-time.After(30 * time.Second):
  124. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  125. }
  126. }
  127. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  128. endpoint, err := db.GetOne[models.Endpoint](
  129. db.Equal("id", endpoint_id),
  130. db.Equal("tenant_id", tenant_id),
  131. )
  132. if err != nil {
  133. return entities.NewErrorResponse(-404, "Endpoint not found")
  134. }
  135. endpoint.Enabled = true
  136. if err := install_service.EnabledEndpoint(&endpoint); err != nil {
  137. return entities.NewErrorResponse(-500, "Failed to enable endpoint")
  138. }
  139. return entities.NewSuccessResponse("success")
  140. }
  141. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  142. endpoint, err := db.GetOne[models.Endpoint](
  143. db.Equal("id", endpoint_id),
  144. db.Equal("tenant_id", tenant_id),
  145. )
  146. if err != nil {
  147. return entities.NewErrorResponse(-404, "Endpoint not found")
  148. }
  149. endpoint.Enabled = false
  150. if err := install_service.DisabledEndpoint(&endpoint); err != nil {
  151. return entities.NewErrorResponse(-500, "Failed to disable endpoint")
  152. }
  153. return entities.NewSuccessResponse("success")
  154. }