endpoint.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "sync/atomic"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  15. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  16. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  17. "github.com/langgenius/dify-plugin-daemon/internal/db"
  18. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  19. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  20. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  21. "github.com/langgenius/dify-plugin-daemon/internal/utils/encryption"
  22. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  23. "github.com/langgenius/dify-plugin-daemon/pkg/entities"
  24. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  25. "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests"
  26. )
  27. func Endpoint(
  28. ctx *gin.Context,
  29. endpoint *models.Endpoint,
  30. pluginInstallation *models.PluginInstallation,
  31. path string,
  32. ) {
  33. if !endpoint.Enabled {
  34. ctx.JSON(404, exception.NotFoundError(errors.New("endpoint not found")).ToResponse())
  35. return
  36. }
  37. req := ctx.Request.Clone(context.Background())
  38. // get query params
  39. queryParams := req.URL.Query()
  40. // replace path with endpoint path
  41. req.URL.Path = path
  42. // set query params
  43. req.URL.RawQuery = queryParams.Encode()
  44. // read request body until complete, max 10MB
  45. body, err := io.ReadAll(io.LimitReader(req.Body, 10*1024*1024))
  46. if err != nil {
  47. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  48. return
  49. }
  50. // replace with a new reader
  51. req.Body = io.NopCloser(bytes.NewReader(body))
  52. req.ContentLength = int64(len(body))
  53. req.TransferEncoding = nil
  54. // remove ip traces for security
  55. req.Header.Del("X-Forwarded-For")
  56. req.Header.Del("X-Real-IP")
  57. req.Header.Del("X-Forwarded")
  58. req.Header.Del("X-Original-Forwarded-For")
  59. req.Header.Del("X-Original-Url")
  60. req.Header.Del("X-Original-Host")
  61. // setup hook id to request
  62. req.Header.Set("Dify-Hook-Id", endpoint.HookID)
  63. // check if Dify-Hook-Url is set
  64. if url := req.Header.Get("Dify-Hook-Url"); url == "" {
  65. req.Header.Set(
  66. "Dify-Hook-Url",
  67. fmt.Sprintf("http://%s:%s/e/%s%s", req.Host, req.URL.Port(), endpoint.HookID, path),
  68. )
  69. }
  70. var buffer bytes.Buffer
  71. err = req.Write(&buffer)
  72. if err != nil {
  73. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  74. return
  75. }
  76. identifier, err := plugin_entities.NewPluginUniqueIdentifier(pluginInstallation.PluginUniqueIdentifier)
  77. if err != nil {
  78. ctx.JSON(400, exception.UniqueIdentifierError(err).ToResponse())
  79. return
  80. }
  81. // fetch plugin
  82. manager := plugin_manager.Manager()
  83. runtime, err := manager.Get(identifier)
  84. if err != nil {
  85. ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
  86. return
  87. }
  88. // fetch endpoint declaration
  89. endpointDeclaration := runtime.Configuration().Endpoint
  90. if endpointDeclaration == nil {
  91. ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
  92. return
  93. }
  94. // decrypt settings
  95. settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  96. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  97. TenantId: endpoint.TenantID,
  98. UserId: "",
  99. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  100. },
  101. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  102. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  103. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  104. Identity: endpoint.ID,
  105. Data: endpoint.Settings,
  106. Config: endpointDeclaration.Settings,
  107. },
  108. })
  109. if err != nil {
  110. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  111. return
  112. }
  113. session := session_manager.NewSession(
  114. session_manager.NewSessionPayload{
  115. TenantID: endpoint.TenantID,
  116. UserID: "",
  117. PluginUniqueIdentifier: identifier,
  118. ClusterID: ctx.GetString("cluster_id"),
  119. InvokeFrom: access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  120. Action: access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  121. Declaration: runtime.Configuration(),
  122. BackwardsInvocation: manager.BackwardsInvocation(),
  123. IgnoreCache: false,
  124. EndpointID: &endpoint.ID,
  125. },
  126. )
  127. defer session.Close(session_manager.CloseSessionPayload{
  128. IgnoreCache: false,
  129. })
  130. session.BindRuntime(runtime)
  131. statusCode, headers, response, err := plugin_daemon.InvokeEndpoint(
  132. session, &requests.RequestInvokeEndpoint{
  133. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  134. Settings: settings,
  135. },
  136. )
  137. if err != nil {
  138. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  139. return
  140. }
  141. defer response.Close()
  142. done := make(chan bool)
  143. closed := new(int32)
  144. ctx.Status(statusCode)
  145. for k, v := range *headers {
  146. if len(v) > 0 {
  147. ctx.Writer.Header().Set(k, v[0])
  148. }
  149. }
  150. close := func() {
  151. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  152. close(done)
  153. }
  154. }
  155. defer close()
  156. routine.Submit(map[string]string{
  157. "module": "service",
  158. "function": "Endpoint",
  159. }, func() {
  160. defer close()
  161. for response.Next() {
  162. chunk, err := response.Read()
  163. if err != nil {
  164. ctx.Writer.Write([]byte(err.Error()))
  165. ctx.Writer.Flush()
  166. return
  167. }
  168. ctx.Writer.Write(chunk)
  169. ctx.Writer.Flush()
  170. }
  171. })
  172. select {
  173. case <-ctx.Writer.CloseNotify():
  174. case <-done:
  175. case <-time.After(240 * time.Second):
  176. ctx.JSON(500, exception.InternalServerError(errors.New("killed by timeout")).ToResponse())
  177. }
  178. }
  179. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  180. if err := install_service.EnabledEndpoint(endpoint_id, tenant_id); err != nil {
  181. return exception.InternalServerError(errors.New("failed to enable endpoint")).ToResponse()
  182. }
  183. return entities.NewSuccessResponse(true)
  184. }
  185. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  186. if err := install_service.DisabledEndpoint(endpoint_id, tenant_id); err != nil {
  187. return exception.InternalServerError(errors.New("failed to disable endpoint")).ToResponse()
  188. }
  189. return entities.NewSuccessResponse(true)
  190. }
  191. func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
  192. endpoints, err := db.GetAll[models.Endpoint](
  193. db.Equal("tenant_id", tenant_id),
  194. db.OrderBy("created_at", true),
  195. db.Page(page, page_size),
  196. )
  197. if err != nil {
  198. return exception.InternalServerError(fmt.Errorf("failed to list endpoints: %v", err)).ToResponse()
  199. }
  200. manager := plugin_manager.Manager()
  201. if manager == nil {
  202. return exception.InternalServerError(errors.New("failed to get plugin manager")).ToResponse()
  203. }
  204. // decrypt settings
  205. for i, endpoint := range endpoints {
  206. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  207. db.Equal("plugin_id", endpoint.PluginID),
  208. db.Equal("tenant_id", tenant_id),
  209. )
  210. if err != nil {
  211. // use empty settings and declaration for uninstalled plugins
  212. endpoint.Settings = map[string]any{}
  213. endpoint.Declaration = &plugin_entities.EndpointProviderDeclaration{
  214. Settings: []plugin_entities.ProviderConfig{},
  215. Endpoints: []plugin_entities.EndpointDeclaration{},
  216. EndpointFiles: []string{},
  217. }
  218. endpoints[i] = endpoint
  219. continue
  220. }
  221. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  222. pluginInstallation.PluginUniqueIdentifier,
  223. )
  224. if err != nil {
  225. return exception.UniqueIdentifierError(
  226. fmt.Errorf("failed to parse plugin unique identifier: %v", err),
  227. ).ToResponse()
  228. }
  229. pluginDeclaration, err := manager.GetDeclaration(
  230. pluginUniqueIdentifier,
  231. tenant_id,
  232. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  233. )
  234. if err != nil {
  235. return exception.InternalServerError(
  236. fmt.Errorf("failed to get plugin declaration: %v", err),
  237. ).ToResponse()
  238. }
  239. if pluginDeclaration.Endpoint == nil {
  240. return exception.NotFoundError(errors.New("plugin does not have an endpoint")).ToResponse()
  241. }
  242. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  243. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  244. TenantId: tenant_id,
  245. UserId: "",
  246. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  247. },
  248. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  249. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  250. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  251. Identity: endpoint.ID,
  252. Data: endpoint.Settings,
  253. Config: pluginDeclaration.Endpoint.Settings,
  254. },
  255. })
  256. if err != nil {
  257. return exception.InternalServerError(
  258. fmt.Errorf("failed to decrypt settings: %v", err),
  259. ).ToResponse()
  260. }
  261. // mask settings
  262. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  263. endpoint.Settings = decryptedSettings
  264. endpoint.Declaration = pluginDeclaration.Endpoint
  265. endpoints[i] = endpoint
  266. }
  267. return entities.NewSuccessResponse(endpoints)
  268. }
  269. func ListPluginEndpoints(tenant_id string, plugin_id string, page int, page_size int) *entities.Response {
  270. endpoints, err := db.GetAll[models.Endpoint](
  271. db.Equal("plugin_id", plugin_id),
  272. db.Equal("tenant_id", tenant_id),
  273. db.OrderBy("created_at", true),
  274. db.Page(page, page_size),
  275. )
  276. if err != nil {
  277. return exception.InternalServerError(
  278. fmt.Errorf("failed to list endpoints: %v", err),
  279. ).ToResponse()
  280. }
  281. manager := plugin_manager.Manager()
  282. if manager == nil {
  283. return exception.InternalServerError(
  284. errors.New("failed to get plugin manager"),
  285. ).ToResponse()
  286. }
  287. // decrypt settings
  288. for i, endpoint := range endpoints {
  289. // get installation
  290. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  291. db.Equal("plugin_id", plugin_id),
  292. db.Equal("tenant_id", tenant_id),
  293. )
  294. if err != nil {
  295. return exception.NotFoundError(
  296. fmt.Errorf("failed to find plugin installation: %v", err),
  297. ).ToResponse()
  298. }
  299. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  300. pluginInstallation.PluginUniqueIdentifier,
  301. )
  302. if err != nil {
  303. return exception.UniqueIdentifierError(
  304. fmt.Errorf("failed to parse plugin unique identifier: %v", err),
  305. ).ToResponse()
  306. }
  307. pluginDeclaration, err := manager.GetDeclaration(
  308. pluginUniqueIdentifier,
  309. tenant_id,
  310. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  311. )
  312. if err != nil {
  313. return exception.InternalServerError(
  314. fmt.Errorf("failed to get plugin declaration: %v", err),
  315. ).ToResponse()
  316. }
  317. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  318. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  319. TenantId: tenant_id,
  320. UserId: "",
  321. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  322. },
  323. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  324. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  325. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  326. Identity: endpoint.ID,
  327. Data: endpoint.Settings,
  328. Config: pluginDeclaration.Endpoint.Settings,
  329. },
  330. })
  331. if err != nil {
  332. return exception.InternalServerError(
  333. fmt.Errorf("failed to decrypt settings: %v", err),
  334. ).ToResponse()
  335. }
  336. // mask settings
  337. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  338. endpoint.Settings = decryptedSettings
  339. endpoint.Declaration = pluginDeclaration.Endpoint
  340. endpoints[i] = endpoint
  341. }
  342. return entities.NewSuccessResponse(endpoints)
  343. }