瀏覽代碼

fix: install progress has bad handing logics

Yeuoly 9 月之前
父節點
當前提交
090f1e004d

+ 11 - 1
internal/core/plugin_manager/manager.go

@@ -1,6 +1,7 @@
 package plugin_manager
 
 import (
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -166,5 +167,14 @@ func (p *PluginManager) SavePackage(plugin_unique_identifier plugin_entities.Plu
 }
 
 func (p *PluginManager) GetPackage(plugin_unique_identifier plugin_entities.PluginUniqueIdentifier) ([]byte, error) {
-	return os.ReadFile(filepath.Join(p.packageCachePath, plugin_unique_identifier.String()))
+	file, err := os.ReadFile(filepath.Join(p.packageCachePath, plugin_unique_identifier.String()))
+
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, errors.New("plugin package not found, please upload it firstly")
+		}
+		return nil, err
+	}
+
+	return file, nil
 }

+ 9 - 0
internal/server/controllers/plugins.go

@@ -92,6 +92,15 @@ func FetchPluginInstallationTask(c *gin.Context) {
 	})
 }
 
+func DeletePluginInstallationTask(c *gin.Context) {
+	BindRequest(c, func(request struct {
+		TenantID string `uri:"tenant_id" validate:"required"`
+		TaskID   string `uri:"id" validate:"required"`
+	}) {
+		c.JSON(http.StatusOK, service.DeletePluginInstallationTask(request.TenantID, request.TaskID))
+	})
+}
+
 func FetchPluginManifest(c *gin.Context) {
 	BindRequest(c, func(request struct {
 		TenantID               string                                 `uri:"tenant_id" validate:"required"`

+ 1 - 0
internal/server/http_server.go

@@ -116,6 +116,7 @@ func (app *App) pluginManagementGroup(group *gin.RouterGroup, config *app.Config
 	group.POST("/install/upload", controllers.UploadPlugin(config))
 	group.POST("/install/identifiers", controllers.InstallPluginFromIdentifiers(config))
 	group.GET("/install/tasks/:id", controllers.FetchPluginInstallationTask)
+	group.POST("/install/tasks/:id/delete", controllers.DeletePluginInstallationTask)
 	group.GET("/install/tasks", controllers.FetchPluginInstallationTasks)
 	group.GET("/fetch/manifest", controllers.FetchPluginManifest)
 	group.GET("/fetch/identifier", controllers.FetchPluginFromIdentifier)

+ 31 - 22
internal/service/install_plugin.go

@@ -1,8 +1,8 @@
 package service
 
 import (
+	"errors"
 	"fmt"
-	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
@@ -51,8 +51,6 @@ func InstallPluginFromIdentifiers(
 			Message:                "",
 		})
 
-		task.TotalPlugins++
-
 		if err == nil {
 			// already installed by other tenant
 			declaration := plugin.Declaration
@@ -111,11 +109,17 @@ func InstallPluginFromIdentifiers(
 
 					task_pointer := &task
 					var plugin_status *models.InstallTaskPluginStatus
-					for _, plugin := range task.Plugins {
-						if plugin.PluginUniqueIdentifier == plugin_unique_identifier {
-							plugin_status = &plugin
+					for i := range task.Plugins {
+						if task.Plugins[i].PluginUniqueIdentifier == plugin_unique_identifier {
+							plugin_status = &task.Plugins[i]
+							break
 						}
 					}
+
+					if plugin_status == nil {
+						return errors.New("plugin status not found")
+					}
+
 					modifier(task_pointer, plugin_status)
 					return db.Update(task_pointer, tx)
 				}); err != nil {
@@ -193,22 +197,7 @@ func InstallPluginFromIdentifiers(
 	}
 
 	// submit async tasks
-	routine.WithMaxRoutine(3, tasks, func() {
-		time.AfterFunc(time.Second*5, func() {
-			// get task
-			task, err := db.GetOne[models.InstallTask](
-				db.Equal("id", task.ID),
-			)
-			if err != nil {
-				return
-			}
-
-			if task.CompletedPlugins == task.TotalPlugins {
-				// delete task if all plugins are installed successfully
-				db.Delete(&task)
-			}
-		})
-	})
+	routine.WithMaxRoutine(3, tasks)
 
 	return entities.NewSuccessResponse(response)
 }
@@ -245,6 +234,26 @@ func FetchPluginInstallationTask(
 	return entities.NewSuccessResponse(task)
 }
 
+func DeletePluginInstallationTask(
+	tenant_id string,
+	task_id string,
+) *entities.Response {
+	err := db.DeleteByCondition(
+		models.InstallTask{
+			Model: models.Model{
+				ID: task_id,
+			},
+			TenantID: tenant_id,
+		},
+	)
+
+	if err != nil {
+		return entities.NewErrorResponse(-500, err.Error())
+	}
+
+	return entities.NewSuccessResponse(true)
+}
+
 func FetchPluginFromIdentifier(
 	plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
 ) *entities.Response {

+ 3 - 8
internal/utils/routine/pool.go

@@ -51,15 +51,10 @@ func WithMaxRoutine(max_routine int, tasks []func(), on_finish ...func()) {
 			Submit(func() {
 				defer wg.Done()
 				current_index := atomic.AddInt32(&task_index, 1)
-
-				if current_index >= int32(len(tasks)) {
-					return
-				}
-
-				for current_index < int32(len(tasks)) {
-					task := tasks[current_index]
+				for current_index <= int32(len(tasks)) {
+					task := tasks[current_index-1]
 					task()
-					current_index++
+					current_index = atomic.AddInt32(&task_index, 1)
 				}
 			})
 		}