Browse Source

feat: invoking webhook

Yeuoly 11 months ago
parent
commit
82eaebd838

+ 4 - 2
internal/core/plugin_daemon/backwards_invocation/basic.go

@@ -3,8 +3,9 @@ package backwards_invocation
 type PluginAccessType string
 
 const (
-	PLUGIN_ACCESS_TYPE_TOOL  PluginAccessType = "tool"
-	PLUGIN_ACCESS_TYPE_MODEL PluginAccessType = "model"
+	PLUGIN_ACCESS_TYPE_TOOL    PluginAccessType = "tool"
+	PLUGIN_ACCESS_TYPE_MODEL   PluginAccessType = "model"
+	PLUGIN_ACCESS_TYPE_WEBHOOK PluginAccessType = "webhook"
 )
 
 type PluginAccessAction string
@@ -20,4 +21,5 @@ const (
 	PLUGIN_ACCESS_ACTION_INVOKE_MODERATION             PluginAccessAction = "invoke_moderation"
 	PLUGIN_ACCESS_ACTION_VALIDATE_PROVIDER_CREDENTIALS PluginAccessAction = "validate_provider_credentials"
 	PLUGIN_ACCESS_ACTION_VALIDATE_MODEL_CREDENTIALS    PluginAccessAction = "validate_model_credentials"
+	PLUGIN_ACCESS_ACTION_WEBHOOK                       PluginAccessAction = "webhook"
 )

+ 93 - 0
internal/core/plugin_daemon/generic.go

@@ -0,0 +1,93 @@
+package plugin_daemon
+
+import (
+	"errors"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+)
+
+func genericInvokePlugin[Req any, Rsp any](
+	session *session_manager.Session,
+	request *Req,
+	response_buffer_size int,
+	typ backwards_invocation.PluginAccessType,
+	action backwards_invocation.PluginAccessAction,
+) (*stream.StreamResponse[Rsp], error) {
+	runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity())
+	if runtime == nil {
+		return nil, errors.New("plugin not found")
+	}
+
+	response := stream.NewStreamResponse[Rsp](response_buffer_size)
+
+	listener := runtime.Listen(session.ID())
+	listener.AddListener(func(message []byte) {
+		chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](message)
+		if err != nil {
+			log.Error("unmarshal json failed: %s", err.Error())
+			return
+		}
+
+		switch chunk.Type {
+		case plugin_entities.SESSION_MESSAGE_TYPE_STREAM:
+			chunk, err := parser.UnmarshalJsonBytes[Rsp](chunk.Data)
+			if err != nil {
+				log.Error("unmarshal json failed: %s", err.Error())
+				return
+			}
+			response.Write(chunk)
+		case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
+			if err := backwards_invocation.InvokeDify(runtime, typ, session, chunk.Data); err != nil {
+				log.Error("invoke dify failed: %s", err.Error())
+				return
+			}
+		case plugin_entities.SESSION_MESSAGE_TYPE_END:
+			response.Close()
+		case plugin_entities.SESSION_MESSAGE_TYPE_ERROR:
+			e, err := parser.UnmarshalJsonBytes[plugin_entities.ErrorResponse](chunk.Data)
+			if err != nil {
+				break
+			}
+			response.WriteError(errors.New(e.Error))
+			response.Close()
+		default:
+			response.WriteError(errors.New("unknown stream message type: " + string(chunk.Type)))
+			response.Close()
+		}
+	})
+
+	response.OnClose(func() {
+		listener.Close()
+	})
+
+	session.Write(
+		session_manager.PLUGIN_IN_STREAM_EVENT_REQUEST,
+		getInvokePluginMap(
+			session,
+			typ,
+			action,
+			request,
+		),
+	)
+
+	return response, nil
+}
+
+func getInvokePluginMap(
+	session *session_manager.Session,
+	typ backwards_invocation.PluginAccessType,
+	action backwards_invocation.PluginAccessAction,
+	request any,
+) map[string]any {
+	req := getBasicPluginAccessMap(session.UserID(), typ, action)
+	for k, v := range parser.StructToMap(request) {
+		req[k] = v
+	}
+	return req
+}

+ 0 - 1
internal/core/plugin_daemon/model.go

@@ -1 +0,0 @@
-package plugin_daemon

+ 0 - 86
internal/core/plugin_daemon/model_service.go

@@ -1,99 +1,13 @@
 package plugin_daemon
 
 import (
-	"errors"
-
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
-func genericInvokePlugin[Req any, Rsp any](
-	session *session_manager.Session,
-	request *Req,
-	response_buffer_size int,
-	typ backwards_invocation.PluginAccessType,
-	action backwards_invocation.PluginAccessAction,
-) (*stream.StreamResponse[Rsp], error) {
-	runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity())
-	if runtime == nil {
-		return nil, errors.New("plugin not found")
-	}
-
-	response := stream.NewStreamResponse[Rsp](response_buffer_size)
-
-	listener := runtime.Listen(session.ID())
-	listener.AddListener(func(message []byte) {
-		chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](message)
-		if err != nil {
-			log.Error("unmarshal json failed: %s", err.Error())
-			return
-		}
-
-		switch chunk.Type {
-		case plugin_entities.SESSION_MESSAGE_TYPE_STREAM:
-			chunk, err := parser.UnmarshalJsonBytes[Rsp](chunk.Data)
-			if err != nil {
-				log.Error("unmarshal json failed: %s", err.Error())
-				return
-			}
-			response.Write(chunk)
-		case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
-			if err := backwards_invocation.InvokeDify(runtime, typ, session, chunk.Data); err != nil {
-				log.Error("invoke dify failed: %s", err.Error())
-				return
-			}
-		case plugin_entities.SESSION_MESSAGE_TYPE_END:
-			response.Close()
-		case plugin_entities.SESSION_MESSAGE_TYPE_ERROR:
-			e, err := parser.UnmarshalJsonBytes[plugin_entities.ErrorResponse](chunk.Data)
-			if err != nil {
-				break
-			}
-			response.WriteError(errors.New(e.Error))
-			response.Close()
-		default:
-			response.WriteError(errors.New("unknown stream message type: " + string(chunk.Type)))
-			response.Close()
-		}
-	})
-
-	response.OnClose(func() {
-		listener.Close()
-	})
-
-	session.Write(
-		session_manager.PLUGIN_IN_STREAM_EVENT_REQUEST,
-		getInvokeModelMap(
-			session,
-			typ,
-			action,
-			request,
-		),
-	)
-
-	return response, nil
-}
-
-func getInvokeModelMap(
-	session *session_manager.Session,
-	typ backwards_invocation.PluginAccessType,
-	action backwards_invocation.PluginAccessAction,
-	request any,
-) map[string]any {
-	req := getBasicPluginAccessMap(session.UserID(), typ, action)
-	for k, v := range parser.StructToMap(request) {
-		req[k] = v
-	}
-	return req
-}
-
 func InvokeLLM(
 	session *session_manager.Session,
 	request *requests.RequestInvokeLLM,

+ 0 - 1
internal/core/plugin_daemon/tool.go

@@ -1 +0,0 @@
-package plugin_daemon

+ 17 - 0
internal/core/plugin_daemon/webhook_service.go

@@ -0,0 +1,17 @@
+package plugin_daemon
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+)
+
+func InvokeWebhook(
+	session *session_manager.Session,
+	request *requests.RequestInvokeWebhook,
+) (
+	*stream.StreamResponse[[]byte], error,
+) {
+	// TODO: implement this function
+	return nil, nil
+}

+ 1 - 1
internal/server/webhook.go

@@ -9,7 +9,7 @@ import (
 )
 
 // DifyPlugin supports register and use webhook to improve the plugin's functionality
-// you can use it to do some magic things, look forward to your imagination Ciallo~(∠·ω< )⌒
+// you can use it to do some magics, looking forward to your imagination, Ciallo~(∠·ω< )⌒
 // - Yeuoly
 
 // WebhookHandler is a function type that can be used to handle webhook requests

+ 14 - 0
internal/service/webhook.go

@@ -4,6 +4,8 @@ import (
 	"bytes"
 
 	"github.com/gin-gonic/gin"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
 )
 
@@ -18,5 +20,17 @@ func Webhook(ctx *gin.Context, webhook *models.Webhook, path string) {
 	}
 
 	// fetch plugin
+	manager := plugin_manager.GetGlobalPluginManager()
+	runtime := manager.Get(webhook.PluginID)
+	if runtime == nil {
+		ctx.JSON(404, gin.H{"error": "plugin not found"})
+		return
+	}
+
+	session := session_manager.NewSession(webhook.TenantID, "", webhook.PluginID)
+	defer session.Close()
+
+	session.BindRuntime(runtime)
 
+	// TODO: handle webhook
 }

+ 4 - 0
internal/types/entities/requests/webhook.go

@@ -0,0 +1,4 @@
+package requests
+
+type RequestInvokeWebhook struct {
+}