浏览代码

enhancement: support split large blob into small chunks

Yeuoly 9 月之前
父节点
当前提交
d455e29c47

+ 78 - 1
internal/core/plugin_daemon/tool_service.go

@@ -1,12 +1,15 @@
 package plugin_daemon
 
 import (
+	"bytes"
+	"encoding/base64"
 	"errors"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 	"github.com/xeipuuv/gojsonschema"
 )
@@ -46,10 +49,84 @@ func InvokeTool(
 		}
 	}
 
+	new_response := stream.NewStream[tool_entities.ToolResponseChunk](128)
+	routine.Submit(func() {
+		files := make(map[string]*bytes.Buffer)
+		defer new_response.Close()
+
+		for response.Next() {
+			item, err := response.Read()
+			if err != nil {
+				return
+			}
+
+			if item.Type == tool_entities.ToolResponseChunkTypeBlobChunk {
+				id, ok := item.Message["id"].(string)
+				if !ok {
+					continue
+				}
+
+				total_length, ok := item.Message["total_length"].(float64)
+				if !ok {
+					continue
+				}
+
+				// convert total_length to int
+				total_length_int := int(total_length)
+
+				blob, ok := item.Message["blob"].(string)
+				if !ok {
+					continue
+				}
+
+				end, ok := item.Message["end"].(bool)
+				if !ok {
+					continue
+				}
+
+				if _, ok := files[id]; !ok {
+					files[id] = bytes.NewBuffer(make([]byte, 0, total_length_int))
+				}
+
+				if end {
+					new_response.Write(tool_entities.ToolResponseChunk{
+						Type: tool_entities.ToolResponseChunkTypeBlob,
+						Message: map[string]any{
+							"blob": files[id].Bytes(), // bytes will be encoded to base64 finally
+						},
+						Meta: item.Meta,
+					})
+				} else {
+					if files[id].Len() > 15*1024*1024 {
+						// delete the file if it is too large
+						delete(files, id)
+						new_response.WriteError(errors.New("file is too large"))
+						return
+					} else {
+						// decode the blob using base64
+						decoded, err := base64.StdEncoding.DecodeString(blob)
+						if err != nil {
+							new_response.WriteError(err)
+							return
+						}
+						if len(decoded) > 8192 {
+							// single chunk is too large, raises error
+							new_response.WriteError(errors.New("single file chunk is too large"))
+							return
+						}
+						files[id].Write(decoded)
+					}
+				}
+			} else {
+				new_response.Write(item)
+			}
+		}
+	})
+
 	// bind json schema validator
 	bindValidator(response, tool_output_schema)
 
-	return response, nil
+	return new_response, nil
 }
 
 func bindValidator(

+ 1 - 1
internal/core/plugin_manager/remote_manager/connection_key.go

@@ -41,7 +41,7 @@ const (
 	CONNECTION_KEY_MANAGER_KEY2ID_PREFIX = "remote:key:manager:key2id"
 	CONNECTION_KEY_MANAGER_ID2KEY_PREFIX = "remote:key:manager:id2key"
 	CONNECTION_KEY_LOCK                  = "connection_lock"
-	CONNECTION_KEY_EXPIRE_TIME           = time.Minute * 15
+	CONNECTION_KEY_EXPIRE_TIME           = time.Hour * 15
 )
 
 // returns a random string, create it if not exists

+ 2 - 0
internal/types/entities/tool_entities/tool.go

@@ -12,6 +12,7 @@ const (
 	ToolResponseChunkTypeText      ToolResponseChunkType = "text"
 	ToolResponseChunkTypeFile      ToolResponseChunkType = "file"
 	ToolResponseChunkTypeBlob      ToolResponseChunkType = "blob"
+	ToolResponseChunkTypeBlobChunk ToolResponseChunkType = "blob_chunk"
 	ToolResponseChunkTypeJson      ToolResponseChunkType = "json"
 	ToolResponseChunkTypeLink      ToolResponseChunkType = "link"
 	ToolResponseChunkTypeImage     ToolResponseChunkType = "image"
@@ -25,6 +26,7 @@ func IsValidToolResponseChunkType(fl validator.FieldLevel) bool {
 	case ToolResponseChunkTypeText,
 		ToolResponseChunkTypeFile,
 		ToolResponseChunkTypeBlob,
+		ToolResponseChunkTypeBlobChunk,
 		ToolResponseChunkTypeJson,
 		ToolResponseChunkTypeLink,
 		ToolResponseChunkTypeImage,