123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- package server
- import (
- "errors"
- "io"
- "github.com/gin-gonic/gin"
- "github.com/langgenius/dify-plugin-daemon/internal/db"
- "github.com/langgenius/dify-plugin-daemon/internal/server/constants"
- "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
- "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
- "github.com/langgenius/dify-plugin-daemon/internal/types/models"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
- )
- func CheckingKey(key string) gin.HandlerFunc {
- return func(c *gin.Context) {
- // get header X-Api-Key
- if c.GetHeader(constants.X_API_KEY) != key {
- c.AbortWithStatusJSON(401, exception.UnauthorizedError().ToResponse())
- return
- }
- c.Next()
- }
- }
- func (app *App) FetchPluginInstallation() gin.HandlerFunc {
- return func(ctx *gin.Context) {
- pluginId := ctx.Request.Header.Get(constants.X_PLUGIN_ID)
- if pluginId == "" {
- ctx.AbortWithStatusJSON(400, exception.BadRequestError(errors.New("plugin_id is required")).ToResponse())
- return
- }
- tenantId := ctx.Param("tenant_id")
- if tenantId == "" {
- ctx.AbortWithStatusJSON(400, exception.BadRequestError(errors.New("tenant_id is required")).ToResponse())
- return
- }
- // fetch plugin installation
- installation, err := db.GetOne[models.PluginInstallation](
- db.Equal("tenant_id", tenantId),
- db.Equal("plugin_id", pluginId),
- )
- if err == db.ErrDatabaseNotFound {
- ctx.AbortWithStatusJSON(404, exception.ErrPluginNotFound().ToResponse())
- return
- }
- if err != nil {
- ctx.AbortWithStatusJSON(500, exception.InternalServerError(err).ToResponse())
- return
- }
- identity, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
- if err != nil {
- ctx.AbortWithStatusJSON(400, exception.PluginUniqueIdentifierError(err).ToResponse())
- return
- }
- ctx.Set(constants.CONTEXT_KEY_PLUGIN_INSTALLATION, installation)
- ctx.Set(constants.CONTEXT_KEY_PLUGIN_UNIQUE_IDENTIFIER, identity)
- ctx.Next()
- }
- }
- // RedirectPluginInvoke redirects the request to the correct cluster node
- func (app *App) RedirectPluginInvoke() gin.HandlerFunc {
- return func(ctx *gin.Context) {
- // get plugin unique identifier
- identityAny, ok := ctx.Get(constants.CONTEXT_KEY_PLUGIN_UNIQUE_IDENTIFIER)
- if !ok {
- ctx.AbortWithStatusJSON(
- 500,
- exception.InternalServerError(errors.New("plugin unique identifier not found")).ToResponse(),
- )
- return
- }
- identity, ok := identityAny.(plugin_entities.PluginUniqueIdentifier)
- if !ok {
- ctx.AbortWithStatusJSON(
- 500,
- exception.InternalServerError(errors.New("failed to parse plugin unique identifier")).ToResponse(),
- )
- return
- }
- // check if plugin in current node
- if ok, originalError := app.cluster.IsPluginOnCurrentNode(identity); !ok {
- app.redirectPluginInvokeByPluginIdentifier(ctx, identity, originalError)
- ctx.Abort()
- } else {
- ctx.Next()
- }
- }
- }
- func (app *App) redirectPluginInvokeByPluginIdentifier(
- ctx *gin.Context,
- plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
- originalError error,
- ) {
- // try find the correct node
- nodes, err := app.cluster.FetchPluginAvailableNodesById(plugin_unique_identifier.String())
- if err != nil {
- ctx.AbortWithStatusJSON(
- 500,
- exception.InternalServerError(
- errors.New("failed to fetch plugin available nodes, "+originalError.Error()+", "+err.Error()),
- ).ToResponse(),
- )
- return
- } else if len(nodes) == 0 {
- ctx.AbortWithStatusJSON(
- 404,
- exception.InternalServerError(
- errors.New("no available node, "+originalError.Error()),
- ).ToResponse(),
- )
- return
- }
- // redirect to the correct node
- nodeId := nodes[0]
- statusCode, header, body, err := app.cluster.RedirectRequest(nodeId, ctx.Request)
- if err != nil {
- log.Error("redirect request failed: %s", err.Error())
- ctx.AbortWithStatusJSON(
- 500,
- exception.InternalServerError(errors.New("redirect request failed: "+err.Error())).ToResponse(),
- )
- return
- }
- // set status code
- ctx.Writer.WriteHeader(statusCode)
- // set header
- for key, values := range header {
- for _, value := range values {
- ctx.Writer.Header().Set(key, value)
- }
- }
- for {
- buf := make([]byte, 1024)
- n, err := body.Read(buf)
- if err != nil && err != io.EOF {
- break
- } else if err != nil {
- ctx.Writer.Write(buf[:n])
- break
- }
- if n > 0 {
- ctx.Writer.Write(buf[:n])
- }
- }
- }
- func (app *App) InitClusterID() gin.HandlerFunc {
- return func(ctx *gin.Context) {
- ctx.Set(constants.CONTEXT_KEY_CLUSTER_ID, app.cluster.ID())
- ctx.Next()
- }
- }
|