endpoint.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "fmt"
  7. "sync/atomic"
  8. "time"
  9. "github.com/gin-gonic/gin"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  15. "github.com/langgenius/dify-plugin-daemon/internal/db"
  16. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  18. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  19. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  20. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  21. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache/helper"
  22. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  23. )
  24. func Endpoint(
  25. ctx *gin.Context,
  26. endpoint *models.Endpoint,
  27. plugin_installation *models.PluginInstallation,
  28. path string,
  29. ) {
  30. if !endpoint.Enabled {
  31. ctx.JSON(404, gin.H{"error": "plugin not found"})
  32. return
  33. }
  34. req := ctx.Request.Clone(context.Background())
  35. req.URL.Path = path
  36. var buffer bytes.Buffer
  37. err := req.Write(&buffer)
  38. if err != nil {
  39. ctx.JSON(500, gin.H{"error": err.Error()})
  40. }
  41. identifier, err := plugin_entities.NewPluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier)
  42. if err != nil {
  43. ctx.JSON(400, gin.H{"error": "Invalid plugin identifier, " + err.Error()})
  44. return
  45. }
  46. // fetch plugin
  47. manager := plugin_manager.Manager()
  48. runtime := manager.Get(identifier)
  49. if runtime == nil {
  50. ctx.JSON(404, gin.H{"error": "plugin not found"})
  51. return
  52. }
  53. // fetch endpoint declaration
  54. endpoint_declaration := runtime.Configuration().Endpoint
  55. if endpoint_declaration == nil {
  56. ctx.JSON(404, gin.H{"error": "endpoint declaration not found"})
  57. return
  58. }
  59. // decrypt settings
  60. settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  61. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  62. TenantId: endpoint.TenantID,
  63. UserId: "",
  64. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  65. },
  66. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  67. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  68. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  69. Identity: endpoint.ID,
  70. Data: endpoint.Settings,
  71. Config: endpoint_declaration.Settings,
  72. },
  73. })
  74. if err != nil {
  75. ctx.JSON(500, gin.H{"error": "failed to decrypt data"})
  76. return
  77. }
  78. session := session_manager.NewSession(
  79. session_manager.NewSessionPayload{
  80. TenantID: endpoint.TenantID,
  81. UserID: "",
  82. PluginUniqueIdentifier: identifier,
  83. ClusterID: ctx.GetString("cluster_id"),
  84. InvokeFrom: access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  85. Action: access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  86. Declaration: runtime.Configuration(),
  87. BackwardsInvocation: manager.BackwardsInvocation(),
  88. IgnoreCache: false,
  89. },
  90. )
  91. defer session.Close(session_manager.CloseSessionPayload{
  92. IgnoreCache: false,
  93. })
  94. session.BindRuntime(runtime)
  95. status_code, headers, response, err := plugin_daemon.InvokeEndpoint(
  96. session, &requests.RequestInvokeEndpoint{
  97. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  98. Settings: settings,
  99. },
  100. )
  101. if err != nil {
  102. ctx.JSON(500, gin.H{"error": err.Error()})
  103. return
  104. }
  105. defer response.Close()
  106. done := make(chan bool)
  107. closed := new(int32)
  108. ctx.Status(status_code)
  109. for k, v := range *headers {
  110. if len(v) > 0 {
  111. ctx.Writer.Header().Set(k, v[0])
  112. }
  113. }
  114. close := func() {
  115. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  116. close(done)
  117. }
  118. }
  119. defer close()
  120. routine.Submit(func() {
  121. defer close()
  122. for response.Next() {
  123. chunk, err := response.Read()
  124. if err != nil {
  125. ctx.JSON(500, gin.H{"error": err.Error()})
  126. return
  127. }
  128. ctx.Writer.Write(chunk)
  129. ctx.Writer.Flush()
  130. }
  131. })
  132. select {
  133. case <-ctx.Writer.CloseNotify():
  134. case <-done:
  135. case <-time.After(240 * time.Second):
  136. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  137. }
  138. }
  139. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  140. endpoint, err := db.GetOne[models.Endpoint](
  141. db.Equal("id", endpoint_id),
  142. db.Equal("tenant_id", tenant_id),
  143. )
  144. if err != nil {
  145. return entities.NewErrorResponse(-404, "Endpoint not found")
  146. }
  147. endpoint.Enabled = true
  148. if err := install_service.EnabledEndpoint(&endpoint); err != nil {
  149. return entities.NewErrorResponse(-500, "Failed to enable endpoint")
  150. }
  151. return entities.NewSuccessResponse(true)
  152. }
  153. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  154. endpoint, err := db.GetOne[models.Endpoint](
  155. db.Equal("id", endpoint_id),
  156. db.Equal("tenant_id", tenant_id),
  157. )
  158. if err != nil {
  159. return entities.NewErrorResponse(-404, "Endpoint not found")
  160. }
  161. endpoint.Enabled = false
  162. if err := install_service.DisabledEndpoint(&endpoint); err != nil {
  163. return entities.NewErrorResponse(-500, "Failed to disable endpoint")
  164. }
  165. return entities.NewSuccessResponse(true)
  166. }
  167. func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
  168. endpoints, err := db.GetAll[models.Endpoint](
  169. db.Equal("tenant_id", tenant_id),
  170. db.OrderBy("created_at", true),
  171. db.Page(page, page_size),
  172. )
  173. if err != nil {
  174. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to list endpoints: %v", err))
  175. }
  176. manager := plugin_manager.Manager()
  177. if manager == nil {
  178. return entities.NewErrorResponse(-500, "failed to get plugin manager")
  179. }
  180. // decrypt settings
  181. for i, endpoint := range endpoints {
  182. plugin_installation, err := db.GetOne[models.PluginInstallation](
  183. db.Equal("plugin_id", endpoint.PluginID),
  184. db.Equal("tenant_id", tenant_id),
  185. )
  186. if err != nil {
  187. return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin installation: %v", err))
  188. }
  189. plugin_unique_identifier, err := plugin_entities.NewPluginUniqueIdentifier(
  190. plugin_installation.PluginUniqueIdentifier,
  191. )
  192. if err != nil {
  193. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  194. }
  195. plugin_declaration, err := helper.CombinedGetPluginDeclaration(plugin_unique_identifier)
  196. if err != nil {
  197. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to get plugin declaration: %v", err))
  198. }
  199. if plugin_declaration.Endpoint == nil {
  200. return entities.NewErrorResponse(-404, "plugin does not have an endpoint")
  201. }
  202. decrypted_settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  203. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  204. TenantId: tenant_id,
  205. UserId: "",
  206. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  207. },
  208. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  209. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  210. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  211. Identity: endpoint.ID,
  212. Data: endpoint.Settings,
  213. Config: plugin_declaration.Endpoint.Settings,
  214. },
  215. })
  216. if err != nil {
  217. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to decrypt settings: %v", err))
  218. }
  219. endpoint.Settings = decrypted_settings
  220. endpoint.Declaration = plugin_declaration.Endpoint
  221. endpoints[i] = endpoint
  222. }
  223. return entities.NewSuccessResponse(endpoints)
  224. }
  225. func ListPluginEndpoints(tenant_id string, plugin_id string, page int, page_size int) *entities.Response {
  226. endpoints, err := db.GetAll[models.Endpoint](
  227. db.Equal("plugin_id", plugin_id),
  228. db.Equal("tenant_id", tenant_id),
  229. db.OrderBy("created_at", true),
  230. db.Page(page, page_size),
  231. )
  232. if err != nil {
  233. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to list endpoints: %v", err))
  234. }
  235. manager := plugin_manager.Manager()
  236. if manager == nil {
  237. return entities.NewErrorResponse(-500, "failed to get plugin manager")
  238. }
  239. // decrypt settings
  240. for i, endpoint := range endpoints {
  241. // get installation
  242. plugin_installation, err := db.GetOne[models.PluginInstallation](
  243. db.Equal("plugin_id", plugin_id),
  244. db.Equal("tenant_id", tenant_id),
  245. )
  246. if err != nil {
  247. return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin installation: %v", err))
  248. }
  249. plugin_unique_identifier, err := plugin_entities.NewPluginUniqueIdentifier(
  250. plugin_installation.PluginUniqueIdentifier,
  251. )
  252. if err != nil {
  253. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  254. }
  255. plugin_declaration, err := helper.CombinedGetPluginDeclaration(plugin_unique_identifier)
  256. if err != nil {
  257. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to get plugin declaration: %v", err))
  258. }
  259. decrypted_settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  260. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  261. TenantId: tenant_id,
  262. UserId: "",
  263. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  264. },
  265. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  266. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  267. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  268. Identity: endpoint.ID,
  269. Data: endpoint.Settings,
  270. Config: plugin_declaration.Endpoint.Settings,
  271. },
  272. })
  273. if err != nil {
  274. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to decrypt settings: %v", err))
  275. }
  276. endpoint.Settings = decrypted_settings
  277. endpoint.Declaration = plugin_declaration.Endpoint
  278. endpoints[i] = endpoint
  279. }
  280. return entities.NewSuccessResponse(endpoints)
  281. }