endpoint.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "fmt"
  7. "sync/atomic"
  8. "time"
  9. "github.com/gin-gonic/gin"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  15. "github.com/langgenius/dify-plugin-daemon/internal/db"
  16. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  18. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  19. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  20. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  21. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache/helper"
  22. "github.com/langgenius/dify-plugin-daemon/internal/utils/encryption"
  23. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  24. )
  25. func Endpoint(
  26. ctx *gin.Context,
  27. endpoint *models.Endpoint,
  28. pluginInstallation *models.PluginInstallation,
  29. path string,
  30. ) {
  31. if !endpoint.Enabled {
  32. ctx.JSON(404, gin.H{"error": "plugin not found"})
  33. return
  34. }
  35. req := ctx.Request.Clone(context.Background())
  36. req.URL.Path = path
  37. var buffer bytes.Buffer
  38. err := req.Write(&buffer)
  39. if err != nil {
  40. ctx.JSON(500, gin.H{"error": err.Error()})
  41. }
  42. identifier, err := plugin_entities.NewPluginUniqueIdentifier(pluginInstallation.PluginUniqueIdentifier)
  43. if err != nil {
  44. ctx.JSON(400, gin.H{"error": "Invalid plugin identifier, " + err.Error()})
  45. return
  46. }
  47. // fetch plugin
  48. manager := plugin_manager.Manager()
  49. runtime, err := manager.Get(identifier)
  50. if err != nil {
  51. ctx.JSON(404, gin.H{"error": "plugin not found"})
  52. return
  53. }
  54. // fetch endpoint declaration
  55. endpointDeclaration := runtime.Configuration().Endpoint
  56. if endpointDeclaration == nil {
  57. ctx.JSON(404, gin.H{"error": "endpoint declaration not found"})
  58. return
  59. }
  60. // decrypt settings
  61. settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  62. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  63. TenantId: endpoint.TenantID,
  64. UserId: "",
  65. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  66. },
  67. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  68. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  69. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  70. Identity: endpoint.ID,
  71. Data: endpoint.Settings,
  72. Config: endpointDeclaration.Settings,
  73. },
  74. })
  75. if err != nil {
  76. ctx.JSON(500, gin.H{"error": "failed to decrypt data"})
  77. return
  78. }
  79. session := session_manager.NewSession(
  80. session_manager.NewSessionPayload{
  81. TenantID: endpoint.TenantID,
  82. UserID: "",
  83. PluginUniqueIdentifier: identifier,
  84. ClusterID: ctx.GetString("cluster_id"),
  85. InvokeFrom: access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  86. Action: access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  87. Declaration: runtime.Configuration(),
  88. BackwardsInvocation: manager.BackwardsInvocation(),
  89. IgnoreCache: false,
  90. EndpointID: &endpoint.ID,
  91. },
  92. )
  93. defer session.Close(session_manager.CloseSessionPayload{
  94. IgnoreCache: false,
  95. })
  96. session.BindRuntime(runtime)
  97. statusCode, headers, response, err := plugin_daemon.InvokeEndpoint(
  98. session, &requests.RequestInvokeEndpoint{
  99. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  100. Settings: settings,
  101. },
  102. )
  103. if err != nil {
  104. ctx.JSON(500, gin.H{"error": err.Error()})
  105. return
  106. }
  107. defer response.Close()
  108. done := make(chan bool)
  109. closed := new(int32)
  110. ctx.Status(statusCode)
  111. for k, v := range *headers {
  112. if len(v) > 0 {
  113. ctx.Writer.Header().Set(k, v[0])
  114. }
  115. }
  116. close := func() {
  117. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  118. close(done)
  119. }
  120. }
  121. defer close()
  122. routine.Submit(func() {
  123. defer close()
  124. for response.Next() {
  125. chunk, err := response.Read()
  126. if err != nil {
  127. ctx.JSON(500, gin.H{"error": err.Error()})
  128. return
  129. }
  130. ctx.Writer.Write(chunk)
  131. ctx.Writer.Flush()
  132. }
  133. })
  134. select {
  135. case <-ctx.Writer.CloseNotify():
  136. case <-done:
  137. case <-time.After(240 * time.Second):
  138. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  139. }
  140. }
  141. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  142. endpoint, err := db.GetOne[models.Endpoint](
  143. db.Equal("id", endpoint_id),
  144. db.Equal("tenant_id", tenant_id),
  145. )
  146. if err != nil {
  147. return entities.NewErrorResponse(-404, "Endpoint not found")
  148. }
  149. endpoint.Enabled = true
  150. if err := install_service.EnabledEndpoint(&endpoint); err != nil {
  151. return entities.NewErrorResponse(-500, "Failed to enable endpoint")
  152. }
  153. return entities.NewSuccessResponse(true)
  154. }
  155. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  156. endpoint, err := db.GetOne[models.Endpoint](
  157. db.Equal("id", endpoint_id),
  158. db.Equal("tenant_id", tenant_id),
  159. )
  160. if err != nil {
  161. return entities.NewErrorResponse(-404, "Endpoint not found")
  162. }
  163. endpoint.Enabled = false
  164. if err := install_service.DisabledEndpoint(&endpoint); err != nil {
  165. return entities.NewErrorResponse(-500, "Failed to disable endpoint")
  166. }
  167. return entities.NewSuccessResponse(true)
  168. }
  169. func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
  170. endpoints, err := db.GetAll[models.Endpoint](
  171. db.Equal("tenant_id", tenant_id),
  172. db.OrderBy("created_at", true),
  173. db.Page(page, page_size),
  174. )
  175. if err != nil {
  176. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to list endpoints: %v", err))
  177. }
  178. manager := plugin_manager.Manager()
  179. if manager == nil {
  180. return entities.NewErrorResponse(-500, "failed to get plugin manager")
  181. }
  182. // decrypt settings
  183. for i, endpoint := range endpoints {
  184. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  185. db.Equal("plugin_id", endpoint.PluginID),
  186. db.Equal("tenant_id", tenant_id),
  187. )
  188. if err != nil {
  189. // use empty settings and declaration for uninstalled plugins
  190. endpoint.Settings = map[string]any{}
  191. endpoint.Declaration = &plugin_entities.EndpointProviderDeclaration{
  192. Settings: []plugin_entities.ProviderConfig{},
  193. Endpoints: []plugin_entities.EndpointDeclaration{},
  194. EndpointFiles: []string{},
  195. }
  196. endpoints[i] = endpoint
  197. continue
  198. }
  199. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  200. pluginInstallation.PluginUniqueIdentifier,
  201. )
  202. if err != nil {
  203. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  204. }
  205. pluginDeclaration, err := helper.CombinedGetPluginDeclaration(pluginUniqueIdentifier)
  206. if err != nil {
  207. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to get plugin declaration: %v", err))
  208. }
  209. if pluginDeclaration.Endpoint == nil {
  210. return entities.NewErrorResponse(-404, "plugin does not have an endpoint")
  211. }
  212. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  213. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  214. TenantId: tenant_id,
  215. UserId: "",
  216. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  217. },
  218. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  219. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  220. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  221. Identity: endpoint.ID,
  222. Data: endpoint.Settings,
  223. Config: pluginDeclaration.Endpoint.Settings,
  224. },
  225. })
  226. if err != nil {
  227. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to decrypt settings: %v", err))
  228. }
  229. // mask settings
  230. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  231. endpoint.Settings = decryptedSettings
  232. endpoint.Declaration = pluginDeclaration.Endpoint
  233. endpoints[i] = endpoint
  234. }
  235. return entities.NewSuccessResponse(endpoints)
  236. }
  237. func ListPluginEndpoints(tenant_id string, plugin_id string, page int, page_size int) *entities.Response {
  238. endpoints, err := db.GetAll[models.Endpoint](
  239. db.Equal("plugin_id", plugin_id),
  240. db.Equal("tenant_id", tenant_id),
  241. db.OrderBy("created_at", true),
  242. db.Page(page, page_size),
  243. )
  244. if err != nil {
  245. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to list endpoints: %v", err))
  246. }
  247. manager := plugin_manager.Manager()
  248. if manager == nil {
  249. return entities.NewErrorResponse(-500, "failed to get plugin manager")
  250. }
  251. // decrypt settings
  252. for i, endpoint := range endpoints {
  253. // get installation
  254. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  255. db.Equal("plugin_id", plugin_id),
  256. db.Equal("tenant_id", tenant_id),
  257. )
  258. if err != nil {
  259. return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin installation: %v", err))
  260. }
  261. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  262. pluginInstallation.PluginUniqueIdentifier,
  263. )
  264. if err != nil {
  265. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  266. }
  267. pluginDeclaration, err := helper.CombinedGetPluginDeclaration(pluginUniqueIdentifier)
  268. if err != nil {
  269. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to get plugin declaration: %v", err))
  270. }
  271. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  272. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  273. TenantId: tenant_id,
  274. UserId: "",
  275. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  276. },
  277. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  278. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  279. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  280. Identity: endpoint.ID,
  281. Data: endpoint.Settings,
  282. Config: pluginDeclaration.Endpoint.Settings,
  283. },
  284. })
  285. if err != nil {
  286. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to decrypt settings: %v", err))
  287. }
  288. // mask settings
  289. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  290. endpoint.Settings = decryptedSettings
  291. endpoint.Declaration = pluginDeclaration.Endpoint
  292. endpoints[i] = endpoint
  293. }
  294. return entities.NewSuccessResponse(endpoints)
  295. }