endpoint.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package server
  2. import (
  3. "errors"
  4. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  5. "strings"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. "github.com/langgenius/dify-plugin-daemon/internal/db"
  9. "github.com/langgenius/dify-plugin-daemon/internal/service"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  14. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  15. )
  16. // DifyPlugin supports register and use endpoint to improve the plugin's functionality
  17. // you can use it to do some magics, looking forward to your imagination, Ciallo~(∠·ω< )⌒
  18. // - Yeuoly
  19. // EndpointHandler is a function type that can be used to handle endpoint requests
  20. type EndpointHandler func(ctx *gin.Context, hookId string, maxExecutionTime time.Duration, path string)
  21. func (app *App) Endpoint(config *app.Config) func(c *gin.Context) {
  22. return func(c *gin.Context) {
  23. hookId := c.Param("hook_id")
  24. path := c.Param("path")
  25. if app.endpointHandler != nil {
  26. app.endpointHandler(c, hookId, time.Duration(config.PluginMaxExecutionTimeout)*time.Second, path)
  27. } else {
  28. app.EndpointHandler(c, hookId, time.Duration(config.PluginMaxExecutionTimeout)*time.Second, path)
  29. }
  30. }
  31. }
  32. func (app *App) EndpointHandler(ctx *gin.Context, hookId string, maxExecutionTime time.Duration, path string) {
  33. endpointCacheKey := strings.Join(
  34. []string{
  35. "hook_id",
  36. hookId,
  37. },
  38. ":",
  39. )
  40. endpoint, err := cache.AutoGetWithGetter[models.Endpoint](
  41. endpointCacheKey,
  42. func() (*models.Endpoint, error) {
  43. v, err := db.GetOne[models.Endpoint](
  44. db.Equal("hook_id", hookId),
  45. )
  46. return &v, err
  47. })
  48. if err == db.ErrDatabaseNotFound {
  49. ctx.JSON(404, exception.BadRequestError(errors.New("endpoint not found")).ToResponse())
  50. return
  51. }
  52. if err != nil {
  53. log.Error("get endpoint error %v", err)
  54. ctx.JSON(500, exception.InternalServerError(errors.New("internal server error")).ToResponse())
  55. return
  56. }
  57. // get plugin installation
  58. pluginInstallationCacheKey := strings.Join(
  59. []string{
  60. "plugin_id",
  61. endpoint.PluginID,
  62. "tenant_id",
  63. endpoint.TenantID,
  64. },
  65. ":",
  66. )
  67. pluginInstallation, err := cache.AutoGetWithGetter[models.PluginInstallation](
  68. pluginInstallationCacheKey,
  69. func() (*models.PluginInstallation, error) {
  70. v, err := db.GetOne[models.PluginInstallation](
  71. db.Equal("plugin_id", endpoint.PluginID),
  72. db.Equal("tenant_id", endpoint.TenantID),
  73. )
  74. return &v, err
  75. },
  76. )
  77. if err != nil {
  78. ctx.JSON(404, exception.BadRequestError(errors.New("plugin installation not found")).ToResponse())
  79. return
  80. }
  81. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(
  82. pluginInstallation.PluginUniqueIdentifier,
  83. )
  84. if err != nil {
  85. ctx.JSON(400, exception.UniqueIdentifierError(
  86. errors.New("invalid plugin unique identifier"),
  87. ).ToResponse())
  88. return
  89. }
  90. // check if plugin exists in current node
  91. if ok, originalError := app.cluster.IsPluginOnCurrentNode(pluginUniqueIdentifier); !ok {
  92. app.redirectPluginInvokeByPluginIdentifier(ctx, pluginUniqueIdentifier, originalError)
  93. } else {
  94. service.Endpoint(ctx, endpoint, pluginInstallation, maxExecutionTime, path)
  95. }
  96. }