endpoint.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "sync/atomic"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/db"
  15. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  16. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  18. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  19. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  20. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  21. )
  22. func Endpoint(
  23. ctx *gin.Context,
  24. endpoint *models.Endpoint,
  25. plugin_installation *models.PluginInstallation,
  26. path string,
  27. ) {
  28. req := ctx.Request.Clone(context.Background())
  29. req.URL.Path = path
  30. var buffer bytes.Buffer
  31. err := req.Write(&buffer)
  32. if err != nil {
  33. ctx.JSON(500, gin.H{"error": err.Error()})
  34. }
  35. // fetch plugin
  36. manager := plugin_manager.Manager()
  37. runtime := manager.Get(
  38. plugin_entities.PluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier),
  39. )
  40. if runtime == nil {
  41. ctx.JSON(404, gin.H{"error": "plugin not found"})
  42. return
  43. }
  44. // fetch endpoint declaration
  45. endpoint_declaration := runtime.Configuration().Endpoint
  46. if endpoint_declaration == nil {
  47. ctx.JSON(404, gin.H{"error": "endpoint declaration not found"})
  48. return
  49. }
  50. // decrypt settings
  51. settings, err := dify_invocation.InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  52. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  53. TenantId: endpoint.TenantID,
  54. UserId: "",
  55. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  56. },
  57. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  58. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  59. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  60. Identity: endpoint.ID,
  61. Data: endpoint.GetSettings(),
  62. Config: endpoint_declaration.Settings,
  63. },
  64. })
  65. if err != nil {
  66. ctx.JSON(500, gin.H{"error": "failed to decrypt data"})
  67. return
  68. }
  69. session := session_manager.NewSession(
  70. endpoint.TenantID,
  71. "",
  72. plugin_entities.PluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier),
  73. ctx.GetString("cluster_id"),
  74. access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  75. access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  76. runtime.Configuration(),
  77. )
  78. defer session.Close()
  79. session.BindRuntime(runtime)
  80. status_code, headers, response, err := plugin_daemon.InvokeEndpoint(
  81. session, &requests.RequestInvokeEndpoint{
  82. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  83. Settings: settings,
  84. },
  85. )
  86. if err != nil {
  87. ctx.JSON(500, gin.H{"error": err.Error()})
  88. return
  89. }
  90. defer response.Close()
  91. done := make(chan bool)
  92. closed := new(int32)
  93. ctx.Status(status_code)
  94. for k, v := range *headers {
  95. if len(v) > 0 {
  96. ctx.Writer.Header().Set(k, v[0])
  97. }
  98. }
  99. close := func() {
  100. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  101. close(done)
  102. }
  103. }
  104. defer close()
  105. routine.Submit(func() {
  106. defer close()
  107. for response.Next() {
  108. chunk, err := response.Read()
  109. if err != nil {
  110. ctx.JSON(500, gin.H{"error": err.Error()})
  111. return
  112. }
  113. ctx.Writer.Write(chunk)
  114. ctx.Writer.Flush()
  115. }
  116. })
  117. select {
  118. case <-ctx.Writer.CloseNotify():
  119. case <-done:
  120. case <-time.After(30 * time.Second):
  121. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  122. }
  123. }
  124. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  125. endpoint, err := db.GetOne[models.Endpoint](
  126. db.Equal("id", endpoint_id),
  127. db.Equal("tenant_id", tenant_id),
  128. )
  129. if err != nil {
  130. return entities.NewErrorResponse(-404, "Endpoint not found")
  131. }
  132. endpoint.Enabled = true
  133. if err := install_service.EnabledEndpoint(&endpoint); err != nil {
  134. return entities.NewErrorResponse(-500, "Failed to enable endpoint")
  135. }
  136. return entities.NewSuccessResponse("success")
  137. }
  138. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  139. endpoint, err := db.GetOne[models.Endpoint](
  140. db.Equal("id", endpoint_id),
  141. db.Equal("tenant_id", tenant_id),
  142. )
  143. if err != nil {
  144. return entities.NewErrorResponse(-404, "Endpoint not found")
  145. }
  146. endpoint.Enabled = false
  147. if err := install_service.DisabledEndpoint(&endpoint); err != nil {
  148. return entities.NewErrorResponse(-500, "Failed to disable endpoint")
  149. }
  150. return entities.NewSuccessResponse("success")
  151. }