Selaa lähdekoodia

fix: backwards invocation

Yeuoly 1 vuosi sitten
vanhempi
commit
4df1d043ac

+ 2 - 2
internal/core/plugin_daemon/backwards_invocation/request.go

@@ -40,14 +40,14 @@ func (bi *BackwardsInvocation) WriteError(err error) {
 	)
 }
 
-func (bi *BackwardsInvocation) Write(message string, data any) {
+func (bi *BackwardsInvocation) WriteResponse(message string, data any) {
 	bi.session.Write(
 		session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
 		NewResponseEvent(bi.id, message, parser.StructToMap(data)),
 	)
 }
 
-func (bi *BackwardsInvocation) End() {
+func (bi *BackwardsInvocation) EndResponse() {
 	bi.session.Write(
 		session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
 		NewEndEvent(bi.id),

+ 3 - 3
internal/core/plugin_daemon/invoke_dify.go

@@ -32,10 +32,10 @@ func invokeDify(
 	if err != nil {
 		return err
 	}
-	defer request_handle.End()
 
 	if invoke_from == PLUGIN_ACCESS_TYPE_MODEL {
 		request_handle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
+		request_handle.EndResponse()
 		return nil
 	}
 
@@ -100,11 +100,11 @@ func dispatchDifyInvocationTask(handle *backwards_invocation.BackwardsInvocation
 }
 
 func executeDifyInvocationToolTask(handle *backwards_invocation.BackwardsInvocation, request *dify_invocation.InvokeToolRequest) {
-	handle.Write("stream", tool_entities.ToolResponseChunk{
+	handle.WriteResponse("stream", tool_entities.ToolResponseChunk{
 		Type: "text",
 		Message: map[string]any{
 			"text": "hello world",
 		},
 	})
-	handle.End()
+	handle.EndResponse()
 }

+ 4 - 1
internal/core/plugin_daemon/model_service.go

@@ -46,7 +46,10 @@ func genericInvokePlugin[Req any, Rsp any](
 			}
 			response.Write(chunk)
 		case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
-			invokeDify(runtime, typ, session, chunk.Data)
+			if err := invokeDify(runtime, typ, session, chunk.Data); err != nil {
+				log.Error("invoke dify failed: %s", err.Error())
+				return
+			}
 		case plugin_entities.SESSION_MESSAGE_TYPE_END:
 			response.Close()
 		case plugin_entities.SESSION_MESSAGE_TYPE_ERROR:

+ 8 - 3
internal/core/plugin_manager/remote_manager/connection_key.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/google/uuid"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/redis/go-redis/v9"
 )
@@ -83,10 +84,14 @@ func GetConnectionKey(info ConnectionInfo) (string, error) {
 		if err != nil {
 			return "", err
 		}
-	}
-
-	if err != nil {
+	} else if err != nil {
 		return "", err
+	} else {
+		// update expire time
+		_, err = cache.Expire(strings.Join([]string{CONNECTION_KEY_MANAGER_ID2KEY_PREFIX, info.TenantId}, ":"), CONNECTION_KEY_EXPIRE_TIME)
+		if err != nil {
+			log.Error("failed to update connection key expire time: %s", err.Error())
+		}
 	}
 
 	return key.Key, nil

+ 1 - 0
internal/core/plugin_manager/remote_manager/hooks.go

@@ -18,6 +18,7 @@ type DifyServer struct {
 
 	// listening address
 	addr string
+	port uint16
 
 	// enabled multicore
 	multicore bool

+ 24 - 1
internal/core/plugin_manager/remote_manager/server.go

@@ -1,12 +1,18 @@
 package remote_manager
 
 import (
+	"bufio"
 	"context"
 	"errors"
 	"fmt"
+	"net"
+	"os"
+	"os/exec"
+	"strings"
 	"sync"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 	"github.com/panjf2000/gnet/v2"
 
@@ -52,7 +58,23 @@ func (r *RemotePluginServer) Stop() error {
 
 // Launch starts the server
 func (r *RemotePluginServer) Launch() error {
-	err := gnet.Run(
+	// kill the process if port is already in use
+	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", r.server.port))
+	if err != nil && strings.Contains(err.Error(), "address already in use") {
+		scanner := bufio.NewScanner(os.Stdin)
+		log.Info("Port is already in use, do you want to kill the process using the port? (y/n)")
+		for scanner.Scan() {
+			if scanner.Text() == "y" {
+				exec.Command("fuser", "-k", "tcp", fmt.Sprintf("%d", r.server.port)).Run()
+			} else if scanner.Text() == "n" {
+				return errors.New("port is already in use")
+			}
+		}
+	} else {
+		listener.Close()
+	}
+
+	err = gnet.Run(
 		r.server, r.server.addr, gnet.WithMulticore(r.server.multicore),
 		gnet.WithNumEventLoop(r.server.num_loops),
 	)
@@ -79,6 +101,7 @@ func NewRemotePluginServer(config *app.Config) *RemotePluginServer {
 	multicore := true
 	s := &DifyServer{
 		addr:      addr,
+		port:      config.PluginRemoteInstallingPort,
 		multicore: multicore,
 		num_loops: config.PluginRemoteInstallServerEventLoopNums,
 		response:  response,

+ 8 - 0
internal/utils/cache/redis.go

@@ -264,6 +264,14 @@ func Unlock(key string, context ...redis.Cmdable) error {
 	return getCmdable(context...).Del(ctx, serialKey(key)).Err()
 }
 
+func Expire(key string, time time.Duration, context ...redis.Cmdable) (bool, error) {
+	if client == nil {
+		return false, ErrDBNotInit
+	}
+
+	return getCmdable(context...).Expire(ctx, serialKey(key), time).Result()
+}
+
 func Transaction(fn func(redis.Pipeliner) error) error {
 	if client == nil {
 		return ErrDBNotInit