소스 검색

fix: backwards invocation can not unmarshal nil map

Yeuoly 11 달 전
부모
커밋
ccf1b6e1d2
2개의 변경된 파일71개의 추가작업 그리고 1개의 파일을 삭제
  1. 13 1
      internal/core/dify_invocation/http_request.go
  2. 58 0
      internal/utils/http_requests/http_warpper.go

+ 13 - 1
internal/core/dify_invocation/http_request.go

@@ -31,6 +31,18 @@ func StreamResponse[T any](method string, path string, options ...http_requests.
 	return http_requests.RequestAndParseStream[T](client, difyPath(path), method, options...)
 }
 
+func StreamResponseMap(method string, path string, options ...http_requests.HttpOptions) (*stream.StreamResponse[map[string]any], error) {
+	options = append(
+		options, http_requests.HttpHeader(map[string]string{
+			"X-Inner-Api-Key": PLUGIN_INNER_API_KEY,
+		}),
+		http_requests.HttpWriteTimeout(5000),
+		http_requests.HttpReadTimeout(60000),
+	)
+
+	return http_requests.RequestAndParseStreamMap(client, difyPath(path), method, options...)
+}
+
 func InvokeLLM(payload *InvokeLLMRequest) (*stream.StreamResponse[model_entities.LLMResultChunk], error) {
 	return StreamResponse[model_entities.LLMResultChunk]("POST", "invoke/llm", http_requests.HttpPayloadJson(payload))
 }
@@ -60,5 +72,5 @@ func InvokeTool(payload *InvokeToolRequest) (*stream.StreamResponse[tool_entitie
 }
 
 func InvokeApp(payload *InvokeAppRequest) (*stream.StreamResponse[map[string]any], error) {
-	return StreamResponse[map[string]any]("POST", "invoke/app", http_requests.HttpPayloadJson(payload))
+	return StreamResponseMap("POST", "invoke/app", http_requests.HttpPayloadJson(payload))
 }

+ 58 - 0
internal/utils/http_requests/http_warpper.go

@@ -126,6 +126,64 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
 	return ch, nil
 }
 
+// TODO: improve this, deduplicate code
+func RequestAndParseStreamMap(client *http.Client, url string, method string, options ...HttpOptions) (*stream.StreamResponse[map[string]any], error) {
+	resp, err := Request(client, url, method, options...)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		defer resp.Body.Close()
+		error_text, _ := io.ReadAll(resp.Body)
+		return nil, fmt.Errorf("request failed with status code: %d and respond with: %s", resp.StatusCode, error_text)
+	}
+
+	ch := stream.NewStreamResponse[map[string]any](1024)
+
+	// get read timeout
+	read_timeout := int64(60000)
+	for _, option := range options {
+		if option.Type == "read_timeout" {
+			read_timeout = option.Value.(int64)
+			break
+		}
+	}
+	time.AfterFunc(time.Millisecond*time.Duration(read_timeout), func() {
+		// close the response body if timeout
+		resp.Body.Close()
+	})
+
+	routine.Submit(func() {
+		scanner := bufio.NewScanner(resp.Body)
+		defer resp.Body.Close()
+
+		for scanner.Scan() {
+			data := scanner.Bytes()
+			if len(data) == 0 {
+				continue
+			}
+
+			if bytes.HasPrefix(data, []byte("data: ")) {
+				// split
+				data = data[6:]
+			}
+
+			// unmarshal
+			t, err := parser.UnmarshalJsonBytes2Map(data)
+			if err != nil {
+				continue
+			}
+
+			ch.Write(t)
+		}
+
+		ch.Close()
+	})
+
+	return ch, nil
+}
+
 func GetAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.StreamResponse[T], error) {
 	return RequestAndParseStream[T](client, url, "GET", options...)
 }