endpoint.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "sync/atomic"
  10. "time"
  11. "github.com/gin-gonic/gin"
  12. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  15. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  16. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  17. "github.com/langgenius/dify-plugin-daemon/internal/db"
  18. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  19. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  20. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  21. "github.com/langgenius/dify-plugin-daemon/internal/utils/encryption"
  22. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  23. "github.com/langgenius/dify-plugin-daemon/pkg/entities"
  24. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  25. "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests"
  26. )
  27. func Endpoint(
  28. ctx *gin.Context,
  29. endpoint *models.Endpoint,
  30. pluginInstallation *models.PluginInstallation,
  31. maxExecutionTime time.Duration,
  32. path string,
  33. ) {
  34. if !endpoint.Enabled {
  35. ctx.JSON(404, exception.NotFoundError(errors.New("endpoint not found")).ToResponse())
  36. return
  37. }
  38. req := ctx.Request.Clone(context.Background())
  39. // get query params
  40. queryParams := req.URL.Query()
  41. // replace path with endpoint path
  42. req.URL.Path = path
  43. // set query params
  44. req.URL.RawQuery = queryParams.Encode()
  45. // read request body until complete, max 10MB
  46. body, err := io.ReadAll(io.LimitReader(req.Body, 10*1024*1024))
  47. if err != nil {
  48. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  49. return
  50. }
  51. // replace with a new reader
  52. req.Body = io.NopCloser(bytes.NewReader(body))
  53. req.ContentLength = int64(len(body))
  54. req.TransferEncoding = nil
  55. // remove ip traces for security
  56. req.Header.Del("X-Forwarded-For")
  57. req.Header.Del("X-Real-IP")
  58. req.Header.Del("X-Forwarded")
  59. req.Header.Del("X-Original-Forwarded-For")
  60. req.Header.Del("X-Original-Url")
  61. req.Header.Del("X-Original-Host")
  62. // setup hook id to request
  63. req.Header.Set("Dify-Hook-Id", endpoint.HookID)
  64. // check if Dify-Hook-Url is set
  65. if url := req.Header.Get("Dify-Hook-Url"); url == "" {
  66. req.Header.Set(
  67. "Dify-Hook-Url",
  68. fmt.Sprintf("http://%s:%s/e/%s%s", req.Host, req.URL.Port(), endpoint.HookID, path),
  69. )
  70. }
  71. var buffer bytes.Buffer
  72. err = req.Write(&buffer)
  73. if err != nil {
  74. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  75. return
  76. }
  77. identifier, err := plugin_entities.NewPluginUniqueIdentifier(pluginInstallation.PluginUniqueIdentifier)
  78. if err != nil {
  79. ctx.JSON(400, exception.UniqueIdentifierError(err).ToResponse())
  80. return
  81. }
  82. // fetch plugin
  83. manager := plugin_manager.Manager()
  84. runtime, err := manager.Get(identifier)
  85. if err != nil {
  86. ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
  87. return
  88. }
  89. // fetch endpoint declaration
  90. endpointDeclaration := runtime.Configuration().Endpoint
  91. if endpointDeclaration == nil {
  92. ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
  93. return
  94. }
  95. // decrypt settings
  96. settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  97. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  98. TenantId: endpoint.TenantID,
  99. UserId: "",
  100. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  101. },
  102. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  103. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  104. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  105. Identity: endpoint.ID,
  106. Data: endpoint.Settings,
  107. Config: endpointDeclaration.Settings,
  108. },
  109. })
  110. if err != nil {
  111. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  112. return
  113. }
  114. session := session_manager.NewSession(
  115. session_manager.NewSessionPayload{
  116. TenantID: endpoint.TenantID,
  117. UserID: "",
  118. PluginUniqueIdentifier: identifier,
  119. ClusterID: ctx.GetString("cluster_id"),
  120. InvokeFrom: access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  121. Action: access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  122. Declaration: runtime.Configuration(),
  123. BackwardsInvocation: manager.BackwardsInvocation(),
  124. IgnoreCache: false,
  125. EndpointID: &endpoint.ID,
  126. },
  127. )
  128. defer session.Close(session_manager.CloseSessionPayload{
  129. IgnoreCache: false,
  130. })
  131. session.BindRuntime(runtime)
  132. statusCode, headers, response, err := plugin_daemon.InvokeEndpoint(
  133. session, &requests.RequestInvokeEndpoint{
  134. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  135. Settings: settings,
  136. },
  137. )
  138. if err != nil {
  139. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  140. return
  141. }
  142. defer response.Close()
  143. done := make(chan bool)
  144. closed := new(int32)
  145. ctx.Status(statusCode)
  146. for k, v := range *headers {
  147. if len(v) > 0 {
  148. ctx.Writer.Header().Set(k, v[0])
  149. }
  150. }
  151. close := func() {
  152. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  153. close(done)
  154. }
  155. }
  156. defer close()
  157. routine.Submit(map[string]string{
  158. "module": "service",
  159. "function": "Endpoint",
  160. }, func() {
  161. defer close()
  162. for response.Next() {
  163. chunk, err := response.Read()
  164. if err != nil {
  165. ctx.Writer.Write([]byte(err.Error()))
  166. ctx.Writer.Flush()
  167. return
  168. }
  169. ctx.Writer.Write(chunk)
  170. ctx.Writer.Flush()
  171. }
  172. })
  173. select {
  174. case <-ctx.Writer.CloseNotify():
  175. case <-done:
  176. case <-time.After(maxExecutionTime):
  177. ctx.JSON(500, exception.InternalServerError(errors.New("killed by timeout")).ToResponse())
  178. }
  179. }
  180. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  181. if err := install_service.EnabledEndpoint(endpoint_id, tenant_id); err != nil {
  182. return exception.InternalServerError(errors.New("failed to enable endpoint")).ToResponse()
  183. }
  184. return entities.NewSuccessResponse(true)
  185. }
  186. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  187. if err := install_service.DisabledEndpoint(endpoint_id, tenant_id); err != nil {
  188. return exception.InternalServerError(errors.New("failed to disable endpoint")).ToResponse()
  189. }
  190. return entities.NewSuccessResponse(true)
  191. }
  192. func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
  193. endpoints, err := db.GetAll[models.Endpoint](
  194. db.Equal("tenant_id", tenant_id),
  195. db.OrderBy("created_at", true),
  196. db.Page(page, page_size),
  197. )
  198. if err != nil {
  199. return exception.InternalServerError(fmt.Errorf("failed to list endpoints: %v", err)).ToResponse()
  200. }
  201. manager := plugin_manager.Manager()
  202. if manager == nil {
  203. return exception.InternalServerError(errors.New("failed to get plugin manager")).ToResponse()
  204. }
  205. // decrypt settings
  206. for i, endpoint := range endpoints {
  207. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  208. db.Equal("plugin_id", endpoint.PluginID),
  209. db.Equal("tenant_id", tenant_id),
  210. )
  211. if err != nil {
  212. // use empty settings and declaration for uninstalled plugins
  213. endpoint.Settings = map[string]any{}
  214. endpoint.Declaration = &plugin_entities.EndpointProviderDeclaration{
  215. Settings: []plugin_entities.ProviderConfig{},
  216. Endpoints: []plugin_entities.EndpointDeclaration{},
  217. EndpointFiles: []string{},
  218. }
  219. endpoints[i] = endpoint
  220. continue
  221. }
  222. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  223. pluginInstallation.PluginUniqueIdentifier,
  224. )
  225. if err != nil {
  226. return exception.UniqueIdentifierError(
  227. fmt.Errorf("failed to parse plugin unique identifier: %v", err),
  228. ).ToResponse()
  229. }
  230. pluginDeclaration, err := manager.GetDeclaration(
  231. pluginUniqueIdentifier,
  232. tenant_id,
  233. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  234. )
  235. if err != nil {
  236. return exception.InternalServerError(
  237. fmt.Errorf("failed to get plugin declaration: %v", err),
  238. ).ToResponse()
  239. }
  240. if pluginDeclaration.Endpoint == nil {
  241. return exception.NotFoundError(errors.New("plugin does not have an endpoint")).ToResponse()
  242. }
  243. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  244. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  245. TenantId: tenant_id,
  246. UserId: "",
  247. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  248. },
  249. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  250. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  251. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  252. Identity: endpoint.ID,
  253. Data: endpoint.Settings,
  254. Config: pluginDeclaration.Endpoint.Settings,
  255. },
  256. })
  257. if err != nil {
  258. return exception.InternalServerError(
  259. fmt.Errorf("failed to decrypt settings: %v", err),
  260. ).ToResponse()
  261. }
  262. // mask settings
  263. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  264. endpoint.Settings = decryptedSettings
  265. endpoint.Declaration = pluginDeclaration.Endpoint
  266. endpoints[i] = endpoint
  267. }
  268. return entities.NewSuccessResponse(endpoints)
  269. }
  270. func ListPluginEndpoints(tenant_id string, plugin_id string, page int, page_size int) *entities.Response {
  271. endpoints, err := db.GetAll[models.Endpoint](
  272. db.Equal("plugin_id", plugin_id),
  273. db.Equal("tenant_id", tenant_id),
  274. db.OrderBy("created_at", true),
  275. db.Page(page, page_size),
  276. )
  277. if err != nil {
  278. return exception.InternalServerError(
  279. fmt.Errorf("failed to list endpoints: %v", err),
  280. ).ToResponse()
  281. }
  282. manager := plugin_manager.Manager()
  283. if manager == nil {
  284. return exception.InternalServerError(
  285. errors.New("failed to get plugin manager"),
  286. ).ToResponse()
  287. }
  288. // decrypt settings
  289. for i, endpoint := range endpoints {
  290. // get installation
  291. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  292. db.Equal("plugin_id", plugin_id),
  293. db.Equal("tenant_id", tenant_id),
  294. )
  295. if err != nil {
  296. return exception.NotFoundError(
  297. fmt.Errorf("failed to find plugin installation: %v", err),
  298. ).ToResponse()
  299. }
  300. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  301. pluginInstallation.PluginUniqueIdentifier,
  302. )
  303. if err != nil {
  304. return exception.UniqueIdentifierError(
  305. fmt.Errorf("failed to parse plugin unique identifier: %v", err),
  306. ).ToResponse()
  307. }
  308. pluginDeclaration, err := manager.GetDeclaration(
  309. pluginUniqueIdentifier,
  310. tenant_id,
  311. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  312. )
  313. if err != nil {
  314. return exception.InternalServerError(
  315. fmt.Errorf("failed to get plugin declaration: %v", err),
  316. ).ToResponse()
  317. }
  318. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  319. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  320. TenantId: tenant_id,
  321. UserId: "",
  322. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  323. },
  324. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  325. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  326. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  327. Identity: endpoint.ID,
  328. Data: endpoint.Settings,
  329. Config: pluginDeclaration.Endpoint.Settings,
  330. },
  331. })
  332. if err != nil {
  333. return exception.InternalServerError(
  334. fmt.Errorf("failed to decrypt settings: %v", err),
  335. ).ToResponse()
  336. }
  337. // mask settings
  338. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  339. endpoint.Settings = decryptedSettings
  340. endpoint.Declaration = pluginDeclaration.Endpoint
  341. endpoints[i] = endpoint
  342. }
  343. return entities.NewSuccessResponse(endpoints)
  344. }