| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499 | import loggingimport uuidfrom enum import StrEnumfrom typing import Optional, castfrom uuid import uuid4import yaml  # type: ignorefrom packaging import versionfrom pydantic import BaseModelfrom sqlalchemy import selectfrom sqlalchemy.orm import Sessionfrom core.helper import ssrf_proxyfrom events.app_event import app_model_config_was_updated, app_was_createdfrom extensions.ext_redis import redis_clientfrom factories import variable_factoryfrom models import Account, App, AppModefrom models.model import AppModelConfigfrom services.workflow_service import WorkflowServicelogger = logging.getLogger(__name__)IMPORT_INFO_REDIS_KEY_PREFIX = "app_import_info:"IMPORT_INFO_REDIS_EXPIRY = 180  # 3 minutesCURRENT_DSL_VERSION = "0.1.5"class ImportMode(StrEnum):    YAML_CONTENT = "yaml-content"    YAML_URL = "yaml-url"class ImportStatus(StrEnum):    COMPLETED = "completed"    COMPLETED_WITH_WARNINGS = "completed-with-warnings"    PENDING = "pending"    FAILED = "failed"class Import(BaseModel):    id: str    status: ImportStatus    app_id: Optional[str] = None    current_dsl_version: str = CURRENT_DSL_VERSION    imported_dsl_version: str = ""    error: str = ""def _check_version_compatibility(imported_version: str) -> ImportStatus:    """Determine import status based on version comparison"""    try:        current_ver = version.parse(CURRENT_DSL_VERSION)        imported_ver = version.parse(imported_version)    except version.InvalidVersion:        return ImportStatus.FAILED    # Compare major version and minor version    if current_ver.major != imported_ver.major or current_ver.minor != imported_ver.minor:        return ImportStatus.PENDING    if current_ver.micro != imported_ver.micro:        return ImportStatus.COMPLETED_WITH_WARNINGS    return ImportStatus.COMPLETEDclass PendingData(BaseModel):    import_mode: str    yaml_content: str    name: str | None    description: str | None    icon_type: str | None    icon: str | None    icon_background: str | None    app_id: str | Noneclass AppDslService:    def __init__(self, session: Session):        self._session = session    def import_app(        self,        *,        account: Account,        import_mode: str,        yaml_content: Optional[str] = None,        yaml_url: Optional[str] = None,        name: Optional[str] = None,        description: Optional[str] = None,        icon_type: Optional[str] = None,        icon: Optional[str] = None,        icon_background: Optional[str] = None,        app_id: Optional[str] = None,    ) -> Import:        """Import an app from YAML content or URL."""        import_id = str(uuid.uuid4())        # Validate import mode        try:            mode = ImportMode(import_mode)        except ValueError:            raise ValueError(f"Invalid import_mode: {import_mode}")        # Get YAML content        content: bytes | str = b""        if mode == ImportMode.YAML_URL:            if not yaml_url:                return Import(                    id=import_id,                    status=ImportStatus.FAILED,                    error="yaml_url is required when import_mode is yaml-url",                )            try:                max_size = 10 * 1024 * 1024  # 10MB                # tricky way to handle url from github to github raw url                if yaml_url.startswith("https://github.com") and yaml_url.endswith((".yml", ".yaml")):                    yaml_url = yaml_url.replace("https://github.com", "https://raw.githubusercontent.com")                    yaml_url = yaml_url.replace("/blob/", "/")                response = ssrf_proxy.get(yaml_url.strip(), follow_redirects=True, timeout=(10, 10))                response.raise_for_status()                content = response.content                if len(content) > max_size:                    return Import(                        id=import_id,                        status=ImportStatus.FAILED,                        error="File size exceeds the limit of 10MB",                    )                if not content:                    return Import(                        id=import_id,                        status=ImportStatus.FAILED,                        error="Empty content from url",                    )                try:                    content = cast(bytes, content).decode("utf-8")                except UnicodeDecodeError as e:                    return Import(                        id=import_id,                        status=ImportStatus.FAILED,                        error=f"Error decoding content: {e}",                    )            except Exception as e:                return Import(                    id=import_id,                    status=ImportStatus.FAILED,                    error=f"Error fetching YAML from URL: {str(e)}",                )        elif mode == ImportMode.YAML_CONTENT:            if not yaml_content:                return Import(                    id=import_id,                    status=ImportStatus.FAILED,                    error="yaml_content is required when import_mode is yaml-content",                )            content = yaml_content        # Process YAML content        try:            # Parse YAML to validate format            data = yaml.safe_load(content)            if not isinstance(data, dict):                return Import(                    id=import_id,                    status=ImportStatus.FAILED,                    error="Invalid YAML format: content must be a mapping",                )            # Validate and fix DSL version            if not data.get("version"):                data["version"] = "0.1.0"            if not data.get("kind") or data.get("kind") != "app":                data["kind"] = "app"            imported_version = data.get("version", "0.1.0")            # check if imported_version is a float-like string            if not isinstance(imported_version, str):                raise ValueError(f"Invalid version type, expected str, got {type(imported_version)}")            status = _check_version_compatibility(imported_version)            # Extract app data            app_data = data.get("app")            if not app_data:                return Import(                    id=import_id,                    status=ImportStatus.FAILED,                    error="Missing app data in YAML content",                )            # If app_id is provided, check if it exists            app = None            if app_id:                stmt = select(App).where(App.id == app_id, App.tenant_id == account.current_tenant_id)                app = self._session.scalar(stmt)                if not app:                    return Import(                        id=import_id,                        status=ImportStatus.FAILED,                        error="App not found",                    )                if app.mode not in [AppMode.WORKFLOW.value, AppMode.ADVANCED_CHAT.value]:                    return Import(                        id=import_id,                        status=ImportStatus.FAILED,                        error="Only workflow or advanced chat apps can be overwritten",                    )            # If major version mismatch, store import info in Redis            if status == ImportStatus.PENDING:                panding_data = PendingData(                    import_mode=import_mode,                    yaml_content=content,                    name=name,                    description=description,                    icon_type=icon_type,                    icon=icon,                    icon_background=icon_background,                    app_id=app_id,                )                redis_client.setex(                    f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}",                    IMPORT_INFO_REDIS_EXPIRY,                    panding_data.model_dump_json(),                )                return Import(                    id=import_id,                    status=status,                    app_id=app_id,                    imported_dsl_version=imported_version,                )            # Create or update app            app = self._create_or_update_app(                app=app,                data=data,                account=account,                name=name,                description=description,                icon_type=icon_type,                icon=icon,                icon_background=icon_background,            )            return Import(                id=import_id,                status=status,                app_id=app.id,                imported_dsl_version=imported_version,            )        except yaml.YAMLError as e:            return Import(                id=import_id,                status=ImportStatus.FAILED,                error=f"Invalid YAML format: {str(e)}",            )        except Exception as e:            logger.exception("Failed to import app")            return Import(                id=import_id,                status=ImportStatus.FAILED,                error=str(e),            )    def confirm_import(self, *, import_id: str, account: Account) -> Import:        """        Confirm an import that requires confirmation        """        redis_key = f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}"        pending_data = redis_client.get(redis_key)        if not pending_data:            return Import(                id=import_id,                status=ImportStatus.FAILED,                error="Import information expired or does not exist",            )        try:            if not isinstance(pending_data, str | bytes):                return Import(                    id=import_id,                    status=ImportStatus.FAILED,                    error="Invalid import information",                )            pending_data = PendingData.model_validate_json(pending_data)            data = yaml.safe_load(pending_data.yaml_content)            app = None            if pending_data.app_id:                stmt = select(App).where(App.id == pending_data.app_id, App.tenant_id == account.current_tenant_id)                app = self._session.scalar(stmt)            # Create or update app            app = self._create_or_update_app(                app=app,                data=data,                account=account,                name=pending_data.name,                description=pending_data.description,                icon_type=pending_data.icon_type,                icon=pending_data.icon,                icon_background=pending_data.icon_background,            )            # Delete import info from Redis            redis_client.delete(redis_key)            return Import(                id=import_id,                status=ImportStatus.COMPLETED,                app_id=app.id,                current_dsl_version=CURRENT_DSL_VERSION,                imported_dsl_version=data.get("version", "0.1.0"),            )        except Exception as e:            logger.exception("Error confirming import")            return Import(                id=import_id,                status=ImportStatus.FAILED,                error=str(e),            )    def _create_or_update_app(        self,        *,        app: Optional[App],        data: dict,        account: Account,        name: Optional[str] = None,        description: Optional[str] = None,        icon_type: Optional[str] = None,        icon: Optional[str] = None,        icon_background: Optional[str] = None,    ) -> App:        """Create a new app or update an existing one."""        app_data = data.get("app", {})        app_mode = app_data.get("mode")        if not app_mode:            raise ValueError("loss app mode")        app_mode = AppMode(app_mode)        # Set icon type        icon_type_value = icon_type or app_data.get("icon_type")        if icon_type_value in ["emoji", "link"]:            icon_type = icon_type_value        else:            icon_type = "emoji"        icon = icon or str(app_data.get("icon", ""))        if app:            # Update existing app            app.name = name or app_data.get("name", app.name)            app.description = description or app_data.get("description", app.description)            app.icon_type = icon_type            app.icon = icon            app.icon_background = icon_background or app_data.get("icon_background", app.icon_background)            app.updated_by = account.id        else:            if account.current_tenant_id is None:                raise ValueError("Current tenant is not set")            # Create new app            app = App()            app.id = str(uuid4())            app.tenant_id = account.current_tenant_id            app.mode = app_mode.value            app.name = name or app_data.get("name", "")            app.description = description or app_data.get("description", "")            app.icon_type = icon_type            app.icon = icon            app.icon_background = icon_background or app_data.get("icon_background", "#FFFFFF")            app.enable_site = True            app.enable_api = True            app.use_icon_as_answer_icon = app_data.get("use_icon_as_answer_icon", False)            app.created_by = account.id            app.updated_by = account.id            self._session.add(app)            self._session.commit()            app_was_created.send(app, account=account)        # Initialize app based on mode        if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:            workflow_data = data.get("workflow")            if not workflow_data or not isinstance(workflow_data, dict):                raise ValueError("Missing workflow data for workflow/advanced chat app")            environment_variables_list = workflow_data.get("environment_variables", [])            environment_variables = [                variable_factory.build_environment_variable_from_mapping(obj) for obj in environment_variables_list            ]            conversation_variables_list = workflow_data.get("conversation_variables", [])            conversation_variables = [                variable_factory.build_conversation_variable_from_mapping(obj) for obj in conversation_variables_list            ]            workflow_service = WorkflowService()            current_draft_workflow = workflow_service.get_draft_workflow(app_model=app)            if current_draft_workflow:                unique_hash = current_draft_workflow.unique_hash            else:                unique_hash = None            workflow_service.sync_draft_workflow(                app_model=app,                graph=workflow_data.get("graph", {}),                features=workflow_data.get("features", {}),                unique_hash=unique_hash,                account=account,                environment_variables=environment_variables,                conversation_variables=conversation_variables,            )        elif app_mode in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.COMPLETION}:            # Initialize model config            model_config = data.get("model_config")            if not model_config or not isinstance(model_config, dict):                raise ValueError("Missing model_config for chat/agent-chat/completion app")            # Initialize or update model config            if not app.app_model_config:                app_model_config = AppModelConfig().from_model_config_dict(model_config)                app_model_config.id = str(uuid4())                app_model_config.app_id = app.id                app_model_config.created_by = account.id                app_model_config.updated_by = account.id                app.app_model_config_id = app_model_config.id                self._session.add(app_model_config)                app_model_config_was_updated.send(app, app_model_config=app_model_config)        else:            raise ValueError("Invalid app mode")        return app    @classmethod    def export_dsl(cls, app_model: App, include_secret: bool = False) -> str:        """        Export app        :param app_model: App instance        :return:        """        app_mode = AppMode.value_of(app_model.mode)        export_data = {            "version": CURRENT_DSL_VERSION,            "kind": "app",            "app": {                "name": app_model.name,                "mode": app_model.mode,                "icon": "🤖" if app_model.icon_type == "image" else app_model.icon,                "icon_background": "#FFEAD5" if app_model.icon_type == "image" else app_model.icon_background,                "description": app_model.description,                "use_icon_as_answer_icon": app_model.use_icon_as_answer_icon,            },        }        if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:            cls._append_workflow_export_data(                export_data=export_data, app_model=app_model, include_secret=include_secret            )        else:            cls._append_model_config_export_data(export_data, app_model)        return yaml.dump(export_data, allow_unicode=True)  # type: ignore    @classmethod    def _append_workflow_export_data(cls, *, export_data: dict, app_model: App, include_secret: bool) -> None:        """        Append workflow export data        :param export_data: export data        :param app_model: App instance        """        workflow_service = WorkflowService()        workflow = workflow_service.get_draft_workflow(app_model)        if not workflow:            raise ValueError("Missing draft workflow configuration, please check.")        export_data["workflow"] = workflow.to_dict(include_secret=include_secret)    @classmethod    def _append_model_config_export_data(cls, export_data: dict, app_model: App) -> None:        """        Append model config export data        :param export_data: export data        :param app_model: App instance        """        app_model_config = app_model.app_model_config        if not app_model_config:            raise ValueError("Missing app configuration, please check.")        export_data["model_config"] = app_model_config.to_dict()
 |