Ver código fonte

feat: add response max bytes

Yeuoly 1 ano atrás
pai
commit
1ea5ab59ff

+ 1 - 0
internal/core/plugin_manager/aws_manager/full_duplex_simulator.go

@@ -496,6 +496,7 @@ func (s *FullDuplexSimulator) receivingConnectionRoutine(routine_id string) {
 		req.Header.Set("Connection", "keep-alive")
 		req.Header.Set("x-dify-plugin-request-id", s.request_id)
 		req.Header.Set("x-dify-plugin-max-alive-time", fmt.Sprintf("%d", s.target_sending_connection_max_alive_time.Milliseconds()))
+		req.Header.Set("x-dify-plugin-max-sending-bytes", fmt.Sprintf("%d", s.max_receiving_bytes))
 
 		ctx, cancel := context.WithCancel(context.Background())
 		req = req.Clone(ctx)

+ 20 - 6
internal/core/plugin_manager/aws_manager/full_duplex_simulator_test.go

@@ -73,7 +73,6 @@ func server() (*S, error) {
 		atomic.AddInt32(&s.send_request, 1)
 		defer atomic.AddInt32(&s.send_request, -1)
 
-		// fmt.Println("new send request")
 		id := c.Request.Header.Get("x-dify-plugin-request-id")
 		max_alive_time := c.Request.Header.Get("x-dify-plugin-max-alive-time")
 		s.current_send_request_id = id
@@ -125,6 +124,14 @@ func server() (*S, error) {
 		// fmt.Println("new recv request")
 		id := ctx.Request.Header.Get("x-dify-plugin-request-id")
 		max_alive_time := ctx.Request.Header.Get("x-dify-plugin-max-alive-time")
+		max_sending_bytes := ctx.Request.Header.Get("x-dify-plugin-max-sending-bytes")
+		max_sending_bytes_int, err := strconv.ParseInt(max_sending_bytes, 10, 64)
+		if err != nil {
+			max_sending_bytes_int = 1024 * 1024
+		}
+
+		sent_bytes := int32(0)
+
 		s.current_recv_request_id = id
 
 		var ch chan []byte
@@ -154,6 +161,13 @@ func server() (*S, error) {
 		for {
 			select {
 			case data := <-ch:
+				if sent_bytes+int32(len(data)) > int32(max_sending_bytes_int) {
+					ctx.Writer.Write(data)
+					ctx.Writer.Flush()
+					ctx.Status(http.StatusOK)
+					return
+				}
+
 				ctx.Writer.Write(data)
 				ctx.Writer.Flush()
 				atomic.AddInt32(&s.send_count, int32(len(data)))
@@ -461,10 +475,10 @@ func TestFullDuplexSimulator_SendLargeData(t *testing.T) {
 
 	simulator, err := NewFullDuplexSimulator(
 		srv.url, &FullDuplexSimulatorOption{
-			SendingConnectionMaxAliveTime:         time.Millisecond * 700,
-			TargetSendingConnectionMaxAliveTime:   time.Millisecond * 700,
-			ReceivingConnectionMaxAliveTime:       time.Millisecond * 1000,
-			TargetReceivingConnectionMaxAliveTime: time.Millisecond * 1000,
+			SendingConnectionMaxAliveTime:         time.Millisecond * 70000,
+			TargetSendingConnectionMaxAliveTime:   time.Millisecond * 70000,
+			ReceivingConnectionMaxAliveTime:       time.Millisecond * 100000,
+			TargetReceivingConnectionMaxAliveTime: time.Millisecond * 100000,
 			MaxSingleRequestSendingBytes:          5 * 1024 * 1024,
 			MaxSingleRequestReceivingBytes:        5 * 1024 * 1024,
 		},
@@ -490,7 +504,7 @@ func TestFullDuplexSimulator_SendLargeData(t *testing.T) {
 		}
 	}
 
-	time.Sleep(time.Second * 1)
+	time.Sleep(time.Second * 5)
 
 	if l != 300*1024*1024 { // 300MB
 		t.Fatal(fmt.Sprintf("expected: %d, actual: %d", 300*1024*1024, l))