endpoint.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "sync/atomic"
  11. "time"
  12. "github.com/gin-gonic/gin"
  13. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  14. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  15. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  16. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  17. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  18. "github.com/langgenius/dify-plugin-daemon/internal/db"
  19. "github.com/langgenius/dify-plugin-daemon/internal/service/install_service"
  20. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  21. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  22. "github.com/langgenius/dify-plugin-daemon/internal/utils/encryption"
  23. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  24. "github.com/langgenius/dify-plugin-daemon/pkg/entities"
  25. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  26. "github.com/langgenius/dify-plugin-daemon/pkg/entities/requests"
  27. )
  28. func copyRequest(req *http.Request, hookId string, path string) (*bytes.Buffer, error) {
  29. newReq := req.Clone(context.Background())
  30. // get query params
  31. queryParams := req.URL.Query()
  32. // replace path with endpoint path
  33. newReq.URL.Path = path
  34. // set query params
  35. newReq.URL.RawQuery = queryParams.Encode()
  36. // read request body until complete, max 10MB
  37. body, err := io.ReadAll(io.LimitReader(req.Body, 10*1024*1024))
  38. if err != nil {
  39. return nil, err
  40. }
  41. // replace with a new reader
  42. newReq.Body = io.NopCloser(bytes.NewReader(body))
  43. newReq.ContentLength = int64(len(body))
  44. newReq.TransferEncoding = nil
  45. // remove ip traces for security
  46. newReq.Header.Del("X-Forwarded-For")
  47. newReq.Header.Del("X-Real-IP")
  48. newReq.Header.Del("X-Forwarded")
  49. newReq.Header.Del("X-Original-Forwarded-For")
  50. newReq.Header.Del("X-Original-Url")
  51. newReq.Header.Del("X-Original-Host")
  52. // setup hook id to request
  53. newReq.Header.Set("Dify-Hook-Id", hookId)
  54. // check if Dify-Hook-Url is set
  55. if url := req.Header.Get("Dify-Hook-Url"); url == "" {
  56. newReq.Header.Set(
  57. "Dify-Hook-Url",
  58. fmt.Sprintf("http://%s/e/%s%s", req.Host, hookId, path),
  59. )
  60. }
  61. var buffer bytes.Buffer
  62. err = newReq.Write(&buffer)
  63. if err != nil {
  64. return nil, err
  65. }
  66. return &buffer, nil
  67. }
  68. func Endpoint(
  69. ctx *gin.Context,
  70. endpoint *models.Endpoint,
  71. pluginInstallation *models.PluginInstallation,
  72. maxExecutionTime time.Duration,
  73. path string,
  74. ) {
  75. if !endpoint.Enabled {
  76. ctx.JSON(404, exception.NotFoundError(errors.New("endpoint not found")).ToResponse())
  77. return
  78. }
  79. buffer, err := copyRequest(ctx.Request, endpoint.HookID, path)
  80. if err != nil {
  81. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  82. return
  83. }
  84. identifier, err := plugin_entities.NewPluginUniqueIdentifier(pluginInstallation.PluginUniqueIdentifier)
  85. if err != nil {
  86. ctx.JSON(400, exception.UniqueIdentifierError(err).ToResponse())
  87. return
  88. }
  89. // fetch plugin
  90. manager := plugin_manager.Manager()
  91. runtime, err := manager.Get(identifier)
  92. if err != nil {
  93. ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
  94. return
  95. }
  96. // fetch endpoint declaration
  97. endpointDeclaration := runtime.Configuration().Endpoint
  98. if endpointDeclaration == nil {
  99. ctx.JSON(404, exception.ErrPluginNotFound().ToResponse())
  100. return
  101. }
  102. // decrypt settings
  103. settings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  104. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  105. TenantId: endpoint.TenantID,
  106. UserId: "",
  107. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  108. },
  109. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  110. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  111. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  112. Identity: endpoint.ID,
  113. Data: endpoint.Settings,
  114. Config: endpointDeclaration.Settings,
  115. },
  116. })
  117. if err != nil {
  118. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  119. return
  120. }
  121. session := session_manager.NewSession(
  122. session_manager.NewSessionPayload{
  123. TenantID: endpoint.TenantID,
  124. UserID: "",
  125. PluginUniqueIdentifier: identifier,
  126. ClusterID: ctx.GetString("cluster_id"),
  127. InvokeFrom: access_types.PLUGIN_ACCESS_TYPE_ENDPOINT,
  128. Action: access_types.PLUGIN_ACCESS_ACTION_INVOKE_ENDPOINT,
  129. Declaration: runtime.Configuration(),
  130. BackwardsInvocation: manager.BackwardsInvocation(),
  131. IgnoreCache: false,
  132. EndpointID: &endpoint.ID,
  133. },
  134. )
  135. defer session.Close(session_manager.CloseSessionPayload{
  136. IgnoreCache: false,
  137. })
  138. session.BindRuntime(runtime)
  139. statusCode, headers, response, err := plugin_daemon.InvokeEndpoint(
  140. session, &requests.RequestInvokeEndpoint{
  141. RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
  142. Settings: settings,
  143. },
  144. )
  145. if err != nil {
  146. ctx.JSON(500, exception.InternalServerError(err).ToResponse())
  147. return
  148. }
  149. defer response.Close()
  150. done := make(chan bool)
  151. closed := new(int32)
  152. ctx.Status(statusCode)
  153. for k, v := range *headers {
  154. if len(v) > 0 {
  155. ctx.Writer.Header().Set(k, v[0])
  156. }
  157. }
  158. close := func() {
  159. if atomic.CompareAndSwapInt32(closed, 0, 1) {
  160. close(done)
  161. }
  162. }
  163. defer close()
  164. routine.Submit(map[string]string{
  165. "module": "service",
  166. "function": "Endpoint",
  167. }, func() {
  168. defer close()
  169. for response.Next() {
  170. chunk, err := response.Read()
  171. if err != nil {
  172. ctx.Writer.Write([]byte(err.Error()))
  173. ctx.Writer.Flush()
  174. return
  175. }
  176. ctx.Writer.Write(chunk)
  177. ctx.Writer.Flush()
  178. }
  179. })
  180. select {
  181. case <-ctx.Writer.CloseNotify():
  182. case <-done:
  183. case <-time.After(maxExecutionTime):
  184. ctx.JSON(500, exception.InternalServerError(errors.New("killed by timeout")).ToResponse())
  185. }
  186. }
  187. func EnableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  188. if err := install_service.EnabledEndpoint(endpoint_id, tenant_id); err != nil {
  189. return exception.InternalServerError(errors.New("failed to enable endpoint")).ToResponse()
  190. }
  191. return entities.NewSuccessResponse(true)
  192. }
  193. func DisableEndpoint(endpoint_id string, tenant_id string) *entities.Response {
  194. if err := install_service.DisabledEndpoint(endpoint_id, tenant_id); err != nil {
  195. return exception.InternalServerError(errors.New("failed to disable endpoint")).ToResponse()
  196. }
  197. return entities.NewSuccessResponse(true)
  198. }
  199. func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response {
  200. endpoints, err := db.GetAll[models.Endpoint](
  201. db.Equal("tenant_id", tenant_id),
  202. db.OrderBy("created_at", true),
  203. db.Page(page, page_size),
  204. )
  205. if err != nil {
  206. return exception.InternalServerError(fmt.Errorf("failed to list endpoints: %v", err)).ToResponse()
  207. }
  208. manager := plugin_manager.Manager()
  209. if manager == nil {
  210. return exception.InternalServerError(errors.New("failed to get plugin manager")).ToResponse()
  211. }
  212. // decrypt settings
  213. for i, endpoint := range endpoints {
  214. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  215. db.Equal("plugin_id", endpoint.PluginID),
  216. db.Equal("tenant_id", tenant_id),
  217. )
  218. if err != nil {
  219. // use empty settings and declaration for uninstalled plugins
  220. endpoint.Settings = map[string]any{}
  221. endpoint.Declaration = &plugin_entities.EndpointProviderDeclaration{
  222. Settings: []plugin_entities.ProviderConfig{},
  223. Endpoints: []plugin_entities.EndpointDeclaration{},
  224. EndpointFiles: []string{},
  225. }
  226. endpoints[i] = endpoint
  227. continue
  228. }
  229. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  230. pluginInstallation.PluginUniqueIdentifier,
  231. )
  232. if err != nil {
  233. return exception.UniqueIdentifierError(
  234. fmt.Errorf("failed to parse plugin unique identifier: %v", err),
  235. ).ToResponse()
  236. }
  237. pluginDeclaration, err := manager.GetDeclaration(
  238. pluginUniqueIdentifier,
  239. tenant_id,
  240. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  241. )
  242. if err != nil {
  243. return exception.InternalServerError(
  244. fmt.Errorf("failed to get plugin declaration: %v", err),
  245. ).ToResponse()
  246. }
  247. if pluginDeclaration.Endpoint == nil {
  248. return exception.NotFoundError(errors.New("plugin does not have an endpoint")).ToResponse()
  249. }
  250. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  251. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  252. TenantId: tenant_id,
  253. UserId: "",
  254. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  255. },
  256. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  257. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  258. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  259. Identity: endpoint.ID,
  260. Data: endpoint.Settings,
  261. Config: pluginDeclaration.Endpoint.Settings,
  262. },
  263. })
  264. if err != nil {
  265. return exception.InternalServerError(
  266. fmt.Errorf("failed to decrypt settings: %v", err),
  267. ).ToResponse()
  268. }
  269. // mask settings
  270. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  271. endpoint.Settings = decryptedSettings
  272. endpoint.Declaration = pluginDeclaration.Endpoint
  273. endpoints[i] = endpoint
  274. }
  275. return entities.NewSuccessResponse(endpoints)
  276. }
  277. func ListPluginEndpoints(tenant_id string, plugin_id string, page int, page_size int) *entities.Response {
  278. endpoints, err := db.GetAll[models.Endpoint](
  279. db.Equal("plugin_id", plugin_id),
  280. db.Equal("tenant_id", tenant_id),
  281. db.OrderBy("created_at", true),
  282. db.Page(page, page_size),
  283. )
  284. if err != nil {
  285. return exception.InternalServerError(
  286. fmt.Errorf("failed to list endpoints: %v", err),
  287. ).ToResponse()
  288. }
  289. manager := plugin_manager.Manager()
  290. if manager == nil {
  291. return exception.InternalServerError(
  292. errors.New("failed to get plugin manager"),
  293. ).ToResponse()
  294. }
  295. // decrypt settings
  296. for i, endpoint := range endpoints {
  297. // get installation
  298. pluginInstallation, err := db.GetOne[models.PluginInstallation](
  299. db.Equal("plugin_id", plugin_id),
  300. db.Equal("tenant_id", tenant_id),
  301. )
  302. if err != nil {
  303. return exception.NotFoundError(
  304. fmt.Errorf("failed to find plugin installation: %v", err),
  305. ).ToResponse()
  306. }
  307. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  308. pluginInstallation.PluginUniqueIdentifier,
  309. )
  310. if err != nil {
  311. return exception.UniqueIdentifierError(
  312. fmt.Errorf("failed to parse plugin unique identifier: %v", err),
  313. ).ToResponse()
  314. }
  315. pluginDeclaration, err := manager.GetDeclaration(
  316. pluginUniqueIdentifier,
  317. tenant_id,
  318. plugin_entities.PluginRuntimeType(pluginInstallation.RuntimeType),
  319. )
  320. if err != nil {
  321. return exception.InternalServerError(
  322. fmt.Errorf("failed to get plugin declaration: %v", err),
  323. ).ToResponse()
  324. }
  325. decryptedSettings, err := manager.BackwardsInvocation().InvokeEncrypt(&dify_invocation.InvokeEncryptRequest{
  326. BaseInvokeDifyRequest: dify_invocation.BaseInvokeDifyRequest{
  327. TenantId: tenant_id,
  328. UserId: "",
  329. Type: dify_invocation.INVOKE_TYPE_ENCRYPT,
  330. },
  331. InvokeEncryptSchema: dify_invocation.InvokeEncryptSchema{
  332. Opt: dify_invocation.ENCRYPT_OPT_DECRYPT,
  333. Namespace: dify_invocation.ENCRYPT_NAMESPACE_ENDPOINT,
  334. Identity: endpoint.ID,
  335. Data: endpoint.Settings,
  336. Config: pluginDeclaration.Endpoint.Settings,
  337. },
  338. })
  339. if err != nil {
  340. return exception.InternalServerError(
  341. fmt.Errorf("failed to decrypt settings: %v", err),
  342. ).ToResponse()
  343. }
  344. // mask settings
  345. decryptedSettings = encryption.MaskConfigCredentials(decryptedSettings, pluginDeclaration.Endpoint.Settings)
  346. endpoint.Settings = decryptedSettings
  347. endpoint.Declaration = pluginDeclaration.Endpoint
  348. endpoints[i] = endpoint
  349. }
  350. return entities.NewSuccessResponse(endpoints)
  351. }