Yeuoly пре 10 месеци
родитељ
комит
9071e9c5b0

+ 7 - 1
internal/core/plugin_manager/install.go

@@ -1,6 +1,8 @@
 package plugin_manager
 
 import (
+	"fmt"
+
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/db"
@@ -48,7 +50,9 @@ func (p *PluginManager) InstallToAWSFromPkg(tenant_id string, decoder decoder.Pl
 
 	new_response := stream.NewStream[PluginInstallResponse](128)
 	routine.Submit(func() {
-		defer new_response.Close()
+		defer func() {
+			new_response.Close()
+		}()
 
 		lambda_url := ""
 		lambda_function_name := ""
@@ -125,6 +129,8 @@ func (p *PluginManager) InstallToAWSFromPkg(tenant_id string, decoder decoder.Pl
 				lambda_url = r.Message
 			} else if r.Event == serverless.Lambda {
 				lambda_function_name = r.Message
+			} else {
+				new_response.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
 			}
 		})
 	})

+ 6 - 3
internal/core/plugin_manager/serverless/connector.go

@@ -53,9 +53,11 @@ var (
 // Fetch the lambda function from serverless connector, return error if failed
 func FetchLambda(identity string, checksum string) (*LambdaFunction, error) {
 	request := map[string]any{
-		"config": map[string]any{
-			"identity": identity,
-			"checksum": checksum,
+		"plugin": map[string]any{
+			"config": map[string]any{
+				"identity": identity,
+				"checksum": checksum,
+			},
 		},
 	}
 
@@ -127,6 +129,7 @@ func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*str
 				"context": context_tar,
 			},
 		),
+		http_requests.HttpRaiseErrorWhenStreamDataNotMatch(true),
 	)
 
 	if err != nil {

+ 4 - 0
internal/utils/http_requests/http_options.go

@@ -50,6 +50,10 @@ func HttpPayloadMultipart(payload map[string]string, files map[string]io.Reader)
 	}}
 }
 
+func HttpRaiseErrorWhenStreamDataNotMatch(raise bool) HttpOptions {
+	return HttpOptions{"raiseErrorWhenStreamDataNotMatch", raise}
+}
+
 func HttpWithDirectReferer() HttpOptions {
 	return HttpOptions{"directReferer", true}
 }

+ 13 - 0
internal/utils/http_requests/http_warpper.go

@@ -86,14 +86,23 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
 		return nil, fmt.Errorf("request failed with status code: %d and respond with: %s", resp.StatusCode, error_text)
 	}
 
+	if resp.StatusCode != http.StatusOK {
+		defer resp.Body.Close()
+		error_text, _ := io.ReadAll(resp.Body)
+		return nil, fmt.Errorf("request failed with status code: %d and respond with: %s", resp.StatusCode, error_text)
+	}
+
 	ch := stream.NewStream[T](1024)
 
 	// get read timeout
 	read_timeout := int64(60000)
+	raise_error_when_stream_data_not_match := false
 	for _, option := range options {
 		if option.Type == "read_timeout" {
 			read_timeout = option.Value.(int64)
 			break
+		} else if option.Type == "raiseErrorWhenStreamDataNotMatch" {
+			raise_error_when_stream_data_not_match = option.Value.(bool)
 		}
 	}
 	time.AfterFunc(time.Millisecond*time.Duration(read_timeout), func() {
@@ -119,6 +128,10 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
 			// unmarshal
 			t, err := parser.UnmarshalJsonBytes[T](data)
 			if err != nil {
+				if raise_error_when_stream_data_not_match {
+					ch.WriteError(err)
+					break
+				}
 				continue
 			}
 

+ 0 - 4
internal/utils/stream/response.go

@@ -103,10 +103,6 @@ func (r *Stream[T]) Read() (T, error) {
 
 // Async wraps the stream with a new stream, and allows customized operations
 func (r *Stream[T]) Async(fn func(T)) error {
-	if atomic.LoadInt32(&r.closed) == 1 {
-		return errors.New("stream is closed")
-	}
-
 	for r.Next() {
 		data, err := r.Read()
 		if err != nil {