endpoint.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "sync/atomic"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  15. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  16. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  17. "github.com/langgenius/dify-plugin-daemon/internal/db"
  18. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  19. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  20. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  21. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  22. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  23. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  24. "github.com/langgenius/dify-plugin-daemon/internal/utils/encryption"
  25. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  26. )
  27. func Endpoint(
  28. ctx *gin.Context,
  29. endpoint *models.Endpoint,
  30. pluginInstallation *models.PluginInstallation,
  31. path string,
  32. ) {
  33. if !endpoint.Enabled {
  34. ctx.JSON(404, exception.NotFoundError(errors.New("endpoint not found")).ToResponse())
  35. return
  36. }
  37. req := ctx.Request.Clone(context.Background())
  38. req.URL.Path = path
  39. // read request body until complete, max 10MB
  40. body, err := io.ReadAll(io.LimitReader(req.Body, 10*1024*1024))
  41. if err != nil {
  42. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  43. return
  44. }
  45. // replace with a new reader
  46. req.Body = io.NopCloser(bytes.NewReader(body))
  47. req.ContentLength = int64(len(body))
  48. req.TransferEncoding = nil
  49. // remove ip traces for security
  50. req.Header.Del("X-Forwarded-For")
  51. req.Header.Del("X-Real-IP")
  52. req.Header.Del("X-Forwarded")
  53. req.Header.Del("X-Original-Forwarded-For")
  54. req.Header.Del("X-Original-Url")
  55. req.Header.Del("X-Original-Host")
  56. // setup hook id to request
  57. req.Header.Set("Dify-Hook-Id", endpoint.HookID)
  58. var buffer bytes.Buffer
  59. err = req.Write(&buffer)
  60. if err != nil {
  61. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  62. return
  63. }
  64. identifier, err := plugin_entities.NewPluginUniqueIdentifier(pluginInstallation.PluginUniqueIdentifier)
  65. if err != nil {
  66. ctx.JSON(400, exception.UniqueIdentifierError(err).ToResponse())
  67. return
  68. }
  69. // fetch plugin
  70. manager := plugin_manager.Manager()
  71. runtime, err := manager.Get(identifier)
  72. if err != nil {
  73. ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
  74. return
  75. }
  76. // fetch endpoint declaration
  77. endpointDeclaration := runtime.Configuration().Endpoint
  78. if endpointDeclaration == nil {
  79. ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
  80. return
  81. }
  82. // decrypt settings
  83. settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  84. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  85. TenantId: endpoint.TenantID,
  86. UserId: "",
  87. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  88. },
  89. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  90. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  91. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  92. Identity: endpoint.ID,
  93. Data: endpoint.Settings,
  94. Config: endpointDeclaration.Settings,
  95. },
  96. })
  97. if err != nil {
  98. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  99. return
  100. }
  101. session := session_manager.NewSession(
  102. session_manager.NewSessionPayload{
  103. TenantID: endpoint.TenantID,
  104. UserID: "",
  105. PluginUniqueIdentifier: identifier,
  106. ClusterID: ctx.GetString("cluster_id"),
  107. InvokeFrom: access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  108. Action: access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  109. Declaration: runtime.Configuration(),
  110. BackwardsInvocation: manager.BackwardsInvocation(),
  111. IgnoreCache: false,
  112. EndpointID: &endpoint.ID,
  113. },
  114. )
  115. defer session.Close(session_manager.CloseSessionPayload{
  116. IgnoreCache: false,
  117. })
  118. session.BindRuntime(runtime)
  119. statusCode, headers, response, err := plugin_daemon.InvokeEndpoint(
  120. session, &requests.RequestInvokeEndpoint{
  121. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  122. Settings: settings,
  123. },
  124. )
  125. if err != nil {
  126. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  127. return
  128. }
  129. defer response.Close()
  130. done := make(chan bool)
  131. closed := new(int32)
  132. ctx.Status(statusCode)
  133. for k, v := range *headers {
  134. if len(v) > 0 {
  135. ctx.Writer.Header().Set(k, v[0])
  136. }
  137. }
  138. close := func() {
  139. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  140. close(done)
  141. }
  142. }
  143. defer close()
  144. routine.Submit(map[string]string{
  145. "module": "service",
  146. "function": "Endpoint",
  147. }, func() {
  148. defer close()
  149. for response.Next() {
  150. chunk, err := response.Read()
  151. if err != nil {
  152. ctx.Writer.Write([]byte(err.Error()))
  153. ctx.Writer.Flush()
  154. return
  155. }
  156. ctx.Writer.Write(chunk)
  157. ctx.Writer.Flush()
  158. }
  159. })
  160. select {
  161. case <-ctx.Writer.CloseNotify():
  162. case <-done:
  163. case <-time.After(240 * time.Second):
  164. ctx.JSON(500, exception.InternalServerError(errors.New("killed by timeout")).ToResponse())
  165. }
  166. }
  167. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  168. endpoint, err := db.GetOne[models.Endpoint](
  169. db.Equal("id", endpoint_id),
  170. db.Equal("tenant_id", tenant_id),
  171. )
  172. if err != nil {
  173. return exception.NotFoundError(errors.New("endpoint not found")).ToResponse()
  174. }
  175. endpoint.Enabled = true
  176. if err := install_service.EnabledEndpoint(&endpoint); err != nil {
  177. return exception.InternalServerError(errors.New("failed to enable endpoint")).ToResponse()
  178. }
  179. return entities.NewSuccessResponse(true)
  180. }
  181. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  182. endpoint, err := db.GetOne[models.Endpoint](
  183. db.Equal("id", endpoint_id),
  184. db.Equal("tenant_id", tenant_id),
  185. )
  186. if err != nil {
  187. return exception.NotFoundError(errors.New("Endpoint not found")).ToResponse()
  188. }
  189. endpoint.Enabled = false
  190. if err := install_service.DisabledEndpoint(&endpoint); err != nil {
  191. return exception.InternalServerError(errors.New("failed to disable endpoint")).ToResponse()
  192. }
  193. return entities.NewSuccessResponse(true)
  194. }
  195. func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
  196. endpoints, err := db.GetAll[models.Endpoint](
  197. db.Equal("tenant_id", tenant_id),
  198. db.OrderBy("created_at", true),
  199. db.Page(page, page_size),
  200. )
  201. if err != nil {
  202. return exception.InternalServerError(fmt.Errorf("failed to list endpoints: %v", err)).ToResponse()
  203. }
  204. manager := plugin_manager.Manager()
  205. if manager == nil {
  206. return exception.InternalServerError(errors.New("failed to get plugin manager")).ToResponse()
  207. }
  208. // decrypt settings
  209. for i, endpoint := range endpoints {
  210. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  211. db.Equal("plugin_id", endpoint.PluginID),
  212. db.Equal("tenant_id", tenant_id),
  213. )
  214. if err != nil {
  215. // use empty settings and declaration for uninstalled plugins
  216. endpoint.Settings = map[string]any{}
  217. endpoint.Declaration = &plugin_entities.EndpointProviderDeclaration{
  218. Settings: []plugin_entities.ProviderConfig{},
  219. Endpoints: []plugin_entities.EndpointDeclaration{},
  220. EndpointFiles: []string{},
  221. }
  222. endpoints[i] = endpoint
  223. continue
  224. }
  225. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  226. pluginInstallation.PluginUniqueIdentifier,
  227. )
  228. if err != nil {
  229. return exception.UniqueIdentifierError(
  230. fmt.Errorf("failed to parse plugin unique identifier: %v", err),
  231. ).ToResponse()
  232. }
  233. pluginDeclaration, err := manager.GetDeclaration(
  234. pluginUniqueIdentifier,
  235. tenant_id,
  236. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  237. )
  238. if err != nil {
  239. return exception.InternalServerError(
  240. fmt.Errorf("failed to get plugin declaration: %v", err),
  241. ).ToResponse()
  242. }
  243. if pluginDeclaration.Endpoint == nil {
  244. return exception.NotFoundError(errors.New("plugin does not have an endpoint")).ToResponse()
  245. }
  246. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  247. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  248. TenantId: tenant_id,
  249. UserId: "",
  250. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  251. },
  252. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  253. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  254. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  255. Identity: endpoint.ID,
  256. Data: endpoint.Settings,
  257. Config: pluginDeclaration.Endpoint.Settings,
  258. },
  259. })
  260. if err != nil {
  261. return exception.InternalServerError(
  262. fmt.Errorf("failed to decrypt settings: %v", err),
  263. ).ToResponse()
  264. }
  265. // mask settings
  266. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  267. endpoint.Settings = decryptedSettings
  268. endpoint.Declaration = pluginDeclaration.Endpoint
  269. endpoints[i] = endpoint
  270. }
  271. return entities.NewSuccessResponse(endpoints)
  272. }
  273. func ListPluginEndpoints(tenant_id string, plugin_id string, page int, page_size int) *entities.Response {
  274. endpoints, err := db.GetAll[models.Endpoint](
  275. db.Equal("plugin_id", plugin_id),
  276. db.Equal("tenant_id", tenant_id),
  277. db.OrderBy("created_at", true),
  278. db.Page(page, page_size),
  279. )
  280. if err != nil {
  281. return exception.InternalServerError(
  282. fmt.Errorf("failed to list endpoints: %v", err),
  283. ).ToResponse()
  284. }
  285. manager := plugin_manager.Manager()
  286. if manager == nil {
  287. return exception.InternalServerError(
  288. errors.New("failed to get plugin manager"),
  289. ).ToResponse()
  290. }
  291. // decrypt settings
  292. for i, endpoint := range endpoints {
  293. // get installation
  294. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  295. db.Equal("plugin_id", plugin_id),
  296. db.Equal("tenant_id", tenant_id),
  297. )
  298. if err != nil {
  299. return exception.NotFoundError(
  300. fmt.Errorf("failed to find plugin installation: %v", err),
  301. ).ToResponse()
  302. }
  303. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  304. pluginInstallation.PluginUniqueIdentifier,
  305. )
  306. if err != nil {
  307. return exception.UniqueIdentifierError(
  308. fmt.Errorf("failed to parse plugin unique identifier: %v", err),
  309. ).ToResponse()
  310. }
  311. pluginDeclaration, err := manager.GetDeclaration(
  312. pluginUniqueIdentifier,
  313. tenant_id,
  314. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  315. )
  316. if err != nil {
  317. return exception.InternalServerError(
  318. fmt.Errorf("failed to get plugin declaration: %v", err),
  319. ).ToResponse()
  320. }
  321. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  322. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  323. TenantId: tenant_id,
  324. UserId: "",
  325. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  326. },
  327. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  328. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  329. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  330. Identity: endpoint.ID,
  331. Data: endpoint.Settings,
  332. Config: pluginDeclaration.Endpoint.Settings,
  333. },
  334. })
  335. if err != nil {
  336. return exception.InternalServerError(
  337. fmt.Errorf("failed to decrypt settings: %v", err),
  338. ).ToResponse()
  339. }
  340. // mask settings
  341. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  342. endpoint.Settings = decryptedSettings
  343. endpoint.Declaration = pluginDeclaration.Endpoint
  344. endpoints[i] = endpoint
  345. }
  346. return entities.NewSuccessResponse(endpoints)
  347. }