Browse Source

feat: redirects webhooks

Yeuoly 11 months ago
parent
commit
ef091e4fed

+ 3 - 0
.env.example

@@ -4,10 +4,13 @@ PLUGIN_INNER_API_KEY=lYkiYYT6owG+71oLerGzA7GXCgOT++6ovaezWAjpCjf+Sjc3ZtU+qUEi
 PLUGIN_INNER_API_URL=http://127.0.0.1:5001
 
 SERVER_PORT=5002
+GIN_MODE=debug
 
 PLUGIN_REMOTE_INSTALLING_HOST=127.0.0.1
 PLUGIN_REMOTE_INSTALLING_PORT=5003
 
+PLUGIN_WEBHOOK_ENABLED=true
+
 ROUTINE_POOL_SIZE=1024
 
 REDIS_HOST=127.0.0.1

+ 1 - 0
internal/db/init.go

@@ -80,6 +80,7 @@ func autoMigrate() error {
 	return DifyPluginDB.AutoMigrate(
 		models.Plugin{},
 		models.PluginInstallation{},
+		models.Webhook{},
 	)
 }
 

+ 49 - 47
internal/server/middleware.go

@@ -62,56 +62,58 @@ func (app *App) RedirectPluginInvoke() gin.HandlerFunc {
 		if !app.cluster.IsPluginNoCurrentNode(
 			plugin_id,
 		) {
-			// try find the correct node
-			nodes, err := app.cluster.FetchPluginAvailableNodesById(plugin_id)
-			if err != nil {
-				ctx.AbortWithStatusJSON(500, gin.H{"error": "Internal server error"})
-				log.Error("fetch plugin available nodes failed: %s", err.Error())
-				return
-			} else if len(nodes) == 0 {
-				ctx.AbortWithStatusJSON(404, gin.H{"error": "No available node"})
-				log.Error("no available node")
-				return
-			}
-
-			// redirect to the correct node
-			node_id := nodes[0]
-			status_code, header, body, err := app.cluster.RedirectRequest(node_id, ctx.Request)
-			if err != nil {
-				log.Error("redirect request failed: %s", err.Error())
-				ctx.AbortWithStatusJSON(500, gin.H{"error": "Internal server error"})
-				return
-			}
-
-			// set status code
-			ctx.Writer.WriteHeader(status_code)
-
-			// set header
-			for key, values := range header {
-				for _, value := range values {
-					ctx.Writer.Header().Set(key, value)
-				}
-			}
-
-			for {
-				buf := make([]byte, 1024)
-				n, err := body.Read(buf)
-				if err != nil && err != io.EOF {
-					break
-				} else if err != nil {
-					ctx.Writer.Write(buf[:n])
-					break
-				}
-
-				if n > 0 {
-					ctx.Writer.Write(buf[:n])
-				}
-			}
-
+			app.Redirect(ctx, plugin_id)
 			ctx.Abort()
-			return
 		} else {
 			ctx.Next()
 		}
 	}
 }
+
+func (app *App) Redirect(ctx *gin.Context, plugin_id string) {
+	// try find the correct node
+	nodes, err := app.cluster.FetchPluginAvailableNodesById(plugin_id)
+	if err != nil {
+		ctx.AbortWithStatusJSON(500, gin.H{"error": "Internal server error"})
+		log.Error("fetch plugin available nodes failed: %s", err.Error())
+		return
+	} else if len(nodes) == 0 {
+		ctx.AbortWithStatusJSON(404, gin.H{"error": "No available node"})
+		log.Error("no available node")
+		return
+	}
+
+	// redirect to the correct node
+	node_id := nodes[0]
+	status_code, header, body, err := app.cluster.RedirectRequest(node_id, ctx.Request)
+	if err != nil {
+		log.Error("redirect request failed: %s", err.Error())
+		ctx.AbortWithStatusJSON(500, gin.H{"error": "Internal server error"})
+		return
+	}
+
+	// set status code
+	ctx.Writer.WriteHeader(status_code)
+
+	// set header
+	for key, values := range header {
+		for _, value := range values {
+			ctx.Writer.Header().Set(key, value)
+		}
+	}
+
+	for {
+		buf := make([]byte, 1024)
+		n, err := body.Read(buf)
+		if err != nil && err != io.EOF {
+			break
+		} else if err != nil {
+			ctx.Writer.Write(buf[:n])
+			break
+		}
+
+		if n > 0 {
+			ctx.Writer.Write(buf[:n])
+		}
+	}
+}

+ 29 - 8
internal/server/webhook.go

@@ -1,9 +1,11 @@
 package server
 
 import (
-	"fmt"
-
 	"github.com/gin-gonic/gin"
+	"github.com/langgenius/dify-plugin-daemon/internal/db"
+	"github.com/langgenius/dify-plugin-daemon/internal/service"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
 
 // DifyPlugin supports register and use webhook to improve the plugin's functionality
@@ -11,7 +13,7 @@ import (
 // - Yeuoly
 
 // WebhookHandler is a function type that can be used to handle webhook requests
-type WebhookHandler func(hook_id string, path string)
+type WebhookHandler func(ctx *gin.Context, hook_id string, path string)
 
 func (app *App) Webhook() func(c *gin.Context) {
 	return func(c *gin.Context) {
@@ -19,14 +21,33 @@ func (app *App) Webhook() func(c *gin.Context) {
 		path := c.Param("path")
 
 		if app.webhook_handler != nil {
-			app.webhook_handler(hook_id, path)
+			app.webhook_handler(c, hook_id, path)
 		} else {
-			app.WebhookHandler(hook_id, path)
+			app.WebhookHandler(c, hook_id, path)
 		}
 	}
 }
 
-func (app *App) WebhookHandler(hook_id string, path string) {
-	fmt.Println(hook_id)
-	fmt.Println(path)
+func (app *App) WebhookHandler(ctx *gin.Context, hook_id string, path string) {
+	webhook, err := db.GetOne[models.Webhook](
+		db.Equal("hook_id", hook_id),
+	)
+
+	if err == db.ErrDatabaseNotFound {
+		ctx.JSON(404, gin.H{"error": "webhook not found"})
+		return
+	}
+
+	if err != nil {
+		log.Error("get webhook error %v", err)
+		ctx.JSON(500, gin.H{"error": "internal server error"})
+		return
+	}
+
+	// check if plugin exists in current node
+	if !app.cluster.IsPluginNoCurrentNode(webhook.PluginID) {
+		app.Redirect(ctx, webhook.PluginID)
+	} else {
+		service.Webhook(ctx, &webhook, path)
+	}
 }

+ 2 - 1
internal/server/webhook_test.go

@@ -5,6 +5,7 @@ import (
 	"strconv"
 	"testing"
 
+	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/network"
 )
@@ -19,7 +20,7 @@ func TestWebhookParams(t *testing.T) {
 	global_hook_id := ""
 	global_hook_path := ""
 
-	handler := func(hook_id string, path string) {
+	handler := func(ctx *gin.Context, hook_id string, path string) {
 		global_hook_id = hook_id
 		global_hook_path = path
 	}

+ 21 - 0
internal/service/webhook.go

@@ -1 +1,22 @@
 package service
+
+import (
+	"bytes"
+
+	"github.com/gin-gonic/gin"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
+)
+
+func Webhook(ctx *gin.Context, webhook *models.Webhook, path string) {
+	req := ctx.Request
+
+	var buffer bytes.Buffer
+	err := req.Write(&buffer)
+
+	if err != nil {
+		ctx.JSON(500, gin.H{"error": err.Error()})
+	}
+
+	// fetch plugin
+
+}

+ 13 - 0
internal/types/models/webhook.go

@@ -0,0 +1,13 @@
+package models
+
+import "time"
+
+// HookID is a pointer to plugin id and tenant id, using it to identify the webhook plugin
+type Webhook struct {
+	Model
+	HookID    string    `json:"hook_id" orm:"index;size:127;column:hook_id"`
+	TenantID  string    `json:"tenant_id" orm:"index;size:64;column:tenant_id"`
+	UserID    string    `json:"user_id" orm:"index;size:64;column:user_id"`
+	PluginID  string    `json:"plugin_id" orm:"index;size:64;column:plugin_id"`
+	ExpiredAt time.Time `json:"expired_at" orm:"column:expired_at"`
+}