Browse Source

feat: add aws transaction handler

Yeuoly 11 months ago
parent
commit
052c1632eb

+ 10 - 0
internal/core/plugin_daemon/backwards_invocation/transaction/aws_event_handler.go

@@ -1,4 +1,14 @@
 package transaction
 package transaction
 
 
+import "github.com/gin-gonic/gin"
+
 type AWSEventHandler struct {
 type AWSEventHandler struct {
 }
 }
+
+func NewAWSEventHandler() *AWSEventHandler {
+	return &AWSEventHandler{}
+}
+
+func (h *AWSEventHandler) Handle(ctx *gin.Context) {
+
+}

+ 9 - 0
internal/server/app.go

@@ -2,10 +2,19 @@ package server
 
 
 import (
 import (
 	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
 	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation/transaction"
 )
 )
 
 
 type App struct {
 type App struct {
+	// cluster instance of this node
+	// schedule all the tasks related to the cluster, like request direct
 	cluster *cluster.Cluster
 	cluster *cluster.Cluster
 
 
+	// webhook handler
+	// customize behavior of endpoint
 	webhook_handler WebhookHandler
 	webhook_handler WebhookHandler
+
+	// aws transaction handler
+	// accept aws transaction request and forward to the plugin daemon
+	aws_transaction_handler *transaction.AWSEventHandler
 }
 }

+ 9 - 0
internal/server/http_server.go

@@ -6,6 +6,7 @@ import (
 	"net/http"
 	"net/http"
 
 
 	"github.com/gin-gonic/gin"
 	"github.com/gin-gonic/gin"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation/transaction"
 	"github.com/langgenius/dify-plugin-daemon/internal/server/controllers"
 	"github.com/langgenius/dify-plugin-daemon/internal/server/controllers"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
@@ -18,6 +19,7 @@ func (app *App) server(config *app.Config) func() {
 	app.pluginInvokeGroup(engine.Group("/plugin"), config)
 	app.pluginInvokeGroup(engine.Group("/plugin"), config)
 	app.remoteDebuggingGroup(engine.Group("/plugin/debugging"), config)
 	app.remoteDebuggingGroup(engine.Group("/plugin/debugging"), config)
 	app.webhookGroup(engine.Group("/webhook"), config)
 	app.webhookGroup(engine.Group("/webhook"), config)
+	app.awsLambdaTransactionGroup(engine.Group("/aws-lambda"), config)
 
 
 	srv := &http.Server{
 	srv := &http.Server{
 		Addr:    fmt.Sprintf(":%d", config.ServerPort),
 		Addr:    fmt.Sprintf(":%d", config.ServerPort),
@@ -70,3 +72,10 @@ func (app *App) webhookGroup(group *gin.RouterGroup, config *app.Config) {
 		group.OPTIONS("/:hook_id/*path", app.Webhook())
 		group.OPTIONS("/:hook_id/*path", app.Webhook())
 	}
 	}
 }
 }
+
+func (appRef *App) awsLambdaTransactionGroup(group *gin.RouterGroup, config *app.Config) {
+	if config.Platform == app.PLATFORM_AWS_LAMBDA {
+		appRef.aws_transaction_handler = transaction.NewAWSEventHandler()
+		group.POST("/transaction", appRef.RedirectAWSLambdaTransaction, appRef.aws_transaction_handler.Handle)
+	}
+}

+ 12 - 2
internal/server/middleware.go

@@ -62,7 +62,7 @@ func (app *App) RedirectPluginInvoke() gin.HandlerFunc {
 		if !app.cluster.IsPluginNoCurrentNode(
 		if !app.cluster.IsPluginNoCurrentNode(
 			plugin_id,
 			plugin_id,
 		) {
 		) {
-			app.Redirect(ctx, plugin_id)
+			app.redirectPluginInvokeByPluginID(ctx, plugin_id)
 			ctx.Abort()
 			ctx.Abort()
 		} else {
 		} else {
 			ctx.Next()
 			ctx.Next()
@@ -70,7 +70,7 @@ func (app *App) RedirectPluginInvoke() gin.HandlerFunc {
 	}
 	}
 }
 }
 
 
-func (app *App) Redirect(ctx *gin.Context, plugin_id string) {
+func (app *App) redirectPluginInvokeByPluginID(ctx *gin.Context, plugin_id string) {
 	// try find the correct node
 	// try find the correct node
 	nodes, err := app.cluster.FetchPluginAvailableNodesById(plugin_id)
 	nodes, err := app.cluster.FetchPluginAvailableNodesById(plugin_id)
 	if err != nil {
 	if err != nil {
@@ -118,6 +118,16 @@ func (app *App) Redirect(ctx *gin.Context, plugin_id string) {
 	}
 	}
 }
 }
 
 
+func (app *App) RedirectAWSLambdaTransaction(ctx *gin.Context) {
+	session_id := ctx.GetString("session_id")
+	if session_id == "" {
+		ctx.AbortWithStatusJSON(404, gin.H{"error": "Session not found"})
+		return
+	}
+
+	// TODO: check if session_id is valid
+}
+
 func (app *App) InitClusterID() gin.HandlerFunc {
 func (app *App) InitClusterID() gin.HandlerFunc {
 	return func(ctx *gin.Context) {
 	return func(ctx *gin.Context) {
 		ctx.Set("cluster_id", app.cluster.ID())
 		ctx.Set("cluster_id", app.cluster.ID())

+ 1 - 1
internal/server/webhook.go

@@ -46,7 +46,7 @@ func (app *App) WebhookHandler(ctx *gin.Context, hook_id string, path string) {
 
 
 	// check if plugin exists in current node
 	// check if plugin exists in current node
 	if !app.cluster.IsPluginNoCurrentNode(webhook.PluginID) {
 	if !app.cluster.IsPluginNoCurrentNode(webhook.PluginID) {
-		app.Redirect(ctx, webhook.PluginID)
+		app.redirectPluginInvokeByPluginID(ctx, webhook.PluginID)
 	} else {
 	} else {
 		service.Webhook(ctx, &webhook, path)
 		service.Webhook(ctx, &webhook, path)
 	}
 	}