Forráskód Böngészése

fix: backwards http requests

Yeuoly 1 éve%!(EXTRA string=óta)
szülő
commit
b1755f3fea

+ 6 - 20
internal/core/plugin_manager/remote_manager/server.go

@@ -1,18 +1,14 @@
 package remote_manager
 
 import (
-	"bufio"
 	"context"
 	"errors"
 	"fmt"
-	"net"
-	"os"
 	"os/exec"
-	"strings"
 	"sync"
+	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 	"github.com/panjf2000/gnet/v2"
 
@@ -64,22 +60,12 @@ func (r *RemotePluginServer) Stop() error {
 // Launch starts the server
 func (r *RemotePluginServer) Launch() error {
 	// kill the process if port is already in use
-	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", r.server.port))
-	if err != nil && strings.Contains(err.Error(), "address already in use") {
-		scanner := bufio.NewScanner(os.Stdin)
-		log.Info("Port is already in use, do you want to kill the process using the port? (y/n)")
-		for scanner.Scan() {
-			if scanner.Text() == "y" {
-				exec.Command("fuser", "-k", "tcp", fmt.Sprintf("%d", r.server.port)).Run()
-			} else if scanner.Text() == "n" {
-				return errors.New("port is already in use")
-			}
-		}
-	} else {
-		listener.Close()
-	}
+	// TODO: switch to optional
+	exec.Command("fuser", "-k", "tcp", fmt.Sprintf("%d", r.server.port)).Run()
+
+	time.Sleep(time.Millisecond * 100)
 
-	err = gnet.Run(
+	err := gnet.Run(
 		r.server, r.server.addr, gnet.WithMulticore(r.server.multicore),
 		gnet.WithNumEventLoop(r.server.num_loops),
 	)

+ 1 - 1
internal/utils/http_requests/http_request.go

@@ -21,7 +21,7 @@ func buildHttpRequest(method string, url string, options ...HttpOptions) (*http.
 		case "write_timeout":
 			timeout := time.Second * time.Duration(option.Value.(int64))
 			ctx, cancel := context.WithTimeout(context.Background(), timeout)
-			defer cancel()
+			time.AfterFunc(timeout, cancel) // release resources associated with context asynchronously
 			req = req.WithContext(ctx)
 		case "header":
 			for k, v := range option.Value.(map[string]string) {

+ 11 - 6
internal/utils/http_requests/http_warpper.go

@@ -92,17 +92,22 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
 		scanner := bufio.NewScanner(resp.Body)
 		for scanner.Scan() {
 			data := scanner.Bytes()
+			if len(data) == 0 {
+				continue
+			}
+
 			if bytes.HasPrefix(data, []byte("data: ")) {
 				// split
 				data = data[6:]
-				// unmarshal
-				t, err := parser.UnmarshalJsonBytes[T](data)
-				if err != nil {
-					continue
-				}
+			}
 
-				ch.Write(t)
+			// unmarshal
+			t, err := parser.UnmarshalJsonBytes[T](data)
+			if err != nil {
+				continue
 			}
+
+			ch.Write(t)
 		}
 
 		ch.Close()