invoke_tool.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package service
  2. import (
  3. "errors"
  4. "github.com/gin-gonic/gin"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  13. )
  14. func createSession[T any](
  15. r *plugin_entities.InvokePluginRequest[T],
  16. access_type access_types.PluginAccessType,
  17. access_action access_types.PluginAccessAction,
  18. cluster_id string,
  19. ) (*session_manager.Session, error) {
  20. manager := plugin_manager.Manager()
  21. if manager == nil {
  22. return nil, errors.New("failed to get plugin manager")
  23. }
  24. // try fetch plugin identifier from plugin id
  25. runtime := manager.Get(r.UniqueIdentifier)
  26. if runtime == nil {
  27. return nil, errors.New("failed to get plugin runtime")
  28. }
  29. session := session_manager.NewSession(
  30. session_manager.NewSessionPayload{
  31. TenantID: r.TenantId,
  32. UserID: r.UserId,
  33. PluginUniqueIdentifier: r.UniqueIdentifier,
  34. ClusterID: cluster_id,
  35. InvokeFrom: access_type,
  36. Action: access_action,
  37. Declaration: runtime.Configuration(),
  38. BackwardsInvocation: manager.BackwardsInvocation(),
  39. IgnoreCache: false,
  40. },
  41. )
  42. session.BindRuntime(runtime)
  43. return session, nil
  44. }
  45. func InvokeTool(
  46. r *plugin_entities.InvokePluginRequest[requests.RequestInvokeTool],
  47. ctx *gin.Context,
  48. max_timeout_seconds int,
  49. ) {
  50. // create session
  51. session, err := createSession(
  52. r,
  53. access_types.PLUGIN_ACCESS_TYPE_TOOL,
  54. access_types.PLUGIN_ACCESS_ACTION_INVOKE_TOOL,
  55. ctx.GetString("cluster_id"),
  56. )
  57. if err != nil {
  58. ctx.JSON(500, gin.H{"error": err.Error()})
  59. return
  60. }
  61. defer session.Close(session_manager.CloseSessionPayload{
  62. IgnoreCache: false,
  63. })
  64. baseSSEService(
  65. func() (*stream.Stream[tool_entities.ToolResponseChunk], error) {
  66. return plugin_daemon.InvokeTool(session, &r.Data)
  67. },
  68. ctx,
  69. max_timeout_seconds,
  70. )
  71. }
  72. func ValidateToolCredentials(
  73. r *plugin_entities.InvokePluginRequest[requests.RequestValidateToolCredentials],
  74. ctx *gin.Context,
  75. max_timeout_seconds int,
  76. ) {
  77. // create session
  78. session, err := createSession(
  79. r,
  80. access_types.PLUGIN_ACCESS_TYPE_TOOL,
  81. access_types.PLUGIN_ACCESS_ACTION_VALIDATE_TOOL_CREDENTIALS,
  82. ctx.GetString("cluster_id"),
  83. )
  84. if err != nil {
  85. ctx.JSON(500, gin.H{"error": err.Error()})
  86. return
  87. }
  88. defer session.Close(session_manager.CloseSessionPayload{
  89. IgnoreCache: false,
  90. })
  91. baseSSEService(
  92. func() (*stream.Stream[tool_entities.ValidateCredentialsResult], error) {
  93. return plugin_daemon.ValidateToolCredentials(session, &r.Data)
  94. },
  95. ctx,
  96. max_timeout_seconds,
  97. )
  98. }