Yeuoly 10 ヶ月 前
コミット
9cc4a5bef2
共有2 個のファイルを変更した37 個の追加15 個の削除を含む
  1. 1 1
      internal/core/dify_invocation/real/http_request.go
  2. 36 14
      internal/core/plugin_daemon/backwards_invocation/task.go

+ 1 - 1
internal/core/dify_invocation/real/http_request.go

@@ -63,7 +63,7 @@ func StreamResponse[T any](i *RealBackwardsInvocation, method string, path strin
 		response.Close()
 	})
 	routine.Submit(func() {
-		defer response.Close()
+		defer new_response.Close()
 		for response.Next() {
 			t, err := response.Read()
 			if err != nil {

+ 36 - 14
internal/core/plugin_daemon/backwards_invocation/task.go

@@ -8,9 +8,7 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
@@ -275,9 +273,15 @@ func executeDifyInvocationToolTask(
 		return
 	}
 
-	response.Async(func(t tool_entities.ToolResponseChunk) {
-		handle.WriteResponse("stream", t)
-	})
+	for response.Next() {
+		value, err := response.Read()
+		if err != nil {
+			handle.WriteError(fmt.Errorf("read tool response failed: %s", err.Error()))
+			return
+		}
+
+		handle.WriteResponse("stream", value)
+	}
 }
 
 func executeDifyInvocationLLMTask(
@@ -290,9 +294,15 @@ func executeDifyInvocationLLMTask(
 		return
 	}
 
-	response.Async(func(t model_entities.LLMResultChunk) {
-		handle.WriteResponse("stream", t)
-	})
+	for response.Next() {
+		value, err := response.Read()
+		if err != nil {
+			handle.WriteError(fmt.Errorf("read llm model failed: %s", err.Error()))
+			return
+		}
+
+		handle.WriteResponse("stream", value)
+	}
 }
 
 func executeDifyInvocationTextEmbeddingTask(
@@ -331,9 +341,15 @@ func executeDifyInvocationTTSTask(
 		return
 	}
 
-	response.Async(func(t model_entities.TTSResult) {
-		handle.WriteResponse("struct", t)
-	})
+	for response.Next() {
+		value, err := response.Read()
+		if err != nil {
+			handle.WriteError(fmt.Errorf("read tts model failed: %s", err.Error()))
+			return
+		}
+
+		handle.WriteResponse("stream", value)
+	}
 }
 
 func executeDifyInvocationSpeech2TextTask(
@@ -380,9 +396,15 @@ func executeDifyInvocationAppTask(
 
 	request.User = user_id
 
-	response.Async(func(t map[string]any) {
-		handle.WriteResponse("stream", t)
-	})
+	for response.Next() {
+		value, err := response.Read()
+		if err != nil {
+			handle.WriteError(fmt.Errorf("read app failed: %s", err.Error()))
+			return
+		}
+
+		handle.WriteResponse("stream", value)
+	}
 }
 
 func executeDifyInvocationParameterExtractor(