Browse Source

feat: read/write timeout

Yeuoly 1 year ago
parent
commit
1ea3fa49f4

+ 14 - 6
internal/core/dify_invocation/http_request.go

@@ -6,17 +6,25 @@ import (
 )
 
 func Request[T any](method string, path string, options ...requests.HttpOptions) (*T, error) {
-	options = append(options, requests.HttpHeader(map[string]string{
-		"X-Inner-Api-Key": PLUGIN_INNER_API_KEY,
-	}))
+	options = append(options,
+		requests.HttpHeader(map[string]string{
+			"X-Inner-Api-Key": PLUGIN_INNER_API_KEY,
+		}),
+		requests.HttpWriteTimeout(5000),
+		requests.HttpReadTimeout(60000),
+	)
 
 	return requests.RequestAndParse[T](client, difyPath(path), method, options...)
 }
 
 func StreamResponse[T any](method string, path string, options ...requests.HttpOptions) (*stream.StreamResponse[T], error) {
-	options = append(options, requests.HttpHeader(map[string]string{
-		"X-Inner-Api-Key": PLUGIN_INNER_API_KEY,
-	}))
+	options = append(
+		options, requests.HttpHeader(map[string]string{
+			"X-Inner-Api-Key": PLUGIN_INNER_API_KEY,
+		}),
+		requests.HttpWriteTimeout(5000),
+		requests.HttpReadTimeout(60000),
+	)
 
 	return requests.RequestAndParseStream[T](client, difyPath(path), method, options...)
 }

+ 2 - 0
internal/core/plugin_daemon/invoke_dify.go

@@ -125,6 +125,7 @@ func submitModelTask(
 			log.Error("invoke model failed: %s", err.Error())
 			return
 		}
+		defer response.Close()
 
 		for response.Next() {
 			chunk, _ := response.Read()
@@ -146,6 +147,7 @@ func submitToolTask(
 			log.Error("invoke tool failed: %s", err.Error())
 			return
 		}
+		defer response.Close()
 
 		for response.Next() {
 			chunk, _ := response.Read()

+ 7 - 2
internal/utils/requests/http_options.go

@@ -6,8 +6,13 @@ type HttpOptions struct {
 }
 
 // milliseconds
-func HttpTimeout(timeout int64) HttpOptions {
-	return HttpOptions{"timeout", timeout}
+func HttpWriteTimeout(timeout int64) HttpOptions {
+	return HttpOptions{"write_timeout", timeout}
+}
+
+// milliseconds
+func HttpReadTimeout(timeout int64) HttpOptions {
+	return HttpOptions{"read_timeout", timeout}
 }
 
 func HttpHeader(header map[string]string) HttpOptions {

+ 1 - 1
internal/utils/requests/http_request.go

@@ -18,7 +18,7 @@ func buildHttpRequest(method string, url string, options ...HttpOptions) (*http.
 
 	for _, option := range options {
 		switch option.Type {
-		case "timeout":
+		case "write_timeout":
 			timeout := time.Second * time.Duration(option.Value.(int64))
 			ctx, cancel := context.WithTimeout(context.Background(), timeout)
 			defer cancel()

+ 27 - 0
internal/utils/requests/http_warpper.go

@@ -5,6 +5,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"net/http"
+	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
@@ -25,6 +26,19 @@ func RequestAndParse[T any](client *http.Client, url string, method string, opti
 		return nil, err
 	}
 
+	// get read timeout
+	read_timeout := int64(60000)
+	for _, option := range options {
+		if option.Type == "read_timeout" {
+			read_timeout = option.Value.(int64)
+			break
+		}
+	}
+	time.AfterFunc(time.Millisecond*time.Duration(read_timeout), func() {
+		// close the response body if timeout
+		resp.Body.Close()
+	})
+
 	err = parseJsonBody(resp, &ret)
 	if err != nil {
 		return nil, err
@@ -61,6 +75,19 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
 
 	ch := stream.NewStreamResponse[T](1024)
 
+	// get read timeout
+	read_timeout := int64(60000)
+	for _, option := range options {
+		if option.Type == "read_timeout" {
+			read_timeout = option.Value.(int64)
+			break
+		}
+	}
+	time.AfterFunc(time.Millisecond*time.Duration(read_timeout), func() {
+		// close the response body if timeout
+		resp.Body.Close()
+	})
+
 	routine.Submit(func() {
 		scanner := bufio.NewScanner(resp.Body)
 		for scanner.Scan() {