endpoint.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "fmt"
  7. "sync/atomic"
  8. "time"
  9. "github.com/gin-gonic/gin"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  15. "github.com/langgenius/dify-plugin-daemon/internal/db"
  16. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  18. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  19. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  20. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  21. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  22. )
  23. func Endpoint(
  24. ctx *gin.Context,
  25. endpoint *models.Endpoint,
  26. plugin_installation *models.PluginInstallation,
  27. path string,
  28. ) {
  29. req := ctx.Request.Clone(context.Background())
  30. req.URL.Path = path
  31. var buffer bytes.Buffer
  32. err := req.Write(&buffer)
  33. if err != nil {
  34. ctx.JSON(500, gin.H{"error": err.Error()})
  35. }
  36. identifier, err := plugin_entities.NewPluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier)
  37. if err != nil {
  38. ctx.JSON(400, gin.H{"error": "Invalid plugin identifier, " + err.Error()})
  39. return
  40. }
  41. // fetch plugin
  42. manager := plugin_manager.Manager()
  43. runtime := manager.Get(identifier)
  44. if runtime == nil {
  45. ctx.JSON(404, gin.H{"error": "plugin not found"})
  46. return
  47. }
  48. // fetch endpoint declaration
  49. endpoint_declaration := runtime.Configuration().Endpoint
  50. if endpoint_declaration == nil {
  51. ctx.JSON(404, gin.H{"error": "endpoint declaration not found"})
  52. return
  53. }
  54. // decrypt settings
  55. settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  56. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  57. TenantId: endpoint.TenantID,
  58. UserId: "",
  59. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  60. },
  61. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  62. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  63. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  64. Identity: endpoint.ID,
  65. Data: endpoint.GetSettings(),
  66. Config: endpoint_declaration.Settings,
  67. },
  68. })
  69. if err != nil {
  70. ctx.JSON(500, gin.H{"error": "failed to decrypt data"})
  71. return
  72. }
  73. session := session_manager.NewSession(
  74. endpoint.TenantID,
  75. "",
  76. identifier,
  77. ctx.GetString("cluster_id"),
  78. access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  79. access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  80. runtime.Configuration(),
  81. manager.BackwardsInvocation(),
  82. )
  83. defer session.Close()
  84. session.BindRuntime(runtime)
  85. status_code, headers, response, err := plugin_daemon.InvokeEndpoint(
  86. session, &requests.RequestInvokeEndpoint{
  87. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  88. Settings: settings,
  89. },
  90. )
  91. if err != nil {
  92. ctx.JSON(500, gin.H{"error": err.Error()})
  93. return
  94. }
  95. defer response.Close()
  96. done := make(chan bool)
  97. closed := new(int32)
  98. ctx.Status(status_code)
  99. for k, v := range *headers {
  100. if len(v) > 0 {
  101. ctx.Writer.Header().Set(k, v[0])
  102. }
  103. }
  104. close := func() {
  105. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  106. close(done)
  107. }
  108. }
  109. defer close()
  110. routine.Submit(func() {
  111. defer close()
  112. for response.Next() {
  113. chunk, err := response.Read()
  114. if err != nil {
  115. ctx.JSON(500, gin.H{"error": err.Error()})
  116. return
  117. }
  118. ctx.Writer.Write(chunk)
  119. ctx.Writer.Flush()
  120. }
  121. })
  122. select {
  123. case <-ctx.Writer.CloseNotify():
  124. case <-done:
  125. case <-time.After(30 * time.Second):
  126. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  127. }
  128. }
  129. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  130. endpoint, err := db.GetOne[models.Endpoint](
  131. db.Equal("id", endpoint_id),
  132. db.Equal("tenant_id", tenant_id),
  133. )
  134. if err != nil {
  135. return entities.NewErrorResponse(-404, "Endpoint not found")
  136. }
  137. endpoint.Enabled = true
  138. if err := install_service.EnabledEndpoint(&endpoint); err != nil {
  139. return entities.NewErrorResponse(-500, "Failed to enable endpoint")
  140. }
  141. return entities.NewSuccessResponse("success")
  142. }
  143. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  144. endpoint, err := db.GetOne[models.Endpoint](
  145. db.Equal("id", endpoint_id),
  146. db.Equal("tenant_id", tenant_id),
  147. )
  148. if err != nil {
  149. return entities.NewErrorResponse(-404, "Endpoint not found")
  150. }
  151. endpoint.Enabled = false
  152. if err := install_service.DisabledEndpoint(&endpoint); err != nil {
  153. return entities.NewErrorResponse(-500, "Failed to disable endpoint")
  154. }
  155. return entities.NewSuccessResponse("success")
  156. }
  157. func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
  158. endpoints, err := db.GetAll[models.Endpoint](
  159. db.Equal("tenant_id", tenant_id),
  160. db.OrderBy("created_at", true),
  161. db.Page(page, page_size),
  162. )
  163. if err != nil {
  164. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to list endpoints: %v", err))
  165. }
  166. manager := plugin_manager.Manager()
  167. if manager == nil {
  168. return entities.NewErrorResponse(-500, "failed to get plugin manager")
  169. }
  170. // decrypt settings
  171. for i, endpoint := range endpoints {
  172. plugin_installation, err := db.GetOne[models.PluginInstallation](
  173. db.Equal("plugin_id", endpoint.PluginID),
  174. db.Equal("tenant_id", tenant_id),
  175. )
  176. if err != nil {
  177. return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin installation: %v", err))
  178. }
  179. plugin, err := db.GetOne[models.Plugin](
  180. db.Equal("plugin_unique_identifier", plugin_installation.PluginUniqueIdentifier),
  181. )
  182. if err != nil {
  183. return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin: %v", err))
  184. }
  185. plugin_declaration, err := plugin.GetDeclaration()
  186. if err != nil {
  187. return entities.NewErrorResponse(-404, fmt.Sprintf("failed to get plugin declaration: %v", err))
  188. }
  189. if plugin_declaration.Endpoint == nil {
  190. return entities.NewErrorResponse(-404, "plugin does not have an endpoint")
  191. }
  192. decrypted_settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  193. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  194. TenantId: tenant_id,
  195. UserId: "",
  196. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  197. },
  198. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  199. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  200. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  201. Identity: endpoint.ID,
  202. Data: endpoint.GetSettings(),
  203. Config: plugin_declaration.Endpoint.Settings,
  204. },
  205. })
  206. if err != nil {
  207. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to decrypt settings: %v", err))
  208. }
  209. endpoint.SetSettings(decrypted_settings)
  210. endpoints[i] = endpoint
  211. }
  212. return entities.NewSuccessResponse(endpoints)
  213. }