瀏覽代碼

feat: webhook service

Yeuoly 11 月之前
父節點
當前提交
27f58725b5

+ 25 - 0
internal/core/plugin_daemon/access_types/access.go

@@ -0,0 +1,25 @@
+package access_types
+
+type PluginAccessType string
+
+const (
+	PLUGIN_ACCESS_TYPE_TOOL    PluginAccessType = "tool"
+	PLUGIN_ACCESS_TYPE_MODEL   PluginAccessType = "model"
+	PLUGIN_ACCESS_TYPE_WEBHOOK PluginAccessType = "webhook"
+)
+
+type PluginAccessAction string
+
+const (
+	PLUGIN_ACCESS_ACTION_INVOKE_TOOL                   PluginAccessAction = "invoke_tool"
+	PLUGIN_ACCESS_ACTION_VALIDATE_TOOL_CREDENTIALS     PluginAccessAction = "validate_tool_credentials"
+	PLUGIN_ACCESS_ACTION_INVOKE_LLM                    PluginAccessAction = "invoke_llm"
+	PLUGIN_ACCESS_ACTION_INVOKE_TEXT_EMBEDDING         PluginAccessAction = "invoke_text_embedding"
+	PLUGIN_ACCESS_ACTION_INVOKE_RERANK                 PluginAccessAction = "invoke_rerank"
+	PLUGIN_ACCESS_ACTION_INVOKE_TTS                    PluginAccessAction = "invoke_tts"
+	PLUGIN_ACCESS_ACTION_INVOKE_SPEECH2TEXT            PluginAccessAction = "invoke_speech2text"
+	PLUGIN_ACCESS_ACTION_INVOKE_MODERATION             PluginAccessAction = "invoke_moderation"
+	PLUGIN_ACCESS_ACTION_VALIDATE_PROVIDER_CREDENTIALS PluginAccessAction = "validate_provider_credentials"
+	PLUGIN_ACCESS_ACTION_VALIDATE_MODEL_CREDENTIALS    PluginAccessAction = "validate_model_credentials"
+	PLUGIN_ACCESS_ACTION_WEBHOOK                       PluginAccessAction = "invoke_webhook"
+)

+ 0 - 24
internal/core/plugin_daemon/backwards_invocation/basic.go

@@ -1,25 +1 @@
 package backwards_invocation
-
-type PluginAccessType string
-
-const (
-	PLUGIN_ACCESS_TYPE_TOOL    PluginAccessType = "tool"
-	PLUGIN_ACCESS_TYPE_MODEL   PluginAccessType = "model"
-	PLUGIN_ACCESS_TYPE_WEBHOOK PluginAccessType = "webhook"
-)
-
-type PluginAccessAction string
-
-const (
-	PLUGIN_ACCESS_ACTION_INVOKE_TOOL                   PluginAccessAction = "invoke_tool"
-	PLUGIN_ACCESS_ACTION_VALIDATE_TOOL_CREDENTIALS     PluginAccessAction = "validate_tool_credentials"
-	PLUGIN_ACCESS_ACTION_INVOKE_LLM                    PluginAccessAction = "invoke_llm"
-	PLUGIN_ACCESS_ACTION_INVOKE_TEXT_EMBEDDING         PluginAccessAction = "invoke_text_embedding"
-	PLUGIN_ACCESS_ACTION_INVOKE_RERANK                 PluginAccessAction = "invoke_rerank"
-	PLUGIN_ACCESS_ACTION_INVOKE_TTS                    PluginAccessAction = "invoke_tts"
-	PLUGIN_ACCESS_ACTION_INVOKE_SPEECH2TEXT            PluginAccessAction = "invoke_speech2text"
-	PLUGIN_ACCESS_ACTION_INVOKE_MODERATION             PluginAccessAction = "invoke_moderation"
-	PLUGIN_ACCESS_ACTION_VALIDATE_PROVIDER_CREDENTIALS PluginAccessAction = "validate_provider_credentials"
-	PLUGIN_ACCESS_ACTION_VALIDATE_MODEL_CREDENTIALS    PluginAccessAction = "validate_model_credentials"
-	PLUGIN_ACCESS_ACTION_WEBHOOK                       PluginAccessAction = "webhook"
-)

+ 3 - 2
internal/core/plugin_daemon/backwards_invocation/task.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
@@ -14,7 +15,7 @@ import (
 
 func InvokeDify(
 	runtime entities.PluginRuntimeInterface,
-	invoke_from PluginAccessType,
+	invoke_from access_types.PluginAccessType,
 	session *session_manager.Session, data []byte,
 ) error {
 	// unmarshal invoke data
@@ -33,7 +34,7 @@ func InvokeDify(
 		return err
 	}
 
-	if invoke_from == PLUGIN_ACCESS_TYPE_MODEL {
+	if invoke_from == access_types.PLUGIN_ACCESS_TYPE_MODEL {
 		request_handle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
 		request_handle.EndResponse()
 		return nil

+ 6 - 2
internal/core/plugin_daemon/basic.go

@@ -1,8 +1,12 @@
 package plugin_daemon
 
-import "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
+import "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 
-func getBasicPluginAccessMap(user_id string, access_type backwards_invocation.PluginAccessType, action backwards_invocation.PluginAccessAction) map[string]any {
+func getBasicPluginAccessMap(
+	user_id string,
+	access_type access_types.PluginAccessType,
+	action access_types.PluginAccessAction,
+) map[string]any {
 	return map[string]any{
 		"user_id": user_id,
 		"type":    access_type,

+ 5 - 4
internal/core/plugin_daemon/generic.go

@@ -3,6 +3,7 @@ package plugin_daemon
 import (
 	"errors"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
@@ -16,8 +17,8 @@ func genericInvokePlugin[Req any, Rsp any](
 	session *session_manager.Session,
 	request *Req,
 	response_buffer_size int,
-	typ backwards_invocation.PluginAccessType,
-	action backwards_invocation.PluginAccessAction,
+	typ access_types.PluginAccessType,
+	action access_types.PluginAccessAction,
 ) (*stream.StreamResponse[Rsp], error) {
 	runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity())
 	if runtime == nil {
@@ -81,8 +82,8 @@ func genericInvokePlugin[Req any, Rsp any](
 
 func getInvokePluginMap(
 	session *session_manager.Session,
-	typ backwards_invocation.PluginAccessType,
-	action backwards_invocation.PluginAccessAction,
+	typ access_types.PluginAccessType,
+	action access_types.PluginAccessAction,
 	request any,
 ) map[string]any {
 	req := getBasicPluginAccessMap(session.UserID(), typ, action)

+ 17 - 17
internal/core/plugin_daemon/model_service.go

@@ -1,7 +1,7 @@
 package plugin_daemon
 
 import (
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
@@ -18,8 +18,8 @@ func InvokeLLM(
 		session,
 		request,
 		512,
-		backwards_invocation.PLUGIN_ACCESS_TYPE_MODEL,
-		backwards_invocation.PLUGIN_ACCESS_ACTION_INVOKE_LLM,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_LLM,
 	)
 }
 
@@ -33,8 +33,8 @@ func InvokeTextEmbedding(
 		session,
 		request,
 		1,
-		backwards_invocation.PLUGIN_ACCESS_TYPE_MODEL,
-		backwards_invocation.PLUGIN_ACCESS_ACTION_INVOKE_TEXT_EMBEDDING,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_TEXT_EMBEDDING,
 	)
 }
 
@@ -48,8 +48,8 @@ func InvokeRerank(
 		session,
 		request,
 		1,
-		backwards_invocation.PLUGIN_ACCESS_TYPE_MODEL,
-		backwards_invocation.PLUGIN_ACCESS_ACTION_INVOKE_RERANK,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_RERANK,
 	)
 }
 
@@ -63,8 +63,8 @@ func InvokeTTS(
 		session,
 		request,
 		1,
-		backwards_invocation.PLUGIN_ACCESS_TYPE_MODEL,
-		backwards_invocation.PLUGIN_ACCESS_ACTION_INVOKE_TTS,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_TTS,
 	)
 }
 
@@ -78,8 +78,8 @@ func InvokeSpeech2Text(
 		session,
 		request,
 		1,
-		backwards_invocation.PLUGIN_ACCESS_TYPE_MODEL,
-		backwards_invocation.PLUGIN_ACCESS_ACTION_INVOKE_SPEECH2TEXT,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_SPEECH2TEXT,
 	)
 }
 
@@ -93,8 +93,8 @@ func InvokeModeration(
 		session,
 		request,
 		1,
-		backwards_invocation.PLUGIN_ACCESS_TYPE_MODEL,
-		backwards_invocation.PLUGIN_ACCESS_ACTION_INVOKE_MODERATION,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_MODERATION,
 	)
 }
 
@@ -108,8 +108,8 @@ func ValidateProviderCredentials(
 		session,
 		request,
 		1,
-		backwards_invocation.PLUGIN_ACCESS_TYPE_MODEL,
-		backwards_invocation.PLUGIN_ACCESS_ACTION_VALIDATE_PROVIDER_CREDENTIALS,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_VALIDATE_PROVIDER_CREDENTIALS,
 	)
 }
 
@@ -123,7 +123,7 @@ func ValidateModelCredentials(
 		session,
 		request,
 		1,
-		backwards_invocation.PLUGIN_ACCESS_TYPE_MODEL,
-		backwards_invocation.PLUGIN_ACCESS_ACTION_VALIDATE_MODEL_CREDENTIALS,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_VALIDATE_MODEL_CREDENTIALS,
 	)
 }

+ 5 - 5
internal/core/plugin_daemon/tool_service.go

@@ -1,7 +1,7 @@
 package plugin_daemon
 
 import (
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
@@ -18,8 +18,8 @@ func InvokeTool(
 		session,
 		request,
 		128,
-		backwards_invocation.PLUGIN_ACCESS_TYPE_TOOL,
-		backwards_invocation.PLUGIN_ACCESS_ACTION_INVOKE_TOOL,
+		access_types.PLUGIN_ACCESS_TYPE_TOOL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_TOOL,
 	)
 }
 
@@ -33,7 +33,7 @@ func ValidateToolCredentials(
 		session,
 		request,
 		1,
-		backwards_invocation.PLUGIN_ACCESS_TYPE_TOOL,
-		backwards_invocation.PLUGIN_ACCESS_ACTION_VALIDATE_TOOL_CREDENTIALS,
+		access_types.PLUGIN_ACCESS_TYPE_TOOL,
+		access_types.PLUGIN_ACCESS_ACTION_VALIDATE_TOOL_CREDENTIALS,
 	)
 }

+ 70 - 3
internal/core/plugin_daemon/webhook_service.go

@@ -1,8 +1,14 @@
 package plugin_daemon
 
 import (
+	"encoding/hex"
+	"net/http"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/webhook_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
@@ -10,8 +16,69 @@ func InvokeWebhook(
 	session *session_manager.Session,
 	request *requests.RequestInvokeWebhook,
 ) (
-	*stream.StreamResponse[[]byte], error,
+	int, *http.Header, *stream.StreamResponse[[]byte], error,
 ) {
-	// TODO: implement this function
-	return nil, nil
+	resp, err := genericInvokePlugin[requests.RequestInvokeWebhook, webhook_entities.WebhookResponseChunk](
+		session,
+		request,
+		128,
+		access_types.PLUGIN_ACCESS_TYPE_WEBHOOK,
+		access_types.PLUGIN_ACCESS_ACTION_WEBHOOK,
+	)
+
+	if err != nil {
+		return http.StatusInternalServerError, nil, nil, err
+	}
+
+	status_code := http.StatusContinue
+	headers := &http.Header{}
+	response := stream.NewStreamResponse[[]byte](128)
+	response.OnClose(func() {
+		// add close callback, ensure resources are released
+		resp.Close()
+	})
+
+	for resp.Next() {
+		result, err := resp.Read()
+		if err != nil {
+			resp.Close()
+			return http.StatusInternalServerError, nil, nil, err
+		}
+
+		if result.Status != nil {
+			status_code = int(*result.Status)
+		}
+
+		if result.Headers != nil {
+			for k, v := range result.Headers {
+				headers.Add(k, v)
+			}
+		}
+
+		if result.Result != nil {
+			dehexed, err := hex.DecodeString(*result.Result)
+			if err != nil {
+				resp.Close()
+				return http.StatusInternalServerError, nil, nil, err
+			}
+			response.Write(dehexed)
+			routine.Submit(func() {
+				for resp.Next() {
+					chunk, err := resp.Read()
+					if err != nil {
+						return
+					}
+
+					dehexed, err := hex.DecodeString(*chunk.Result)
+					if err != nil {
+						return
+					}
+					response.Write(dehexed)
+				}
+			})
+			break
+		}
+	}
+
+	return status_code, headers, response, nil
 }

+ 50 - 1
internal/service/webhook.go

@@ -2,11 +2,17 @@ package service
 
 import (
 	"bytes"
+	"encoding/hex"
+	"sync/atomic"
+	"time"
 
 	"github.com/gin-gonic/gin"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
 func Webhook(ctx *gin.Context, webhook *models.Webhook, path string) {
@@ -32,5 +38,48 @@ func Webhook(ctx *gin.Context, webhook *models.Webhook, path string) {
 
 	session.BindRuntime(runtime)
 
-	// TODO: handle webhook
+	status_code, headers, response, err := plugin_daemon.InvokeWebhook(session, &requests.RequestInvokeWebhook{
+		RawHttpRequest: hex.EncodeToString(buffer.Bytes()),
+	})
+	if err != nil {
+		ctx.JSON(500, gin.H{"error": err.Error()})
+		return
+	}
+	defer response.Close()
+
+	done := make(chan bool)
+	closed := new(int32)
+
+	ctx.Status(status_code)
+	for k, v := range *headers {
+		if len(v) > 0 {
+			ctx.Writer.Header().Set(k, v[0])
+		}
+	}
+
+	close := func() {
+		if atomic.CompareAndSwapInt32(closed, 0, 1) {
+			close(done)
+		}
+	}
+	defer close()
+
+	routine.Submit(func() {
+		defer close()
+		for response.Next() {
+			chunk, err := response.Read()
+			if err != nil {
+				ctx.JSON(500, gin.H{"error": err.Error()})
+				return
+			}
+			ctx.Writer.Write(chunk)
+		}
+	})
+
+	select {
+	case <-ctx.Writer.CloseNotify():
+	case <-done:
+	case <-time.After(30 * time.Second):
+		ctx.JSON(500, gin.H{"error": "killed by timeout"})
+	}
 }

+ 1 - 0
internal/types/entities/requests/webhook.go

@@ -1,4 +1,5 @@
 package requests
 
 type RequestInvokeWebhook struct {
+	RawHttpRequest string `json:"raw_http_request" validate:"required"`
 }

+ 7 - 0
internal/types/entities/webhook_entities/webhook.go

@@ -0,0 +1,7 @@
+package webhook_entities
+
+type WebhookResponseChunk struct {
+	Status  *uint16           `json:"status" validate:"omitempty"`
+	Headers map[string]string `json:"headers" validate:"omitempty"`
+	Result  *string           `json:"result" validate:"omitempty"`
+}