endpoint.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "sync/atomic"
  11. "time"
  12. "github.com/gin-gonic/gin"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  15. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  16. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  17. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  18. "github.com/langgenius/dify-plugin-daemon/internal/db"
  19. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  20. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  21. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  22. "github.com/langgenius/dify-plugin-daemon/internal/utils/encryption"
  23. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  24. "github.com/langgenius/dify-plugin-daemon/pkg/entities"
  25. "github.com/langgenius/dify-plugin-daemon/pkg/entities/endpoint_entities"
  26. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  27. "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests"
  28. )
  29. func copyRequest(req *http.Request, hookId string, path string) (*bytes.Buffer, error) {
  30. newReq := req.Clone(context.Background())
  31. // get query params
  32. queryParams := req.URL.Query()
  33. // replace path with endpoint path
  34. newReq.URL.Path = path
  35. // set query params
  36. newReq.URL.RawQuery = queryParams.Encode()
  37. // read request body until complete, max 10MB
  38. body, err := io.ReadAll(io.LimitReader(req.Body, 10*1024*1024))
  39. if err != nil {
  40. return nil, err
  41. }
  42. // replace with a new reader
  43. newReq.Body = io.NopCloser(bytes.NewReader(body))
  44. newReq.ContentLength = int64(len(body))
  45. newReq.TransferEncoding = nil
  46. // remove ip traces for security
  47. newReq.Header.Del("X-Forwarded-For")
  48. newReq.Header.Del("X-Real-IP")
  49. newReq.Header.Del("X-Forwarded")
  50. newReq.Header.Del("X-Original-Forwarded-For")
  51. newReq.Header.Del("X-Original-Url")
  52. newReq.Header.Del("X-Original-Host")
  53. // check if X-Original-Host is set
  54. if originalHost := req.Header.Get(endpoint_entities.HeaderXOriginalHost); originalHost != "" {
  55. // replace host with original host
  56. newReq.Host = originalHost
  57. }
  58. // setup hook id to request
  59. newReq.Header.Set("Dify-Hook-Id", hookId)
  60. // check if Dify-Hook-Url is set
  61. if url := req.Header.Get("Dify-Hook-Url"); url == "" {
  62. newReq.Header.Set(
  63. "Dify-Hook-Url",
  64. fmt.Sprintf("http://%s/e/%s%s", req.Host, hookId, path),
  65. )
  66. }
  67. var buffer bytes.Buffer
  68. err = newReq.Write(&buffer)
  69. if err != nil {
  70. return nil, err
  71. }
  72. return &buffer, nil
  73. }
  74. func Endpoint(
  75. ctx *gin.Context,
  76. endpoint *models.Endpoint,
  77. pluginInstallation *models.PluginInstallation,
  78. maxExecutionTime time.Duration,
  79. path string,
  80. ) {
  81. if !endpoint.Enabled {
  82. ctx.JSON(404, exception.NotFoundError(errors.New("endpoint not found")).ToResponse())
  83. return
  84. }
  85. buffer, err := copyRequest(ctx.Request, endpoint.HookID, path)
  86. if err != nil {
  87. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  88. return
  89. }
  90. identifier, err := plugin_entities.NewPluginUniqueIdentifier(pluginInstallation.PluginUniqueIdentifier)
  91. if err != nil {
  92. ctx.JSON(400, exception.UniqueIdentifierError(err).ToResponse())
  93. return
  94. }
  95. // fetch plugin
  96. manager := plugin_manager.Manager()
  97. runtime, err := manager.Get(identifier)
  98. if err != nil {
  99. ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
  100. return
  101. }
  102. // fetch endpoint declaration
  103. endpointDeclaration := runtime.Configuration().Endpoint
  104. if endpointDeclaration == nil {
  105. ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
  106. return
  107. }
  108. // decrypt settings
  109. settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  110. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  111. TenantId: endpoint.TenantID,
  112. UserId: "",
  113. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  114. },
  115. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  116. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  117. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  118. Identity: endpoint.ID,
  119. Data: endpoint.Settings,
  120. Config: endpointDeclaration.Settings,
  121. },
  122. })
  123. if err != nil {
  124. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  125. return
  126. }
  127. session := session_manager.NewSession(
  128. session_manager.NewSessionPayload{
  129. TenantID: endpoint.TenantID,
  130. UserID: "",
  131. PluginUniqueIdentifier: identifier,
  132. ClusterID: ctx.GetString("cluster_id"),
  133. InvokeFrom: access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  134. Action: access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  135. Declaration: runtime.Configuration(),
  136. BackwardsInvocation: manager.BackwardsInvocation(),
  137. IgnoreCache: false,
  138. EndpointID: &endpoint.ID,
  139. },
  140. )
  141. defer session.Close(session_manager.CloseSessionPayload{
  142. IgnoreCache: false,
  143. })
  144. session.BindRuntime(runtime)
  145. statusCode, headers, response, err := plugin_daemon.InvokeEndpoint(
  146. session, &requests.RequestInvokeEndpoint{
  147. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  148. Settings: settings,
  149. },
  150. )
  151. if err != nil {
  152. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  153. return
  154. }
  155. defer response.Close()
  156. done := make(chan bool)
  157. closed := new(int32)
  158. ctx.Status(statusCode)
  159. for k, v := range *headers {
  160. if len(v) > 0 {
  161. ctx.Writer.Header().Set(k, v[0])
  162. }
  163. }
  164. close := func() {
  165. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  166. close(done)
  167. }
  168. }
  169. defer close()
  170. routine.Submit(map[string]string{
  171. "module": "service",
  172. "function": "Endpoint",
  173. }, func() {
  174. defer close()
  175. for response.Next() {
  176. chunk, err := response.Read()
  177. if err != nil {
  178. ctx.Writer.Write([]byte(err.Error()))
  179. ctx.Writer.Flush()
  180. return
  181. }
  182. ctx.Writer.Write(chunk)
  183. ctx.Writer.Flush()
  184. }
  185. })
  186. select {
  187. case <-ctx.Writer.CloseNotify():
  188. case <-done:
  189. case <-time.After(maxExecutionTime):
  190. ctx.JSON(500, exception.InternalServerError(errors.New("killed by timeout")).ToResponse())
  191. }
  192. }
  193. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  194. if err := install_service.EnabledEndpoint(endpoint_id, tenant_id); err != nil {
  195. return exception.InternalServerError(errors.New("failed to enable endpoint")).ToResponse()
  196. }
  197. return entities.NewSuccessResponse(true)
  198. }
  199. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  200. if err := install_service.DisabledEndpoint(endpoint_id, tenant_id); err != nil {
  201. return exception.InternalServerError(errors.New("failed to disable endpoint")).ToResponse()
  202. }
  203. return entities.NewSuccessResponse(true)
  204. }
  205. func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
  206. endpoints, err := db.GetAll[models.Endpoint](
  207. db.Equal("tenant_id", tenant_id),
  208. db.OrderBy("created_at", true),
  209. db.Page(page, page_size),
  210. )
  211. if err != nil {
  212. return exception.InternalServerError(fmt.Errorf("failed to list endpoints: %v", err)).ToResponse()
  213. }
  214. manager := plugin_manager.Manager()
  215. if manager == nil {
  216. return exception.InternalServerError(errors.New("failed to get plugin manager")).ToResponse()
  217. }
  218. // decrypt settings
  219. for i, endpoint := range endpoints {
  220. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  221. db.Equal("plugin_id", endpoint.PluginID),
  222. db.Equal("tenant_id", tenant_id),
  223. )
  224. if err != nil {
  225. // use empty settings and declaration for uninstalled plugins
  226. endpoint.Settings = map[string]any{}
  227. endpoint.Declaration = &plugin_entities.EndpointProviderDeclaration{
  228. Settings: []plugin_entities.ProviderConfig{},
  229. Endpoints: []plugin_entities.EndpointDeclaration{},
  230. EndpointFiles: []string{},
  231. }
  232. endpoints[i] = endpoint
  233. continue
  234. }
  235. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  236. pluginInstallation.PluginUniqueIdentifier,
  237. )
  238. if err != nil {
  239. return exception.UniqueIdentifierError(
  240. fmt.Errorf("failed to parse plugin unique identifier: %v", err),
  241. ).ToResponse()
  242. }
  243. pluginDeclaration, err := manager.GetDeclaration(
  244. pluginUniqueIdentifier,
  245. tenant_id,
  246. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  247. )
  248. if err != nil {
  249. return exception.InternalServerError(
  250. fmt.Errorf("failed to get plugin declaration: %v", err),
  251. ).ToResponse()
  252. }
  253. if pluginDeclaration.Endpoint == nil {
  254. return exception.NotFoundError(errors.New("plugin does not have an endpoint")).ToResponse()
  255. }
  256. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  257. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  258. TenantId: tenant_id,
  259. UserId: "",
  260. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  261. },
  262. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  263. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  264. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  265. Identity: endpoint.ID,
  266. Data: endpoint.Settings,
  267. Config: pluginDeclaration.Endpoint.Settings,
  268. },
  269. })
  270. if err != nil {
  271. return exception.InternalServerError(
  272. fmt.Errorf("failed to decrypt settings: %v", err),
  273. ).ToResponse()
  274. }
  275. // mask settings
  276. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  277. endpoint.Settings = decryptedSettings
  278. endpoint.Declaration = pluginDeclaration.Endpoint
  279. endpoints[i] = endpoint
  280. }
  281. return entities.NewSuccessResponse(endpoints)
  282. }
  283. func ListPluginEndpoints(tenant_id string, plugin_id string, page int, page_size int) *entities.Response {
  284. endpoints, err := db.GetAll[models.Endpoint](
  285. db.Equal("plugin_id", plugin_id),
  286. db.Equal("tenant_id", tenant_id),
  287. db.OrderBy("created_at", true),
  288. db.Page(page, page_size),
  289. )
  290. if err != nil {
  291. return exception.InternalServerError(
  292. fmt.Errorf("failed to list endpoints: %v", err),
  293. ).ToResponse()
  294. }
  295. manager := plugin_manager.Manager()
  296. if manager == nil {
  297. return exception.InternalServerError(
  298. errors.New("failed to get plugin manager"),
  299. ).ToResponse()
  300. }
  301. // decrypt settings
  302. for i, endpoint := range endpoints {
  303. // get installation
  304. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  305. db.Equal("plugin_id", plugin_id),
  306. db.Equal("tenant_id", tenant_id),
  307. )
  308. if err != nil {
  309. return exception.NotFoundError(
  310. fmt.Errorf("failed to find plugin installation: %v", err),
  311. ).ToResponse()
  312. }
  313. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  314. pluginInstallation.PluginUniqueIdentifier,
  315. )
  316. if err != nil {
  317. return exception.UniqueIdentifierError(
  318. fmt.Errorf("failed to parse plugin unique identifier: %v", err),
  319. ).ToResponse()
  320. }
  321. pluginDeclaration, err := manager.GetDeclaration(
  322. pluginUniqueIdentifier,
  323. tenant_id,
  324. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  325. )
  326. if err != nil {
  327. return exception.InternalServerError(
  328. fmt.Errorf("failed to get plugin declaration: %v", err),
  329. ).ToResponse()
  330. }
  331. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  332. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  333. TenantId: tenant_id,
  334. UserId: "",
  335. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  336. },
  337. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  338. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  339. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  340. Identity: endpoint.ID,
  341. Data: endpoint.Settings,
  342. Config: pluginDeclaration.Endpoint.Settings,
  343. },
  344. })
  345. if err != nil {
  346. return exception.InternalServerError(
  347. fmt.Errorf("failed to decrypt settings: %v", err),
  348. ).ToResponse()
  349. }
  350. // mask settings
  351. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  352. endpoint.Settings = decryptedSettings
  353. endpoint.Declaration = pluginDeclaration.Endpoint
  354. endpoints[i] = endpoint
  355. }
  356. return entities.NewSuccessResponse(endpoints)
  357. }