Просмотр исходного кода

Merge branch 'langgenius:main' into main

zhangyuhang месяцев назад: 4
Родитель
Сommit
7bcdcd8307

+ 7 - 0
.github/workflows/build-push.yml

@@ -5,6 +5,9 @@ on:
     branches:
       - "main"
       - "deploy/dev"
+  pull_request:
+    branches:
+      - "main"
   release:
     types: [published]
 
@@ -52,6 +55,7 @@ jobs:
 
       - name: Login to Docker Hub
         uses: docker/login-action@v2
+        if: github.event_name != 'pull_request'
         with:
           username: ${{ env.DOCKERHUB_USER }}
           password: ${{ env.DOCKERHUB_TOKEN }}
@@ -71,12 +75,14 @@ jobs:
         run: docker build --build-arg PLATFORM=${{ matrix.scope }} --build-arg VERSION=${{ github.sha }} -t dify-plugin-daemon -f ./docker/${{ matrix.scope }}.dockerfile .
 
       - name: Tag Docker Images
+        if: github.event_name != 'pull_request'
         run:
           for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n');
           do
             docker tag dify-plugin-daemon "$tag-${{ matrix.scope }}-${{ env.PLATFORM_PAIR }}";
           done
       - name: Push Docker Image
+        if: github.event_name != 'pull_request'
         run:
           for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n');
           do
@@ -84,6 +90,7 @@ jobs:
           done
 
   create-manifest:
+    if: github.event_name != 'pull_request'
     needs: build
     runs-on: ubuntu-latest
     strategy:

+ 1 - 1
cmd/commandline/plugin/templates/python/requirements.txt

@@ -1 +1 @@
-dify_plugin~=0.0.1b67
+dify_plugin~=0.0.1b72

+ 10 - 1
docker/local.dockerfile

@@ -33,11 +33,20 @@ WORKDIR /app
 ARG PLATFORM=local
 
 # Install python3.12 if PLATFORM is local
-RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y python3.12 python3.12-venv python3.12-dev ffmpeg \
+RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y curl python3.12 python3.12-venv python3.12-dev python3-pip ffmpeg build-essential \
     && apt-get clean \
     && rm -rf /var/lib/apt/lists/* \
     && update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.12 1;
 
+# Install uv
+RUN mv /usr/lib/python3.12/EXTERNALLY-MANAGED /usr/lib/python3.12/EXTERNALLY-MANAGED.bk && python3 -m pip install uv
+
+# Install dify_plugin to speedup the environment setup
+RUN uv pip install --system dify_plugin
+
+# Test uv
+RUN python3 -c "from uv._find_uv import find_uv_bin;print(find_uv_bin())"
+
 ENV PLATFORM=$PLATFORM
 ENV GIN_MODE=release
 

+ 7 - 6
internal/core/persistence/persistence_test.go

@@ -19,12 +19,13 @@ func TestPersistenceStoreAndLoad(t *testing.T) {
 	defer cache.Close()
 
 	db.Init(&app.Config{
-		DBUsername: "postgres",
-		DBPassword: "difyai123456",
-		DBHost:     "localhost",
-		DBPort:     5432,
-		DBDatabase: "dify_plugin_daemon",
-		DBSslMode:  "disable",
+		DBUsername:        "postgres",
+		DBPassword:        "difyai123456",
+		DBHost:            "localhost",
+		DBDefaultDatabase: "postgres",
+		DBPort:            5432,
+		DBDatabase:        "dify_plugin_daemon",
+		DBSslMode:         "disable",
 	})
 	defer db.Close()
 

+ 1 - 1
internal/core/plugin_daemon/model_service.go

@@ -55,7 +55,7 @@ func InvokeTTS(
 	return GenericInvokePlugin[requests.RequestInvokeTTS, model_entities.TTSResult](
 		session,
 		request,
-		1,
+		512,
 	)
 }
 

+ 25 - 23
internal/core/plugin_manager/local_runtime/environment_python.go

@@ -51,7 +51,17 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 	// execute init command, create a virtual environment
 	success := false
 
-	cmd := exec.Command("bash", "-c", fmt.Sprintf("%s -m venv .venv", p.defaultPythonInterpreterPath))
+	// using `from uv._find_uv import find_uv_bin; print(find_uv_bin())` to find uv path
+	cmd := exec.Command(p.defaultPythonInterpreterPath, "-c", "from uv._find_uv import find_uv_bin; print(find_uv_bin())")
+	cmd.Dir = p.State.WorkingPath
+	output, err := cmd.Output()
+	if err != nil {
+		return fmt.Errorf("failed to find uv path: %s", err)
+	}
+
+	uvPath := strings.TrimSpace(string(output))
+
+	cmd = exec.Command(uvPath, "venv", ".venv")
 	cmd.Dir = p.State.WorkingPath
 	b := bytes.NewBuffer(nil)
 	cmd.Stdout = b
@@ -71,21 +81,11 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 		}
 	}()
 
-	// try find python interpreter and pip
-	pipPath, err := filepath.Abs(path.Join(p.State.WorkingPath, ".venv/bin/pip"))
-	if err != nil {
-		return fmt.Errorf("failed to find pip: %s", err)
-	}
-
 	pythonPath, err := filepath.Abs(path.Join(p.State.WorkingPath, ".venv/bin/python"))
 	if err != nil {
 		return fmt.Errorf("failed to find python: %s", err)
 	}
 
-	if _, err := os.Stat(pipPath); err != nil {
-		return fmt.Errorf("failed to find pip: %s", err)
-	}
-
 	if _, err := os.Stat(pythonPath); err != nil {
 		return fmt.Errorf("failed to find python: %s", err)
 	}
@@ -102,13 +102,7 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
 	defer cancel()
 
-	args := []string{"install", "--disable-pip-version-check"} // FIX: pip version check takes too long
-
-	if p.HttpProxy != "" {
-		args = append(args, "--proxy", p.HttpProxy)
-	} else if p.HttpsProxy != "" {
-		args = append(args, "--proxy", p.HttpsProxy)
-	}
+	args := []string{"install"}
 
 	if p.pipMirrorUrl != "" {
 		args = append(args, "-i", p.pipMirrorUrl)
@@ -116,10 +110,6 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 
 	args = append(args, "-r", "requirements.txt")
 
-	if p.pipPreferBinary {
-		args = append(args, "--prefer-binary")
-	}
-
 	if p.pipVerbose {
 		args = append(args, "-vvv")
 	}
@@ -128,7 +118,11 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 		args = append(args, strings.Split(p.pipExtraArgs, " ")...)
 	}
 
-	cmd = exec.CommandContext(ctx, pipPath, args...)
+	args = append([]string{"pip"}, args...)
+
+	virtualEnvPath := path.Join(p.State.WorkingPath, ".venv")
+	cmd = exec.CommandContext(ctx, uvPath, args...)
+	cmd.Env = append(cmd.Env, "VIRTUAL_ENV="+virtualEnvPath, "PATH="+os.Getenv("PATH"))
 	cmd.Dir = p.State.WorkingPath
 
 	// get stdout and stderr
@@ -307,6 +301,14 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 		return fmt.Errorf("failed to pre-compile the plugin: %s", compileErrMsg.String())
 	}
 
+	log.Info("pre-loading the plugin %s", p.Config.Identity())
+
+	// import dify_plugin to speedup the first launching
+	// ISSUE: it takes too long to setup all the deps, that's why we choose to preload it
+	importCmd := exec.CommandContext(ctx, pythonPath, "-c", "import dify_plugin")
+	importCmd.Dir = p.State.WorkingPath
+	importCmd.Output()
+
 	// PATCH:
 	//  plugin sdk version less than 0.0.1b70 contains a memory leak bug
 	//  to reach a better user experience, we will patch it here using a patched file

+ 12 - 3
internal/core/plugin_manager/local_runtime/stdio_handle.go

@@ -100,13 +100,22 @@ func (s *stdioHolder) StartStdout(notify_heartbeat func()) {
 				for _, listener := range listeners {
 					listener(s.id, data)
 				}
+				// FIX: avoid deadlock to plugin invoke
 				s.l.Lock()
-				defer s.l.Unlock()
+				tasks := []func(){}
 				for listener_session_id, listener := range s.listener {
+					// copy the listener to avoid reference issue
+					listener := listener
 					if listener_session_id == session_id {
-						listener(data)
+						tasks = append(tasks, func() {
+							listener(data)
+						})
 					}
 				}
+				s.l.Unlock()
+				for _, t := range tasks {
+					t()
+				}
 			},
 			func() {
 				// notify launched
@@ -196,7 +205,7 @@ func (s *stdioHolder) Wait() error {
 			}
 			if time.Since(s.lastActiveAt) > 60*time.Second {
 				log.Warn(
-					"plugin %s is not active for %d seconds, it may be dead",
+					"plugin %s is not active for %f seconds, it may be dead",
 					s.pluginUniqueIdentifier,
 					time.Since(s.lastActiveAt).Seconds(),
 				)

+ 0 - 9
internal/core/plugin_manager/media_transport/installed_bucket.go

@@ -48,15 +48,6 @@ func (b *InstalledBucket) Get(
 
 // List lists all the plugins in the installed bucket
 func (b *InstalledBucket) List() ([]plugin_entities.PluginUniqueIdentifier, error) {
-	// check if the patch exists
-	exists, err := b.oss.Exists(b.installedPath)
-	if err != nil {
-		return nil, err
-	}
-	if !exists {
-		return []plugin_entities.PluginUniqueIdentifier{}, nil
-	}
-
 	paths, err := b.oss.List(b.installedPath)
 	if err != nil {
 		return nil, err

+ 1 - 1
internal/db/init.go

@@ -134,7 +134,7 @@ func Init(config *app.Config) {
 		config.DBHost,
 		int(config.DBPort),
 		config.DBDatabase,
-		"postgres",
+		config.DBDefaultDatabase,
 		config.DBUsername,
 		config.DBPassword,
 		config.DBSslMode,

+ 10 - 2
internal/oss/local/local_storage.go

@@ -57,10 +57,18 @@ func (l *LocalStorage) State(key string) (oss.OSSState, error) {
 }
 
 func (l *LocalStorage) List(prefix string) ([]oss.OSSPath, error) {
-	prefix = filepath.Join(l.root, prefix)
 	paths := make([]oss.OSSPath, 0)
+	// check if the patch exists
+	exists, err := l.Exists(prefix)
+	if err != nil {
+		return nil, err
+	}
+	if !exists {
+		return paths, nil
+	}
+	prefix = filepath.Join(l.root, prefix)
 
-	err := filepath.WalkDir(prefix, func(path string, d fs.DirEntry, err error) error {
+	err = filepath.WalkDir(prefix, func(path string, d fs.DirEntry, err error) error {
 		if err != nil {
 			return err
 		}

+ 8 - 6
internal/server/endpoint.go

@@ -2,10 +2,12 @@ package server
 
 import (
 	"errors"
+	"time"
 
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/db"
 	"github.com/langgenius/dify-plugin-daemon/internal/service"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/exception"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
@@ -17,22 +19,22 @@ import (
 // - Yeuoly
 
 // EndpointHandler is a function type that can be used to handle endpoint requests
-type EndpointHandler func(ctx *gin.Context, hookId string, path string)
+type EndpointHandler func(ctx *gin.Context, hookId string, maxExecutionTime time.Duration, path string)
 
-func (app *App) Endpoint() func(c *gin.Context) {
+func (app *App) Endpoint(config *app.Config) func(c *gin.Context) {
 	return func(c *gin.Context) {
 		hookId := c.Param("hook_id")
 		path := c.Param("path")
 
 		if app.endpointHandler != nil {
-			app.endpointHandler(c, hookId, path)
+			app.endpointHandler(c, hookId, time.Duration(config.PluginMaxExecutionTimeout)*time.Second, path)
 		} else {
-			app.EndpointHandler(c, hookId, path)
+			app.EndpointHandler(c, hookId, time.Duration(config.PluginMaxExecutionTimeout)*time.Second, path)
 		}
 	}
 }
 
-func (app *App) EndpointHandler(ctx *gin.Context, hookId string, path string) {
+func (app *App) EndpointHandler(ctx *gin.Context, hookId string, maxExecutionTime time.Duration, path string) {
 	endpoint, err := db.GetOne[models.Endpoint](
 		db.Equal("hook_id", hookId),
 	)
@@ -71,6 +73,6 @@ func (app *App) EndpointHandler(ctx *gin.Context, hookId string, path string) {
 	if ok, originalError := app.cluster.IsPluginOnCurrentNode(pluginUniqueIdentifier); !ok {
 		app.redirectPluginInvokeByPluginIdentifier(ctx, pluginUniqueIdentifier, originalError)
 	} else {
-		service.Endpoint(ctx, &endpoint, &pluginInstallation, path)
+		service.Endpoint(ctx, &endpoint, &pluginInstallation, maxExecutionTime, path)
 	}
 }

+ 3 - 1
internal/server/endpoint_test.go

@@ -4,6 +4,7 @@ import (
 	"net/http"
 	"strconv"
 	"testing"
+	"time"
 
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
@@ -21,7 +22,7 @@ func TestEndpointParams(t *testing.T) {
 	globalHookId := ""
 	globalHookPath := ""
 
-	handler := func(ctx *gin.Context, hook_id string, path string) {
+	handler := func(ctx *gin.Context, hook_id string, maxExecutionTime time.Duration, path string) {
 		globalHookId = hook_id
 		globalHookPath = path
 	}
@@ -32,6 +33,7 @@ func TestEndpointParams(t *testing.T) {
 	cancel := appPointer.server(&app.Config{
 		ServerPort:            port,
 		PluginEndpointEnabled: parser.ToPtr(true),
+		HealthApiLogEnabled:   parser.ToPtr(true),
 	})
 	defer cancel()
 

+ 15 - 7
internal/server/http_server.go

@@ -18,7 +18,15 @@ import (
 
 // server starts a http server and returns a function to stop it
 func (app *App) server(config *app.Config) func() {
-	engine := gin.Default()
+	engine := gin.New()
+	if *config.HealthApiLogEnabled {
+		engine.Use(gin.Logger())
+	} else {
+		engine.Use(gin.LoggerWithConfig(gin.LoggerConfig{
+			SkipPaths: []string{"/health/check"},
+		}))
+	}
+	engine.Use(gin.Recovery())
 	engine.GET("/health/check", controllers.HealthCheck(config))
 
 	endpointGroup := engine.Group("/e")
@@ -104,12 +112,12 @@ func (app *App) remoteDebuggingGroup(group *gin.RouterGroup, config *app.Config)
 
 func (app *App) endpointGroup(group *gin.RouterGroup, config *app.Config) {
 	if config.PluginEndpointEnabled != nil && *config.PluginEndpointEnabled {
-		group.HEAD("/:hook_id/*path", app.Endpoint())
-		group.POST("/:hook_id/*path", app.Endpoint())
-		group.GET("/:hook_id/*path", app.Endpoint())
-		group.PUT("/:hook_id/*path", app.Endpoint())
-		group.DELETE("/:hook_id/*path", app.Endpoint())
-		group.OPTIONS("/:hook_id/*path", app.Endpoint())
+		group.HEAD("/:hook_id/*path", app.Endpoint(config))
+		group.POST("/:hook_id/*path", app.Endpoint(config))
+		group.GET("/:hook_id/*path", app.Endpoint(config))
+		group.PUT("/:hook_id/*path", app.Endpoint(config))
+		group.DELETE("/:hook_id/*path", app.Endpoint(config))
+		group.OPTIONS("/:hook_id/*path", app.Endpoint(config))
 	}
 }
 

+ 40 - 30
internal/service/endpoint.go

@@ -7,6 +7,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"net/http"
 	"sync/atomic"
 	"time"
 
@@ -27,58 +28,67 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/pkg/entities/requests"
 )
 
-func Endpoint(
-	ctx *gin.Context,
-	endpoint *models.Endpoint,
-	pluginInstallation *models.PluginInstallation,
-	path string,
-) {
-	if !endpoint.Enabled {
-		ctx.JSON(404, exception.NotFoundError(errors.New("endpoint not found")).ToResponse())
-		return
-	}
-
-	req := ctx.Request.Clone(context.Background())
+func copyRequest(req *http.Request, hookId string, path string) (*bytes.Buffer, error) {
+	newReq := req.Clone(context.Background())
 	// get query params
 	queryParams := req.URL.Query()
 
 	// replace path with endpoint path
-	req.URL.Path = path
+	newReq.URL.Path = path
 	// set query params
-	req.URL.RawQuery = queryParams.Encode()
+	newReq.URL.RawQuery = queryParams.Encode()
 
 	// read request body until complete, max 10MB
 	body, err := io.ReadAll(io.LimitReader(req.Body, 10*1024*1024))
 	if err != nil {
-		ctx.JSON(500, exception.InternalServerError(err).ToResponse())
-		return
+		return nil, err
 	}
 
 	// replace with a new reader
-	req.Body = io.NopCloser(bytes.NewReader(body))
-	req.ContentLength = int64(len(body))
-	req.TransferEncoding = nil
+	newReq.Body = io.NopCloser(bytes.NewReader(body))
+	newReq.ContentLength = int64(len(body))
+	newReq.TransferEncoding = nil
 
 	// remove ip traces for security
-	req.Header.Del("X-Forwarded-For")
-	req.Header.Del("X-Real-IP")
-	req.Header.Del("X-Forwarded")
-	req.Header.Del("X-Original-Forwarded-For")
-	req.Header.Del("X-Original-Url")
-	req.Header.Del("X-Original-Host")
+	newReq.Header.Del("X-Forwarded-For")
+	newReq.Header.Del("X-Real-IP")
+	newReq.Header.Del("X-Forwarded")
+	newReq.Header.Del("X-Original-Forwarded-For")
+	newReq.Header.Del("X-Original-Url")
+	newReq.Header.Del("X-Original-Host")
 
 	// setup hook id to request
-	req.Header.Set("Dify-Hook-Id", endpoint.HookID)
+	newReq.Header.Set("Dify-Hook-Id", hookId)
 	// check if Dify-Hook-Url is set
 	if url := req.Header.Get("Dify-Hook-Url"); url == "" {
-		req.Header.Set(
+		newReq.Header.Set(
 			"Dify-Hook-Url",
-			fmt.Sprintf("http://%s:%s/e/%s%s", req.Host, req.URL.Port(), endpoint.HookID, path),
+			fmt.Sprintf("http://%s/e/%s%s", req.Host, hookId, path),
 		)
 	}
 
 	var buffer bytes.Buffer
-	err = req.Write(&buffer)
+	err = newReq.Write(&buffer)
+	if err != nil {
+		return nil, err
+	}
+
+	return &buffer, nil
+}
+
+func Endpoint(
+	ctx *gin.Context,
+	endpoint *models.Endpoint,
+	pluginInstallation *models.PluginInstallation,
+	maxExecutionTime time.Duration,
+	path string,
+) {
+	if !endpoint.Enabled {
+		ctx.JSON(404, exception.NotFoundError(errors.New("endpoint not found")).ToResponse())
+		return
+	}
+
+	buffer, err := copyRequest(ctx.Request, endpoint.HookID, path)
 	if err != nil {
 		ctx.JSON(500, exception.InternalServerError(err).ToResponse())
 		return
@@ -195,7 +205,7 @@ func Endpoint(
 	select {
 	case <-ctx.Writer.CloseNotify():
 	case <-done:
-	case <-time.After(240 * time.Second):
+	case <-time.After(maxExecutionTime):
 		ctx.JSON(500, exception.InternalServerError(errors.New("killed by timeout")).ToResponse())
 	}
 }

+ 26 - 0
internal/service/endpoint_test.go

@@ -0,0 +1,26 @@
+package service
+
+import (
+	"bytes"
+	"io"
+	"net/http"
+	"testing"
+)
+
+func TestCopyRequest(t *testing.T) {
+	req, err := http.NewRequest("GET", "http://localhost:8080/test?test=123", nil)
+	req.Body = io.NopCloser(bytes.NewReader([]byte("test")))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	buffer, err := copyRequest(req, "123", "/test")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	str := buffer.String()
+	if str != "GET /test?test=123 HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: Go-http-client/1.1\r\nContent-Length: 4\r\nDify-Hook-Id: 123\r\nDify-Hook-Url: http://localhost:8080/e/123/test\r\n\r\ntest" {
+		t.Fatal("request body is not equal, ", str)
+	}
+}

+ 3 - 0
internal/types/app/config.go

@@ -110,6 +110,9 @@ type Config struct {
 	// proxy settings
 	HttpProxy  string `envconfig:"HTTP_PROXY"`
 	HttpsProxy string `envconfig:"HTTPS_PROXY"`
+
+	// log settings
+	HealthApiLogEnabled *bool `envconfig:"HEALTH_API_LOG_ENABLED"`
 }
 
 func (c *Config) Validate() error {

+ 1 - 0
internal/types/app/default.go

@@ -34,6 +34,7 @@ func (config *Config) SetDefault() {
 	setDefaultBoolPtr(&config.PipPreferBinary, true)
 	setDefaultBoolPtr(&config.PipVerbose, true)
 	setDefaultString(&config.DBDefaultDatabase, "postgres")
+	setDefaultBoolPtr(&config.HealthApiLogEnabled, true)
 }
 
 func setDefaultInt[T constraints.Integer](value *T, defaultValue T) {

+ 4 - 2
internal/utils/cache/redis.go

@@ -7,6 +7,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/redis/go-redis/v9"
 )
@@ -483,8 +484,9 @@ func Subscribe[T any](channel string) (<-chan T, func()) {
 		for alive {
 			iface, err := pubsub.Receive(context.Background())
 			if err != nil {
-				alive = false
-				break
+				log.Error("failed to receive message from redis: %s, will retry in 1 second", err.Error())
+				time.Sleep(1 * time.Second)
+				continue
 			}
 			switch data := iface.(type) {
 			case *redis.Subscription:

+ 0 - 6
internal/utils/http_requests/http_warpper.go

@@ -87,12 +87,6 @@ func RequestAndParseStream[T any](client *http.Client, url string, method string
 		return nil, fmt.Errorf("request failed with status code: %d and respond with: %s", resp.StatusCode, errorText)
 	}
 
-	if resp.StatusCode != http.StatusOK {
-		defer resp.Body.Close()
-		errorText, _ := io.ReadAll(resp.Body)
-		return nil, fmt.Errorf("request failed with status code: %d and respond with: %s", resp.StatusCode, errorText)
-	}
-
 	ch := stream.NewStream[T](1024)
 
 	// get read timeout