endpoint.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "fmt"
  7. "sync/atomic"
  8. "time"
  9. "github.com/gin-gonic/gin"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  15. "github.com/langgenius/dify-plugin-daemon/internal/db"
  16. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  18. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  19. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  20. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  21. "github.com/langgenius/dify-plugin-daemon/internal/utils/encryption"
  22. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  23. )
  24. func Endpoint(
  25. ctx *gin.Context,
  26. endpoint *models.Endpoint,
  27. pluginInstallation *models.PluginInstallation,
  28. path string,
  29. ) {
  30. if !endpoint.Enabled {
  31. ctx.JSON(404, gin.H{"error": "plugin not found"})
  32. return
  33. }
  34. req := ctx.Request.Clone(context.Background())
  35. req.URL.Path = path
  36. var buffer bytes.Buffer
  37. err := req.Write(&buffer)
  38. if err != nil {
  39. ctx.JSON(500, gin.H{"error": err.Error()})
  40. }
  41. identifier, err := plugin_entities.NewPluginUniqueIdentifier(pluginInstallation.PluginUniqueIdentifier)
  42. if err != nil {
  43. ctx.JSON(400, gin.H{"error": "Invalid plugin identifier, " + err.Error()})
  44. return
  45. }
  46. // fetch plugin
  47. manager := plugin_manager.Manager()
  48. runtime, err := manager.Get(identifier)
  49. if err != nil {
  50. ctx.JSON(404, gin.H{"error": "plugin not found"})
  51. return
  52. }
  53. // fetch endpoint declaration
  54. endpointDeclaration := runtime.Configuration().Endpoint
  55. if endpointDeclaration == nil {
  56. ctx.JSON(404, gin.H{"error": "endpoint declaration not found"})
  57. return
  58. }
  59. // decrypt settings
  60. settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  61. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  62. TenantId: endpoint.TenantID,
  63. UserId: "",
  64. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  65. },
  66. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  67. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  68. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  69. Identity: endpoint.ID,
  70. Data: endpoint.Settings,
  71. Config: endpointDeclaration.Settings,
  72. },
  73. })
  74. if err != nil {
  75. ctx.JSON(500, gin.H{"error": "failed to decrypt data"})
  76. return
  77. }
  78. session := session_manager.NewSession(
  79. session_manager.NewSessionPayload{
  80. TenantID: endpoint.TenantID,
  81. UserID: "",
  82. PluginUniqueIdentifier: identifier,
  83. ClusterID: ctx.GetString("cluster_id"),
  84. InvokeFrom: access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  85. Action: access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  86. Declaration: runtime.Configuration(),
  87. BackwardsInvocation: manager.BackwardsInvocation(),
  88. IgnoreCache: false,
  89. EndpointID: &endpoint.ID,
  90. },
  91. )
  92. defer session.Close(session_manager.CloseSessionPayload{
  93. IgnoreCache: false,
  94. })
  95. session.BindRuntime(runtime)
  96. statusCode, headers, response, err := plugin_daemon.InvokeEndpoint(
  97. session, &requests.RequestInvokeEndpoint{
  98. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  99. Settings: settings,
  100. },
  101. )
  102. if err != nil {
  103. ctx.JSON(500, gin.H{"error": err.Error()})
  104. return
  105. }
  106. defer response.Close()
  107. done := make(chan bool)
  108. closed := new(int32)
  109. ctx.Status(statusCode)
  110. for k, v := range *headers {
  111. if len(v) > 0 {
  112. ctx.Writer.Header().Set(k, v[0])
  113. }
  114. }
  115. close := func() {
  116. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  117. close(done)
  118. }
  119. }
  120. defer close()
  121. routine.Submit(func() {
  122. defer close()
  123. for response.Next() {
  124. chunk, err := response.Read()
  125. if err != nil {
  126. ctx.JSON(500, gin.H{"error": err.Error()})
  127. return
  128. }
  129. ctx.Writer.Write(chunk)
  130. ctx.Writer.Flush()
  131. }
  132. })
  133. select {
  134. case <-ctx.Writer.CloseNotify():
  135. case <-done:
  136. case <-time.After(240 * time.Second):
  137. ctx.JSON(500, gin.H{"error": "killed by timeout"})
  138. }
  139. }
  140. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  141. endpoint, err := db.GetOne[models.Endpoint](
  142. db.Equal("id", endpoint_id),
  143. db.Equal("tenant_id", tenant_id),
  144. )
  145. if err != nil {
  146. return entities.NewErrorResponse(-404, "Endpoint not found")
  147. }
  148. endpoint.Enabled = true
  149. if err := install_service.EnabledEndpoint(&endpoint); err != nil {
  150. return entities.NewErrorResponse(-500, "Failed to enable endpoint")
  151. }
  152. return entities.NewSuccessResponse(true)
  153. }
  154. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  155. endpoint, err := db.GetOne[models.Endpoint](
  156. db.Equal("id", endpoint_id),
  157. db.Equal("tenant_id", tenant_id),
  158. )
  159. if err != nil {
  160. return entities.NewErrorResponse(-404, "Endpoint not found")
  161. }
  162. endpoint.Enabled = false
  163. if err := install_service.DisabledEndpoint(&endpoint); err != nil {
  164. return entities.NewErrorResponse(-500, "Failed to disable endpoint")
  165. }
  166. return entities.NewSuccessResponse(true)
  167. }
  168. func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
  169. endpoints, err := db.GetAll[models.Endpoint](
  170. db.Equal("tenant_id", tenant_id),
  171. db.OrderBy("created_at", true),
  172. db.Page(page, page_size),
  173. )
  174. if err != nil {
  175. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to list endpoints: %v", err))
  176. }
  177. manager := plugin_manager.Manager()
  178. if manager == nil {
  179. return entities.NewErrorResponse(-500, "failed to get plugin manager")
  180. }
  181. // decrypt settings
  182. for i, endpoint := range endpoints {
  183. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  184. db.Equal("plugin_id", endpoint.PluginID),
  185. db.Equal("tenant_id", tenant_id),
  186. )
  187. if err != nil {
  188. // use empty settings and declaration for uninstalled plugins
  189. endpoint.Settings = map[string]any{}
  190. endpoint.Declaration = &plugin_entities.EndpointProviderDeclaration{
  191. Settings: []plugin_entities.ProviderConfig{},
  192. Endpoints: []plugin_entities.EndpointDeclaration{},
  193. EndpointFiles: []string{},
  194. }
  195. endpoints[i] = endpoint
  196. continue
  197. }
  198. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  199. pluginInstallation.PluginUniqueIdentifier,
  200. )
  201. if err != nil {
  202. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  203. }
  204. pluginDeclaration, err := manager.GetDeclaration(
  205. pluginUniqueIdentifier,
  206. tenant_id,
  207. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  208. )
  209. if err != nil {
  210. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to get plugin declaration: %v", err))
  211. }
  212. if pluginDeclaration.Endpoint == nil {
  213. return entities.NewErrorResponse(-404, "plugin does not have an endpoint")
  214. }
  215. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  216. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  217. TenantId: tenant_id,
  218. UserId: "",
  219. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  220. },
  221. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  222. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  223. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  224. Identity: endpoint.ID,
  225. Data: endpoint.Settings,
  226. Config: pluginDeclaration.Endpoint.Settings,
  227. },
  228. })
  229. if err != nil {
  230. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to decrypt settings: %v", err))
  231. }
  232. // mask settings
  233. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  234. endpoint.Settings = decryptedSettings
  235. endpoint.Declaration = pluginDeclaration.Endpoint
  236. endpoints[i] = endpoint
  237. }
  238. return entities.NewSuccessResponse(endpoints)
  239. }
  240. func ListPluginEndpoints(tenant_id string, plugin_id string, page int, page_size int) *entities.Response {
  241. endpoints, err := db.GetAll[models.Endpoint](
  242. db.Equal("plugin_id", plugin_id),
  243. db.Equal("tenant_id", tenant_id),
  244. db.OrderBy("created_at", true),
  245. db.Page(page, page_size),
  246. )
  247. if err != nil {
  248. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to list endpoints: %v", err))
  249. }
  250. manager := plugin_manager.Manager()
  251. if manager == nil {
  252. return entities.NewErrorResponse(-500, "failed to get plugin manager")
  253. }
  254. // decrypt settings
  255. for i, endpoint := range endpoints {
  256. // get installation
  257. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  258. db.Equal("plugin_id", plugin_id),
  259. db.Equal("tenant_id", tenant_id),
  260. )
  261. if err != nil {
  262. return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin installation: %v", err))
  263. }
  264. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  265. pluginInstallation.PluginUniqueIdentifier,
  266. )
  267. if err != nil {
  268. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  269. }
  270. pluginDeclaration, err := manager.GetDeclaration(
  271. pluginUniqueIdentifier,
  272. tenant_id,
  273. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  274. )
  275. if err != nil {
  276. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to get plugin declaration: %v", err))
  277. }
  278. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  279. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  280. TenantId: tenant_id,
  281. UserId: "",
  282. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  283. },
  284. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  285. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  286. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  287. Identity: endpoint.ID,
  288. Data: endpoint.Settings,
  289. Config: pluginDeclaration.Endpoint.Settings,
  290. },
  291. })
  292. if err != nil {
  293. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to decrypt settings: %v", err))
  294. }
  295. // mask settings
  296. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  297. endpoint.Settings = decryptedSettings
  298. endpoint.Declaration = pluginDeclaration.Endpoint
  299. endpoints[i] = endpoint
  300. }
  301. return entities.NewSuccessResponse(endpoints)
  302. }