123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486 |
- import logging
- import uuid
- from enum import StrEnum
- from typing import Optional
- from uuid import uuid4
- import yaml
- from packaging import version
- from pydantic import BaseModel
- from sqlalchemy import select
- from sqlalchemy.orm import Session
- from core.helper import ssrf_proxy
- from events.app_event import app_model_config_was_updated, app_was_created
- from extensions.ext_redis import redis_client
- from factories import variable_factory
- from models import Account, App, AppMode
- from models.model import AppModelConfig
- from services.workflow_service import WorkflowService
- logger = logging.getLogger(__name__)
- IMPORT_INFO_REDIS_KEY_PREFIX = "app_import_info:"
- IMPORT_INFO_REDIS_EXPIRY = 180 # 3 minutes
- CURRENT_DSL_VERSION = "0.1.3"
- class ImportMode(StrEnum):
- YAML_CONTENT = "yaml-content"
- YAML_URL = "yaml-url"
- class ImportStatus(StrEnum):
- COMPLETED = "completed"
- COMPLETED_WITH_WARNINGS = "completed-with-warnings"
- PENDING = "pending"
- FAILED = "failed"
- class Import(BaseModel):
- id: str
- status: ImportStatus
- app_id: Optional[str] = None
- current_dsl_version: str = CURRENT_DSL_VERSION
- imported_dsl_version: str = ""
- error: str = ""
- def _check_version_compatibility(imported_version: str) -> ImportStatus:
- """Determine import status based on version comparison"""
- try:
- current_ver = version.parse(CURRENT_DSL_VERSION)
- imported_ver = version.parse(imported_version)
- except version.InvalidVersion:
- return ImportStatus.FAILED
- # Compare major version and minor version
- if current_ver.major != imported_ver.major or current_ver.minor != imported_ver.minor:
- return ImportStatus.PENDING
- if current_ver.micro != imported_ver.micro:
- return ImportStatus.COMPLETED_WITH_WARNINGS
- return ImportStatus.COMPLETED
- class PendingData(BaseModel):
- import_mode: str
- yaml_content: str
- name: str | None
- description: str | None
- icon_type: str | None
- icon: str | None
- icon_background: str | None
- app_id: str | None
- class AppDslService:
- def __init__(self, session: Session):
- self._session = session
- def import_app(
- self,
- *,
- account: Account,
- import_mode: str,
- yaml_content: Optional[str] = None,
- yaml_url: Optional[str] = None,
- name: Optional[str] = None,
- description: Optional[str] = None,
- icon_type: Optional[str] = None,
- icon: Optional[str] = None,
- icon_background: Optional[str] = None,
- app_id: Optional[str] = None,
- ) -> Import:
- """Import an app from YAML content or URL."""
- import_id = str(uuid.uuid4())
- # Validate import mode
- try:
- mode = ImportMode(import_mode)
- except ValueError:
- raise ValueError(f"Invalid import_mode: {import_mode}")
- # Get YAML content
- content = ""
- if mode == ImportMode.YAML_URL:
- if not yaml_url:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error="yaml_url is required when import_mode is yaml-url",
- )
- try:
- max_size = 10 * 1024 * 1024 # 10MB
- response = ssrf_proxy.get(yaml_url.strip(), follow_redirects=True, timeout=(10, 10))
- response.raise_for_status()
- content = response.content
- if len(content) > max_size:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error="File size exceeds the limit of 10MB",
- )
- if not content:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error="Empty content from url",
- )
- try:
- content = content.decode("utf-8")
- except UnicodeDecodeError as e:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error=f"Error decoding content: {e}",
- )
- except Exception as e:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error=f"Error fetching YAML from URL: {str(e)}",
- )
- elif mode == ImportMode.YAML_CONTENT:
- if not yaml_content:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error="yaml_content is required when import_mode is yaml-content",
- )
- content = yaml_content
- # Process YAML content
- try:
- # Parse YAML to validate format
- data = yaml.safe_load(content)
- if not isinstance(data, dict):
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error="Invalid YAML format: content must be a mapping",
- )
- # Validate and fix DSL version
- if not data.get("version"):
- data["version"] = "0.1.0"
- if not data.get("kind") or data.get("kind") != "app":
- data["kind"] = "app"
- imported_version = data.get("version", "0.1.0")
- status = _check_version_compatibility(imported_version)
- # Extract app data
- app_data = data.get("app")
- if not app_data:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error="Missing app data in YAML content",
- )
- # If app_id is provided, check if it exists
- app = None
- if app_id:
- stmt = select(App).where(App.id == app_id, App.tenant_id == account.current_tenant_id)
- app = self._session.scalar(stmt)
- if not app:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error="App not found",
- )
- if app.mode not in [AppMode.WORKFLOW.value, AppMode.ADVANCED_CHAT.value]:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error="Only workflow or advanced chat apps can be overwritten",
- )
- # If major version mismatch, store import info in Redis
- if status == ImportStatus.PENDING:
- panding_data = PendingData(
- import_mode=import_mode,
- yaml_content=content,
- name=name,
- description=description,
- icon_type=icon_type,
- icon=icon,
- icon_background=icon_background,
- app_id=app_id,
- )
- redis_client.setex(
- f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}",
- IMPORT_INFO_REDIS_EXPIRY,
- panding_data.model_dump_json(),
- )
- return Import(
- id=import_id,
- status=status,
- app_id=app_id,
- imported_dsl_version=imported_version,
- )
- # Create or update app
- app = self._create_or_update_app(
- app=app,
- data=data,
- account=account,
- name=name,
- description=description,
- icon_type=icon_type,
- icon=icon,
- icon_background=icon_background,
- )
- return Import(
- id=import_id,
- status=status,
- app_id=app.id,
- imported_dsl_version=imported_version,
- )
- except yaml.YAMLError as e:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error=f"Invalid YAML format: {str(e)}",
- )
- except Exception as e:
- logger.exception("Failed to import app")
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error=str(e),
- )
- def confirm_import(self, *, import_id: str, account: Account) -> Import:
- """
- Confirm an import that requires confirmation
- """
- redis_key = f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}"
- pending_data = redis_client.get(redis_key)
- if not pending_data:
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error="Import information expired or does not exist",
- )
- try:
- if not isinstance(pending_data, str | bytes):
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error="Invalid import information",
- )
- pending_data = PendingData.model_validate_json(pending_data)
- data = yaml.safe_load(pending_data.yaml_content)
- app = None
- if pending_data.app_id:
- stmt = select(App).where(App.id == pending_data.app_id, App.tenant_id == account.current_tenant_id)
- app = self._session.scalar(stmt)
- # Create or update app
- app = self._create_or_update_app(
- app=app,
- data=data,
- account=account,
- name=pending_data.name,
- description=pending_data.description,
- icon_type=pending_data.icon_type,
- icon=pending_data.icon,
- icon_background=pending_data.icon_background,
- )
- # Delete import info from Redis
- redis_client.delete(redis_key)
- return Import(
- id=import_id,
- status=ImportStatus.COMPLETED,
- app_id=app.id,
- current_dsl_version=CURRENT_DSL_VERSION,
- imported_dsl_version=data.get("version", "0.1.0"),
- )
- except Exception as e:
- logger.exception("Error confirming import")
- return Import(
- id=import_id,
- status=ImportStatus.FAILED,
- error=str(e),
- )
- def _create_or_update_app(
- self,
- *,
- app: Optional[App],
- data: dict,
- account: Account,
- name: Optional[str] = None,
- description: Optional[str] = None,
- icon_type: Optional[str] = None,
- icon: Optional[str] = None,
- icon_background: Optional[str] = None,
- ) -> App:
- """Create a new app or update an existing one."""
- app_data = data.get("app", {})
- app_mode = AppMode(app_data["mode"])
- # Set icon type
- icon_type_value = icon_type or app_data.get("icon_type")
- if icon_type_value in ["emoji", "link"]:
- icon_type = icon_type_value
- else:
- icon_type = "emoji"
- icon = icon or str(app_data.get("icon", ""))
- if app:
- # Update existing app
- app.name = name or app_data.get("name", app.name)
- app.description = description or app_data.get("description", app.description)
- app.icon_type = icon_type
- app.icon = icon
- app.icon_background = icon_background or app_data.get("icon_background", app.icon_background)
- app.updated_by = account.id
- else:
- # Create new app
- app = App()
- app.id = str(uuid4())
- app.tenant_id = account.current_tenant_id
- app.mode = app_mode.value
- app.name = name or app_data.get("name", "")
- app.description = description or app_data.get("description", "")
- app.icon_type = icon_type
- app.icon = icon
- app.icon_background = icon_background or app_data.get("icon_background", "#FFFFFF")
- app.enable_site = True
- app.enable_api = True
- app.use_icon_as_answer_icon = app_data.get("use_icon_as_answer_icon", False)
- app.created_by = account.id
- app.updated_by = account.id
- self._session.add(app)
- self._session.commit()
- app_was_created.send(app, account=account)
- # Initialize app based on mode
- if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
- workflow_data = data.get("workflow")
- if not workflow_data or not isinstance(workflow_data, dict):
- raise ValueError("Missing workflow data for workflow/advanced chat app")
- environment_variables_list = workflow_data.get("environment_variables", [])
- environment_variables = [
- variable_factory.build_variable_from_mapping(obj) for obj in environment_variables_list
- ]
- conversation_variables_list = workflow_data.get("conversation_variables", [])
- conversation_variables = [
- variable_factory.build_variable_from_mapping(obj) for obj in conversation_variables_list
- ]
- workflow_service = WorkflowService()
- current_draft_workflow = workflow_service.get_draft_workflow(app_model=app)
- if current_draft_workflow:
- unique_hash = current_draft_workflow.unique_hash
- else:
- unique_hash = None
- workflow_service.sync_draft_workflow(
- app_model=app,
- graph=workflow_data.get("graph", {}),
- features=workflow_data.get("features", {}),
- unique_hash=unique_hash,
- account=account,
- environment_variables=environment_variables,
- conversation_variables=conversation_variables,
- )
- elif app_mode in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.COMPLETION}:
- # Initialize model config
- model_config = data.get("model_config")
- if not model_config or not isinstance(model_config, dict):
- raise ValueError("Missing model_config for chat/agent-chat/completion app")
- # Initialize or update model config
- if not app.app_model_config:
- app_model_config = AppModelConfig().from_model_config_dict(model_config)
- app_model_config.id = str(uuid4())
- app_model_config.app_id = app.id
- app_model_config.created_by = account.id
- app_model_config.updated_by = account.id
- app.app_model_config_id = app_model_config.id
- self._session.add(app_model_config)
- app_model_config_was_updated.send(app, app_model_config=app_model_config)
- else:
- raise ValueError("Invalid app mode")
- return app
- @classmethod
- def export_dsl(cls, app_model: App, include_secret: bool = False) -> str:
- """
- Export app
- :param app_model: App instance
- :return:
- """
- app_mode = AppMode.value_of(app_model.mode)
- export_data = {
- "version": CURRENT_DSL_VERSION,
- "kind": "app",
- "app": {
- "name": app_model.name,
- "mode": app_model.mode,
- "icon": "🤖" if app_model.icon_type == "image" else app_model.icon,
- "icon_background": "#FFEAD5" if app_model.icon_type == "image" else app_model.icon_background,
- "description": app_model.description,
- "use_icon_as_answer_icon": app_model.use_icon_as_answer_icon,
- },
- }
- if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
- cls._append_workflow_export_data(
- export_data=export_data, app_model=app_model, include_secret=include_secret
- )
- else:
- cls._append_model_config_export_data(export_data, app_model)
- return yaml.dump(export_data, allow_unicode=True)
- @classmethod
- def _append_workflow_export_data(cls, *, export_data: dict, app_model: App, include_secret: bool) -> None:
- """
- Append workflow export data
- :param export_data: export data
- :param app_model: App instance
- """
- workflow_service = WorkflowService()
- workflow = workflow_service.get_draft_workflow(app_model)
- if not workflow:
- raise ValueError("Missing draft workflow configuration, please check.")
- export_data["workflow"] = workflow.to_dict(include_secret=include_secret)
- @classmethod
- def _append_model_config_export_data(cls, export_data: dict, app_model: App) -> None:
- """
- Append model config export data
- :param export_data: export data
- :param app_model: App instance
- """
- app_model_config = app_model.app_model_config
- if not app_model_config:
- raise ValueError("Missing app configuration, please check.")
- export_data["model_config"] = app_model_config.to_dict()
|