endpoint.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "fmt"
  7. "sync/atomic"
  8. "time"
  9. "github.com/gin-gonic/gin"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  15. "github.com/langgenius/dify-plugin-daemon/internal/db"
  16. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  18. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  19. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  20. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  21. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  22. )
  23. func Endpoint(
  24. ctx *gin.Context,
  25. endpoint *models.Endpoint,
  26. plugin_installation *models.PluginInstallation,
  27. path string,
  28. ) {
  29. if !endpoint.Enabled {
  30. ctx.JSON(404, gin.H{"error": "plugin not found"})
  31. return
  32. }
  33. req := ctx.Request.Clone(context.Background())
  34. req.URL.Path = path
  35. var buffer bytes.Buffer
  36. err := req.Write(&buffer)
  37. if err != nil {
  38. ctx.JSON(500, gin.H{"error": err.Error()})
  39. }
  40. identifier, err := plugin_entities.NewPluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier)
  41. if err != nil {
  42. ctx.JSON(400, gin.H{"error": "Invalid plugin identifier, " + err.Error()})
  43. return
  44. }
  45. // fetch plugin
  46. manager := plugin_manager.Manager()
  47. runtime := manager.Get(identifier)
  48. if runtime == nil {
  49. ctx.JSON(404, gin.H{"error": "plugin not found"})
  50. return
  51. }
  52. // fetch endpoint declaration
  53. endpoint_declaration := runtime.Configuration().Endpoint
  54. if endpoint_declaration == nil {
  55. ctx.JSON(404, gin.H{"error": "endpoint declaration not found"})
  56. return
  57. }
  58. // decrypt settings
  59. settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  60. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  61. TenantId: endpoint.TenantID,
  62. UserId: "",
  63. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  64. },
  65. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  66. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  67. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  68. Identity: endpoint.ID,
  69. Data: endpoint.GetSettings(),
  70. Config: endpoint_declaration.Settings,
  71. },
  72. })
  73. if err != nil {
  74. ctx.JSON(500, gin.H{"error": "failed to decrypt data"})
  75. return
  76. }
  77. session := session_manager.NewSession(
  78. session_manager.NewSessionPayload{
  79. TenantID: endpoint.TenantID,
  80. UserID: "",
  81. PluginUniqueIdentifier: identifier,
  82. ClusterID: ctx.GetString("cluster_id"),
  83. InvokeFrom: access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  84. Action: access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  85. Declaration: runtime.Configuration(),
  86. BackwardsInvocation: manager.BackwardsInvocation(),
  87. IgnoreCache: false,
  88. },
  89. )
  90. defer session.Close(session_manager.CloseSessionPayload{
  91. IgnoreCache: false,
  92. })
  93. session.BindRuntime(runtime)
  94. status_code, headers, response, err := plugin_daemon.InvokeEndpoint(
  95. session, &requests.RequestInvokeEndpoint{
  96. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  97. Settings: settings,
  98. },
  99. )
  100. if err != nil {
  101. ctx.JSON(500, gin.H{"error": err.Error()})
  102. return
  103. }
  104. defer response.Close()
  105. done := make(chan bool)
  106. closed := new(int32)
  107. ctx.Status(status_code)
  108. for k, v := range *headers {
  109. if len(v) > 0 {
  110. ctx.Writer.Header().Set(k, v[0])
  111. }
  112. }
  113. close := func() {
  114. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  115. close(done)
  116. }
  117. }
  118. defer close()
  119. routine.Submit(func() {
  120. defer close()
  121. for response.Next() {
  122. chunk, err := response.Read()
  123. if err != nil {
  124. ctx.JSON(500, gin.H{"error": err.Error()})
  125. return
  126. }
  127. ctx.Writer.Write(chunk)
  128. ctx.Writer.Flush()
  129. }
  130. })
  131. select {
  132. case <-ctx.Writer.CloseNotify():
  133. case <-done:
  134. case <-time.After(30 * time.Second):
  135. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  136. }
  137. }
  138. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  139. endpoint, err := db.GetOne[models.Endpoint](
  140. db.Equal("id", endpoint_id),
  141. db.Equal("tenant_id", tenant_id),
  142. )
  143. if err != nil {
  144. return entities.NewErrorResponse(-404, "Endpoint not found")
  145. }
  146. endpoint.Enabled = true
  147. if err := install_service.EnabledEndpoint(&endpoint); err != nil {
  148. return entities.NewErrorResponse(-500, "Failed to enable endpoint")
  149. }
  150. return entities.NewSuccessResponse("success")
  151. }
  152. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  153. endpoint, err := db.GetOne[models.Endpoint](
  154. db.Equal("id", endpoint_id),
  155. db.Equal("tenant_id", tenant_id),
  156. )
  157. if err != nil {
  158. return entities.NewErrorResponse(-404, "Endpoint not found")
  159. }
  160. endpoint.Enabled = false
  161. if err := install_service.DisabledEndpoint(&endpoint); err != nil {
  162. return entities.NewErrorResponse(-500, "Failed to disable endpoint")
  163. }
  164. return entities.NewSuccessResponse("success")
  165. }
  166. func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
  167. endpoints, err := db.GetAll[models.Endpoint](
  168. db.Equal("tenant_id", tenant_id),
  169. db.OrderBy("created_at", true),
  170. db.Page(page, page_size),
  171. )
  172. if err != nil {
  173. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to list endpoints: %v", err))
  174. }
  175. manager := plugin_manager.Manager()
  176. if manager == nil {
  177. return entities.NewErrorResponse(-500, "failed to get plugin manager")
  178. }
  179. // decrypt settings
  180. for i, endpoint := range endpoints {
  181. plugin_installation, err := db.GetOne[models.PluginInstallation](
  182. db.Equal("plugin_id", endpoint.PluginID),
  183. db.Equal("tenant_id", tenant_id),
  184. )
  185. if err != nil {
  186. return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin installation: %v", err))
  187. }
  188. plugin, err := db.GetOne[models.Plugin](
  189. db.Equal("plugin_unique_identifier", plugin_installation.PluginUniqueIdentifier),
  190. )
  191. if err != nil {
  192. return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin: %v", err))
  193. }
  194. plugin_declaration := plugin.Declaration
  195. if plugin_declaration.Endpoint == nil {
  196. return entities.NewErrorResponse(-404, "plugin does not have an endpoint")
  197. }
  198. decrypted_settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  199. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  200. TenantId: tenant_id,
  201. UserId: "",
  202. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  203. },
  204. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  205. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  206. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  207. Identity: endpoint.ID,
  208. Data: endpoint.GetSettings(),
  209. Config: plugin_declaration.Endpoint.Settings,
  210. },
  211. })
  212. if err != nil {
  213. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to decrypt settings: %v", err))
  214. }
  215. endpoint.SetSettings(decrypted_settings)
  216. endpoints[i] = endpoint
  217. }
  218. return entities.NewSuccessResponse(endpoints)
  219. }
  220. func ListPluginEndpoints(tenant_id string, plugin_unique_identifier plugin_entities.PluginUniqueIdentifier, page int, page_size int) *entities.Response {
  221. // TODO:
  222. return nil
  223. }