Преглед на файлове

refactor: plugin installation

Yeuoly преди 6 месеца
родител
ревизия
276701e1b7

+ 109 - 51
api/controllers/console/workspace/plugin.py

@@ -1,7 +1,6 @@
 import io
-import json
 
-from flask import Response, request, send_file
+from flask import request, send_file
 from flask_login import current_user
 from flask_restful import Resource, reqparse
 from werkzeug.exceptions import Forbidden
@@ -11,7 +10,6 @@ from controllers.console import api
 from controllers.console.setup import setup_required
 from controllers.console.wraps import account_initialization_required
 from core.model_runtime.utils.encoders import jsonable_encoder
-from core.plugin.entities.plugin_daemon import InstallPluginMessage
 from libs.login import login_required
 from services.plugin.plugin_service import PluginService
 
@@ -59,37 +57,63 @@ class PluginIconApi(Resource):
         return send_file(io.BytesIO(icon_bytes), mimetype=mimetype, max_age=icon_cache_max_age)
 
 
-class PluginInstallCheckUniqueIdentifierApi(Resource):
+class PluginUploadPkgApi(Resource):
     @setup_required
     @login_required
     @account_initialization_required
-    def get(self):
-        req = reqparse.RequestParser()
-        req.add_argument("plugin_unique_identifier", type=str, required=True, location="args")
-        args = req.parse_args()
+    def post(self):
+        user = current_user
+        if not user.is_admin_or_owner:
+            raise Forbidden()
+
+        tenant_id = user.current_tenant_id
+        file = request.files["pkg"]
+        content = file.read()
+        return {"plugin_unique_identifier": PluginService.upload_pkg(tenant_id, content)}
 
+
+class PluginUploadFromPkgApi(Resource):
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def post(self):
         user = current_user
+        if not user.is_admin_or_owner:
+            raise Forbidden()
+
         tenant_id = user.current_tenant_id
 
-        return {"installed": PluginService.check_plugin_unique_identifier(tenant_id, args["plugin_unique_identifier"])}
+        file = request.files["pkg"]
+        content = file.read()
+        response = PluginService.upload_pkg(tenant_id, content)
+
+        return {
+            "plugin_unique_identifier": response,
+        }
 
 
-class PluginInstallFromUniqueIdentifierApi(Resource):
+class PluginUploadFromGithubApi(Resource):
     @setup_required
     @login_required
     @account_initialization_required
     def post(self):
-        req = reqparse.RequestParser()
-        req.add_argument("plugin_unique_identifier", type=str, required=True, location="json")
-        args = req.parse_args()
-
         user = current_user
         if not user.is_admin_or_owner:
             raise Forbidden()
 
         tenant_id = user.current_tenant_id
 
-        return {"success": PluginService.install_from_unique_identifier(tenant_id, args["plugin_unique_identifier"])}
+        parser = reqparse.RequestParser()
+        parser.add_argument("repo", type=str, required=True, location="json")
+        parser.add_argument("version", type=str, required=True, location="json")
+        parser.add_argument("package", type=str, required=True, location="json")
+        args = parser.parse_args()
+
+        response = PluginService.upload_pkg_from_github(tenant_id, args["repo"], args["version"], args["package"])
+
+        return {
+            "plugin_unique_identifier": response,
+        }
 
 
 class PluginInstallFromPkgApi(Resource):
@@ -103,19 +127,15 @@ class PluginInstallFromPkgApi(Resource):
 
         tenant_id = user.current_tenant_id
 
-        file = request.files["pkg"]
-        content = file.read()
+        parser = reqparse.RequestParser()
+        parser.add_argument("plugin_unique_identifier", type=str, required=True, location="json")
+        args = parser.parse_args()
 
-        def generator():
-            try:
-                response = PluginService.install_from_local_pkg(tenant_id, content)
-                for message in response:
-                    yield f"data: {json.dumps(jsonable_encoder(message))}\n\n"
-            except ValueError as e:
-                error_message = InstallPluginMessage(event=InstallPluginMessage.Event.Error, data=str(e))
-                yield f"data: {json.dumps(jsonable_encoder(error_message))}\n\n"
+        response = PluginService.install_from_local_pkg(tenant_id, args["plugin_unique_identifier"])
 
-        return Response(generator(), mimetype="text/event-stream")
+        return {
+            "task_id": response,
+        }
 
 
 class PluginInstallFromGithubApi(Resource):
@@ -133,20 +153,16 @@ class PluginInstallFromGithubApi(Resource):
         parser.add_argument("repo", type=str, required=True, location="json")
         parser.add_argument("version", type=str, required=True, location="json")
         parser.add_argument("package", type=str, required=True, location="json")
+        parser.add_argument("plugin_unique_identifier", type=str, required=True, location="json")
         args = parser.parse_args()
 
-        def generator():
-            try:
-                response = PluginService.install_from_github_pkg(
-                    tenant_id, args["repo"], args["version"], args["package"]
-                )
-                for message in response:
-                    yield f"data: {json.dumps(jsonable_encoder(message))}\n\n"
-            except ValueError as e:
-                error_message = InstallPluginMessage(event=InstallPluginMessage.Event.Error, data=str(e))
-                yield f"data: {json.dumps(jsonable_encoder(error_message))}\n\n"
+        response = PluginService.install_from_github(
+            tenant_id, args["repo"], args["version"], args["package"], args["plugin_unique_identifier"]
+        )
 
-        return Response(generator(), mimetype="text/event-stream")
+        return {
+            "task_id": response,
+        }
 
 
 class PluginInstallFromMarketplaceApi(Resource):
@@ -164,16 +180,55 @@ class PluginInstallFromMarketplaceApi(Resource):
         parser.add_argument("plugin_unique_identifier", type=str, required=True, location="json")
         args = parser.parse_args()
 
-        def generator():
-            try:
-                response = PluginService.install_from_marketplace_pkg(tenant_id, args["plugin_unique_identifier"])
-                for message in response:
-                    yield f"data: {json.dumps(jsonable_encoder(message))}\n\n"
-            except ValueError as e:
-                error_message = InstallPluginMessage(event=InstallPluginMessage.Event.Error, data=str(e))
-                yield f"data: {json.dumps(jsonable_encoder(error_message))}\n\n"
+        response = PluginService.install_from_marketplace_pkg(tenant_id, args["plugin_unique_identifier"])
+
+        return {
+            "task_id": response,
+        }
+
+
+class PluginFetchManifestApi(Resource):
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def get(self):
+        user = current_user
+
+        parser = reqparse.RequestParser()
+        parser.add_argument("plugin_unique_identifier", type=str, required=True, location="args")
+        args = parser.parse_args()
+
+        tenant_id = user.current_tenant_id
+
+        return {"manifest": PluginService.fetch_plugin_manifest(tenant_id, args["plugin_unique_identifier"])}
+
+
+class PluginFetchInstallTasksApi(Resource):
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def get(self):
+        user = current_user
+        if not user.is_admin_or_owner:
+            raise Forbidden()
+
+        tenant_id = user.current_tenant_id
+
+        return {"tasks": PluginService.fetch_install_tasks(tenant_id)}
+
+
+class PluginFetchInstallTaskApi(Resource):
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def get(self, task_id: str):
+        user = current_user
+        if not user.is_admin_or_owner:
+            raise Forbidden()
+
+        tenant_id = user.current_tenant_id
 
-        return Response(generator(), mimetype="text/event-stream")
+        return {"task": PluginService.fetch_install_task(tenant_id, task_id)}
 
 
 class PluginUninstallApi(Resource):
@@ -197,9 +252,12 @@ class PluginUninstallApi(Resource):
 api.add_resource(PluginDebuggingKeyApi, "/workspaces/current/plugin/debugging-key")
 api.add_resource(PluginListApi, "/workspaces/current/plugin/list")
 api.add_resource(PluginIconApi, "/workspaces/current/plugin/icon")
-api.add_resource(PluginInstallCheckUniqueIdentifierApi, "/workspaces/current/plugin/install/check_unique_identifier")
-api.add_resource(PluginInstallFromUniqueIdentifierApi, "/workspaces/current/plugin/install/from_unique_identifier")
-api.add_resource(PluginInstallFromPkgApi, "/workspaces/current/plugin/install/from_pkg")
-api.add_resource(PluginInstallFromGithubApi, "/workspaces/current/plugin/install/from_github")
-api.add_resource(PluginInstallFromMarketplaceApi, "/workspaces/current/plugin/install/from_marketplace")
+api.add_resource(PluginUploadFromPkgApi, "/workspaces/current/plugin/upload/pkg")
+api.add_resource(PluginUploadFromGithubApi, "/workspaces/current/plugin/upload/github")
+api.add_resource(PluginInstallFromPkgApi, "/workspaces/current/plugin/install/pkg")
+api.add_resource(PluginInstallFromGithubApi, "/workspaces/current/plugin/install/github")
+api.add_resource(PluginInstallFromMarketplaceApi, "/workspaces/current/plugin/install/marketplace")
+api.add_resource(PluginFetchManifestApi, "/workspaces/current/plugin/fetch-manifest")
+api.add_resource(PluginFetchInstallTasksApi, "/workspaces/current/plugin/tasks")
+api.add_resource(PluginFetchInstallTaskApi, "/workspaces/current/plugin/tasks/<task_id>")
 api.add_resource(PluginUninstallApi, "/workspaces/current/plugin/uninstall")

+ 23 - 1
api/core/plugin/entities/plugin_daemon.py

@@ -6,9 +6,10 @@ from pydantic import BaseModel, ConfigDict, Field
 
 from core.model_runtime.entities.model_entities import AIModelEntity
 from core.model_runtime.entities.provider_entities import ProviderEntity
+from core.plugin.entities.base import BasePluginEntity
 from core.tools.entities.tool_entities import ToolProviderEntityWithPlugin
 
-T = TypeVar("T", bound=(BaseModel | dict | list | bool))
+T = TypeVar("T", bound=(BaseModel | dict | list | bool | str))
 
 
 class PluginDaemonBasicResponse(BaseModel, Generic[T]):
@@ -106,3 +107,24 @@ class PluginDaemonInnerError(Exception):
     def __init__(self, code: int, message: str):
         self.code = code
         self.message = message
+
+
+class PluginInstallTaskStatus(str, Enum):
+    Pending = "pending"
+    Running = "running"
+    Success = "success"
+    Failed = "failed"
+
+
+class PluginInstallTaskPluginStatus(BaseModel):
+    plugin_unique_identifier: str = Field(description="The plugin unique identifier of the install task.")
+    plugin_id: str = Field(description="The plugin ID of the install task.")
+    status: PluginInstallTaskStatus = Field(description="The status of the install task.")
+    message: str = Field(description="The message of the install task.")
+
+
+class PluginInstallTask(BasePluginEntity):
+    status: PluginInstallTaskStatus = Field(description="The status of the install task.")
+    total_plugins: int = Field(description="The total number of plugins to be installed.")
+    completed_plugins: int = Field(description="The number of plugins that have been installed.")
+    plugins: list[PluginInstallTaskPluginStatus] = Field(description="The status of the plugins.")

+ 1 - 1
api/core/plugin/manager/base.py

@@ -19,7 +19,7 @@ from core.plugin.entities.plugin_daemon import PluginDaemonBasicResponse, Plugin
 plugin_daemon_inner_api_baseurl = dify_config.PLUGIN_API_URL
 plugin_daemon_inner_api_key = dify_config.PLUGIN_API_KEY
 
-T = TypeVar("T", bound=(BaseModel | dict | list | bool))
+T = TypeVar("T", bound=(BaseModel | dict | list | bool | str))
 
 
 class BasePluginManager:

+ 51 - 22
api/core/plugin/manager/plugin.py

@@ -1,16 +1,16 @@
-import json
-from collections.abc import Generator, Mapping
-from typing import Any
+from collections.abc import Sequence
 
-from core.plugin.entities.plugin import PluginEntity, PluginInstallationSource
-from core.plugin.entities.plugin_daemon import InstallPluginMessage
+from core.plugin.entities.plugin import PluginDeclaration, PluginEntity, PluginInstallationSource
+from core.plugin.entities.plugin_daemon import PluginInstallTask
 from core.plugin.manager.base import BasePluginManager
 
 
 class PluginInstallationManager(BasePluginManager):
-    def fetch_plugin_by_identifier(self, tenant_id: str, identifier: str) -> bool:
-        # urlencode the identifier
-
+    def fetch_plugin_by_identifier(
+        self,
+        tenant_id: str,
+        identifier: str,
+    ) -> bool:
         return self._request_with_plugin_daemon_response(
             "GET",
             f"plugin/{tenant_id}/management/fetch/identifier",
@@ -26,37 +26,34 @@ class PluginInstallationManager(BasePluginManager):
             params={"page": 1, "page_size": 256},
         )
 
-    def install_from_pkg(
+    def upload_pkg(
         self,
         tenant_id: str,
         pkg: bytes,
-        source: PluginInstallationSource,
-        meta: Mapping[str, Any],
         verify_signature: bool = False,
-    ) -> Generator[InstallPluginMessage, None, None]:
+    ) -> str:
         """
-        Install a plugin from a package.
+        Upload a plugin package and return the plugin unique identifier.
         """
-        # using multipart/form-data to encode body
         body = {
             "dify_pkg": ("dify_pkg", pkg, "application/octet-stream"),
         }
 
         data = {
             "verify_signature": "true" if verify_signature else "false",
-            "source": source.value,
-            "meta": json.dumps(meta),
         }
 
-        return self._request_with_plugin_daemon_response_stream(
+        return self._request_with_plugin_daemon_response(
             "POST",
-            f"plugin/{tenant_id}/management/install/pkg",
-            InstallPluginMessage,
+            f"plugin/{tenant_id}/management/install/upload",
+            str,
             files=body,
             data=data,
         )
 
-    def install_from_identifier(self, tenant_id: str, identifier: str) -> bool:
+    def install_from_identifiers(
+        self, tenant_id: str, identifiers: Sequence[str], source: PluginInstallationSource, meta: dict
+    ) -> str:
         """
         Install a plugin from an identifier.
         """
@@ -64,13 +61,45 @@ class PluginInstallationManager(BasePluginManager):
         return self._request_with_plugin_daemon_response(
             "POST",
             f"plugin/{tenant_id}/management/install/identifier",
-            bool,
+            str,
             data={
-                "plugin_unique_identifier": identifier,
+                "plugin_unique_identifiers": identifiers,
+                "source": source,
+                "meta": meta,
             },
             headers={"Content-Type": "application/json"},
         )
 
+    def fetch_plugin_installation_tasks(self, tenant_id: str) -> Sequence[PluginInstallTask]:
+        """
+        Fetch plugin installation tasks.
+        """
+        return self._request_with_plugin_daemon_response(
+            "GET",
+            f"plugin/{tenant_id}/management/install/tasks",
+            list[PluginInstallTask],
+        )
+
+    def fetch_plugin_installation_task(self, tenant_id: str, task_id: str) -> PluginInstallTask:
+        """
+        Fetch a plugin installation task.
+        """
+        return self._request_with_plugin_daemon_response(
+            "GET",
+            f"plugin/{tenant_id}/management/install/tasks/{task_id}",
+            PluginInstallTask,
+        )
+
+    def fetch_plugin_manifest(self, tenant_id: str, plugin_unique_identifier: str) -> PluginDeclaration:
+        """
+        Fetch a plugin manifest.
+        """
+        return self._request_with_plugin_daemon_response(
+            "GET",
+            f"plugin/{tenant_id}/management/fetch/identifier",
+            PluginDeclaration,
+        )
+
     def uninstall(self, tenant_id: str, plugin_installation_id: str) -> bool:
         """
         Uninstall a plugin.

+ 83 - 43
api/services/plugin/plugin_service.py

@@ -1,10 +1,10 @@
-from collections.abc import Generator
+from collections.abc import Sequence
 from mimetypes import guess_type
 
 from core.helper.download import download_with_size_limit
 from core.helper.marketplace import download_plugin_pkg
-from core.plugin.entities.plugin import PluginEntity, PluginInstallationSource
-from core.plugin.entities.plugin_daemon import InstallPluginMessage, PluginDaemonInnerError
+from core.plugin.entities.plugin import PluginDeclaration, PluginEntity, PluginInstallationSource
+from core.plugin.entities.plugin_daemon import PluginInstallTask
 from core.plugin.manager.asset import PluginAssetManager
 from core.plugin.manager.debugging import PluginDebuggingManager
 from core.plugin.manager.plugin import PluginInstallationManager
@@ -13,16 +13,25 @@ from core.plugin.manager.plugin import PluginInstallationManager
 class PluginService:
     @staticmethod
     def get_debugging_key(tenant_id: str) -> str:
+        """
+        get the debugging key of the tenant
+        """
         manager = PluginDebuggingManager()
         return manager.get_debugging_key(tenant_id)
 
     @staticmethod
     def list(tenant_id: str) -> list[PluginEntity]:
+        """
+        list all plugins of the tenant
+        """
         manager = PluginInstallationManager()
         return manager.list_plugins(tenant_id)
 
     @staticmethod
     def get_asset(tenant_id: str, asset_file: str) -> tuple[bytes, str]:
+        """
+        get the asset file of the plugin
+        """
         manager = PluginAssetManager()
         # guess mime type
         mime_type, _ = guess_type(asset_file)
@@ -30,73 +39,104 @@ class PluginService:
 
     @staticmethod
     def check_plugin_unique_identifier(tenant_id: str, plugin_unique_identifier: str) -> bool:
+        """
+        check if the plugin unique identifier is already installed by other tenant
+        """
         manager = PluginInstallationManager()
         return manager.fetch_plugin_by_identifier(tenant_id, plugin_unique_identifier)
 
     @staticmethod
-    def install_from_unique_identifier(tenant_id: str, plugin_unique_identifier: str) -> bool:
+    def fetch_plugin_manifest(tenant_id: str, plugin_unique_identifier: str) -> PluginDeclaration:
+        manager = PluginInstallationManager()
+        return manager.fetch_plugin_manifest(tenant_id, plugin_unique_identifier)
+
+    @staticmethod
+    def fetch_install_tasks(tenant_id: str) -> Sequence[PluginInstallTask]:
+        manager = PluginInstallationManager()
+        return manager.fetch_plugin_installation_tasks(tenant_id)
+
+    @staticmethod
+    def fetch_install_task(tenant_id: str, task_id: str) -> PluginInstallTask:
         manager = PluginInstallationManager()
-        return manager.install_from_identifier(tenant_id, plugin_unique_identifier)
+        return manager.fetch_plugin_installation_task(tenant_id, task_id)
 
     @staticmethod
-    def install_from_local_pkg(tenant_id: str, pkg: bytes) -> Generator[InstallPluginMessage, None, None]:
+    def upload_pkg(tenant_id: str, pkg: bytes) -> str:
         """
-        Install plugin from uploaded package files
+        Upload plugin package files
+
+        returns: plugin_unique_identifier
         """
         manager = PluginInstallationManager()
-        try:
-            yield from manager.install_from_pkg(tenant_id, pkg, PluginInstallationSource.Package, {})
-        except PluginDaemonInnerError as e:
-            yield InstallPluginMessage(event=InstallPluginMessage.Event.Error, data=str(e.message))
+        return manager.upload_pkg(tenant_id, pkg)
 
     @staticmethod
-    def install_from_github_pkg(
-        tenant_id: str, repo: str, version: str, package: str
-    ) -> Generator[InstallPluginMessage, None, None]:
+    def upload_pkg_from_github(tenant_id: str, repo: str, version: str, package: str) -> str:
         """
-        Install plugin from github release package files
+        Install plugin from github release package files,
+        returns plugin_unique_identifier
         """
         pkg = download_with_size_limit(
             f"https://github.com/{repo}/releases/download/{version}/{package}", 15 * 1024 * 1024
         )
 
         manager = PluginInstallationManager()
-        try:
-            yield from manager.install_from_pkg(
-                tenant_id,
-                pkg,
-                PluginInstallationSource.Github,
-                {
-                    "repo": repo,
-                    "version": version,
-                    "package": package,
-                },
-            )
-        except PluginDaemonInnerError as e:
-            yield InstallPluginMessage(event=InstallPluginMessage.Event.Error, data=str(e.message))
+        return manager.upload_pkg(
+            tenant_id,
+            pkg,
+        )
+
+    @staticmethod
+    def install_from_local_pkg(tenant_id: str, plugin_unique_identifier: str) -> str:
+        manager = PluginInstallationManager()
+        return manager.install_from_identifiers(
+            tenant_id,
+            [plugin_unique_identifier],
+            PluginInstallationSource.Package,
+            {},
+        )
+
+    @staticmethod
+    def install_from_github(
+        tenant_id: str, plugin_unique_identifier: str, repo: str, version: str, package: str
+    ) -> str:
+        """
+        Install plugin from github release package files,
+        returns plugin_unique_identifier
+        """
+        manager = PluginInstallationManager()
+        return manager.install_from_identifiers(
+            tenant_id,
+            [plugin_unique_identifier],
+            PluginInstallationSource.Github,
+            {
+                "repo": repo,
+                "version": version,
+                "package": package,
+            },
+        )
 
     @staticmethod
-    def install_from_marketplace_pkg(
-        tenant_id: str, plugin_unique_identifier: str
-    ) -> Generator[InstallPluginMessage, None, None]:
+    def install_from_marketplace_pkg(tenant_id: str, plugin_unique_identifier: str) -> str:
         """
-        TODO: wait for marketplace api
+        Install plugin from marketplace package files,
+        returns installation task id
         """
         manager = PluginInstallationManager()
 
         pkg = download_plugin_pkg(plugin_unique_identifier)
 
-        try:
-            yield from manager.install_from_pkg(
-                tenant_id,
-                pkg,
-                PluginInstallationSource.Marketplace,
-                {
-                    "plugin_unique_identifier": plugin_unique_identifier,
-                },
-            )
-        except PluginDaemonInnerError as e:
-            yield InstallPluginMessage(event=InstallPluginMessage.Event.Error, data=str(e.message))
+        # upload pkg to plugin daemon
+        pkg_id = manager.upload_pkg(tenant_id, pkg)
+
+        return manager.install_from_identifiers(
+            tenant_id,
+            [pkg_id],
+            PluginInstallationSource.Marketplace,
+            {
+                "plugin_unique_identifier": plugin_unique_identifier,
+            },
+        )
 
     @staticmethod
     def uninstall(tenant_id: str, plugin_installation_id: str) -> bool: