app_dsl_service.py 18 KB


  1. import logging
  2. import uuid
  3. from enum import StrEnum
  4. from typing import Optional, cast
  5. from urllib.parse import urlparse
  6. from uuid import uuid4
  7. import yaml # type: ignore
  8. from packaging import version
  9. from pydantic import BaseModel
  10. from sqlalchemy import select
  11. from sqlalchemy.orm import Session
  12. from core.helper import ssrf_proxy
  13. from events.app_event import app_model_config_was_updated, app_was_created
  14. from extensions.ext_redis import redis_client
  15. from factories import variable_factory
  16. from models import Account, App, AppMode
  17. from models.model import AppModelConfig
  18. from services.workflow_service import WorkflowService
  19. logger = logging.getLogger(__name__)
  20. IMPORT_INFO_REDIS_KEY_PREFIX = "app_import_info:"
  21. IMPORT_INFO_REDIS_EXPIRY = 180 # 3 minutes
  22. CURRENT_DSL_VERSION = "0.1.5"
  23. class ImportMode(StrEnum):
  24. YAML_CONTENT = "yaml-content"
  25. YAML_URL = "yaml-url"
  26. class ImportStatus(StrEnum):
  27. COMPLETED = "completed"
  28. COMPLETED_WITH_WARNINGS = "completed-with-warnings"
  29. PENDING = "pending"
  30. FAILED = "failed"
  31. class Import(BaseModel):
  32. id: str
  33. status: ImportStatus
  34. app_id: Optional[str] = None
  35. current_dsl_version: str = CURRENT_DSL_VERSION
  36. imported_dsl_version: str = ""
  37. error: str = ""
  38. def _check_version_compatibility(imported_version: str) -> ImportStatus:
  39. """Determine import status based on version comparison"""
  40. try:
  41. current_ver = version.parse(CURRENT_DSL_VERSION)
  42. imported_ver = version.parse(imported_version)
  43. except version.InvalidVersion:
  44. return ImportStatus.FAILED
  45. # Compare major version and minor version
  46. if current_ver.major != imported_ver.major or current_ver.minor != imported_ver.minor:
  47. return ImportStatus.PENDING
  48. if current_ver.micro != imported_ver.micro:
  49. return ImportStatus.COMPLETED_WITH_WARNINGS
  50. return ImportStatus.COMPLETED
  51. class PendingData(BaseModel):
  52. import_mode: str
  53. yaml_content: str
  54. name: str | None
  55. description: str | None
  56. icon_type: str | None
  57. icon: str | None
  58. icon_background: str | None
  59. app_id: str | None
  60. class AppDslService:
  61. def __init__(self, session: Session):
  62. self._session = session
  63. def import_app(
  64. self,
  65. *,
  66. account: Account,
  67. import_mode: str,
  68. yaml_content: Optional[str] = None,
  69. yaml_url: Optional[str] = None,
  70. name: Optional[str] = None,
  71. description: Optional[str] = None,
  72. icon_type: Optional[str] = None,
  73. icon: Optional[str] = None,
  74. icon_background: Optional[str] = None,
  75. app_id: Optional[str] = None,
  76. ) -> Import:
  77. """Import an app from YAML content or URL."""
  78. import_id = str(uuid.uuid4())
  79. # Validate import mode
  80. try:
  81. mode = ImportMode(import_mode)
  82. except ValueError:
  83. raise ValueError(f"Invalid import_mode: {import_mode}")
  84. # Get YAML content
  85. content: str = ""
  86. if mode == ImportMode.YAML_URL:
  87. if not yaml_url:
  88. return Import(
  89. id=import_id,
  90. status=ImportStatus.FAILED,
  91. error="yaml_url is required when import_mode is yaml-url",
  92. )
  93. try:
  94. max_size = 10 * 1024 * 1024 # 10MB
  95. parsed_url = urlparse(yaml_url)
  96. if (
  97. parsed_url.scheme == "https"
  98. and parsed_url.netloc == "github.com"
  99. and parsed_url.path.endswith((".yml", ".yaml"))
  100. ):
  101. yaml_url = yaml_url.replace("https://github.com", "https://raw.githubusercontent.com")
  102. yaml_url = yaml_url.replace("/blob/", "/")
  103. response = ssrf_proxy.get(yaml_url.strip(), follow_redirects=True, timeout=(10, 10))
  104. response.raise_for_status()
  105. content = response.content.decode()
  106. if len(content) > max_size:
  107. return Import(
  108. id=import_id,
  109. status=ImportStatus.FAILED,
  110. error="File size exceeds the limit of 10MB",
  111. )
  112. if not content:
  113. return Import(
  114. id=import_id,
  115. status=ImportStatus.FAILED,
  116. error="Empty content from url",
  117. )
  118. try:
  119. content = cast(bytes, content).decode("utf-8")
  120. except UnicodeDecodeError as e:
  121. return Import(
  122. id=import_id,
  123. status=ImportStatus.FAILED,
  124. error=f"Error decoding content: {e}",
  125. )
  126. except Exception as e:
  127. return Import(
  128. id=import_id,
  129. status=ImportStatus.FAILED,
  130. error=f"Error fetching YAML from URL: {str(e)}",
  131. )
  132. elif mode == ImportMode.YAML_CONTENT:
  133. if not yaml_content:
  134. return Import(
  135. id=import_id,
  136. status=ImportStatus.FAILED,
  137. error="yaml_content is required when import_mode is yaml-content",
  138. )
  139. content = yaml_content
  140. # Process YAML content
  141. try:
  142. # Parse YAML to validate format
  143. data = yaml.safe_load(content)
  144. if not isinstance(data, dict):
  145. return Import(
  146. id=import_id,
  147. status=ImportStatus.FAILED,
  148. error="Invalid YAML format: content must be a mapping",
  149. )
  150. # Validate and fix DSL version
  151. if not data.get("version"):
  152. data["version"] = "0.1.0"
  153. if not data.get("kind") or data.get("kind") != "app":
  154. data["kind"] = "app"
  155. imported_version = data.get("version", "0.1.0")
  156. # check if imported_version is a float-like string
  157. if not isinstance(imported_version, str):
  158. raise ValueError(f"Invalid version type, expected str, got {type(imported_version)}")
  159. status = _check_version_compatibility(imported_version)
  160. # Extract app data
  161. app_data = data.get("app")
  162. if not app_data:
  163. return Import(
  164. id=import_id,
  165. status=ImportStatus.FAILED,
  166. error="Missing app data in YAML content",
  167. )
  168. # If app_id is provided, check if it exists
  169. app = None
  170. if app_id:
  171. stmt = select(App).where(App.id == app_id, App.tenant_id == account.current_tenant_id)
  172. app = self._session.scalar(stmt)
  173. if not app:
  174. return Import(
  175. id=import_id,
  176. status=ImportStatus.FAILED,
  177. error="App not found",
  178. )
  179. if app.mode not in [AppMode.WORKFLOW.value, AppMode.ADVANCED_CHAT.value]:
  180. return Import(
  181. id=import_id,
  182. status=ImportStatus.FAILED,
  183. error="Only workflow or advanced chat apps can be overwritten",
  184. )
  185. # If major version mismatch, store import info in Redis
  186. if status == ImportStatus.PENDING:
  187. panding_data = PendingData(
  188. import_mode=import_mode,
  189. yaml_content=content,
  190. name=name,
  191. description=description,
  192. icon_type=icon_type,
  193. icon=icon,
  194. icon_background=icon_background,
  195. app_id=app_id,
  196. )
  197. redis_client.setex(
  198. f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}",
  199. IMPORT_INFO_REDIS_EXPIRY,
  200. panding_data.model_dump_json(),
  201. )
  202. return Import(
  203. id=import_id,
  204. status=status,
  205. app_id=app_id,
  206. imported_dsl_version=imported_version,
  207. )
  208. # Create or update app
  209. app = self._create_or_update_app(
  210. app=app,
  211. data=data,
  212. account=account,
  213. name=name,
  214. description=description,
  215. icon_type=icon_type,
  216. icon=icon,
  217. icon_background=icon_background,
  218. )
  219. return Import(
  220. id=import_id,
  221. status=status,
  222. app_id=app.id,
  223. imported_dsl_version=imported_version,
  224. )
  225. except yaml.YAMLError as e:
  226. return Import(
  227. id=import_id,
  228. status=ImportStatus.FAILED,
  229. error=f"Invalid YAML format: {str(e)}",
  230. )
  231. except Exception as e:
  232. logger.exception("Failed to import app")
  233. return Import(
  234. id=import_id,
  235. status=ImportStatus.FAILED,
  236. error=str(e),
  237. )
  238. def confirm_import(self, *, import_id: str, account: Account) -> Import:
  239. """
  240. Confirm an import that requires confirmation
  241. """
  242. redis_key = f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}"
  243. pending_data = redis_client.get(redis_key)
  244. if not pending_data:
  245. return Import(
  246. id=import_id,
  247. status=ImportStatus.FAILED,
  248. error="Import information expired or does not exist",
  249. )
  250. try:
  251. if not isinstance(pending_data, str | bytes):
  252. return Import(
  253. id=import_id,
  254. status=ImportStatus.FAILED,
  255. error="Invalid import information",
  256. )
  257. pending_data = PendingData.model_validate_json(pending_data)
  258. data = yaml.safe_load(pending_data.yaml_content)
  259. app = None
  260. if pending_data.app_id:
  261. stmt = select(App).where(App.id == pending_data.app_id, App.tenant_id == account.current_tenant_id)
  262. app = self._session.scalar(stmt)
  263. # Create or update app
  264. app = self._create_or_update_app(
  265. app=app,
  266. data=data,
  267. account=account,
  268. name=pending_data.name,
  269. description=pending_data.description,
  270. icon_type=pending_data.icon_type,
  271. icon=pending_data.icon,
  272. icon_background=pending_data.icon_background,
  273. )
  274. # Delete import info from Redis
  275. redis_client.delete(redis_key)
  276. return Import(
  277. id=import_id,
  278. status=ImportStatus.COMPLETED,
  279. app_id=app.id,
  280. current_dsl_version=CURRENT_DSL_VERSION,
  281. imported_dsl_version=data.get("version", "0.1.0"),
  282. )
  283. except Exception as e:
  284. logger.exception("Error confirming import")
  285. return Import(
  286. id=import_id,
  287. status=ImportStatus.FAILED,
  288. error=str(e),
  289. )
  290. def _create_or_update_app(
  291. self,
  292. *,
  293. app: Optional[App],
  294. data: dict,
  295. account: Account,
  296. name: Optional[str] = None,
  297. description: Optional[str] = None,
  298. icon_type: Optional[str] = None,
  299. icon: Optional[str] = None,
  300. icon_background: Optional[str] = None,
  301. ) -> App:
  302. """Create a new app or update an existing one."""
  303. app_data = data.get("app", {})
  304. app_mode = app_data.get("mode")
  305. if not app_mode:
  306. raise ValueError("loss app mode")
  307. app_mode = AppMode(app_mode)
  308. # Set icon type
  309. icon_type_value = icon_type or app_data.get("icon_type")
  310. if icon_type_value in ["emoji", "link"]:
  311. icon_type = icon_type_value
  312. else:
  313. icon_type = "emoji"
  314. icon = icon or str(app_data.get("icon", ""))
  315. if app:
  316. # Update existing app
  317. app.name = name or app_data.get("name", app.name)
  318. app.description = description or app_data.get("description", app.description)
  319. app.icon_type = icon_type
  320. app.icon = icon
  321. app.icon_background = icon_background or app_data.get("icon_background", app.icon_background)
  322. app.updated_by = account.id
  323. else:
  324. if account.current_tenant_id is None:
  325. raise ValueError("Current tenant is not set")
  326. # Create new app
  327. app = App()
  328. app.id = str(uuid4())
  329. app.tenant_id = account.current_tenant_id
  330. app.mode = app_mode.value
  331. app.name = name or app_data.get("name", "")
  332. app.description = description or app_data.get("description", "")
  333. app.icon_type = icon_type
  334. app.icon = icon
  335. app.icon_background = icon_background or app_data.get("icon_background", "#FFFFFF")
  336. app.enable_site = True
  337. app.enable_api = True
  338. app.use_icon_as_answer_icon = app_data.get("use_icon_as_answer_icon", False)
  339. app.created_by = account.id
  340. app.updated_by = account.id
  341. self._session.add(app)
  342. self._session.commit()
  343. app_was_created.send(app, account=account)
  344. # Initialize app based on mode
  345. if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
  346. workflow_data = data.get("workflow")
  347. if not workflow_data or not isinstance(workflow_data, dict):
  348. raise ValueError("Missing workflow data for workflow/advanced chat app")
  349. environment_variables_list = workflow_data.get("environment_variables", [])
  350. environment_variables = [
  351. variable_factory.build_environment_variable_from_mapping(obj) for obj in environment_variables_list
  352. ]
  353. conversation_variables_list = workflow_data.get("conversation_variables", [])
  354. conversation_variables = [
  355. variable_factory.build_conversation_variable_from_mapping(obj) for obj in conversation_variables_list
  356. ]
  357. workflow_service = WorkflowService()
  358. current_draft_workflow = workflow_service.get_draft_workflow(app_model=app)
  359. if current_draft_workflow:
  360. unique_hash = current_draft_workflow.unique_hash
  361. else:
  362. unique_hash = None
  363. workflow_service.sync_draft_workflow(
  364. app_model=app,
  365. graph=workflow_data.get("graph", {}),
  366. features=workflow_data.get("features", {}),
  367. unique_hash=unique_hash,
  368. account=account,
  369. environment_variables=environment_variables,
  370. conversation_variables=conversation_variables,
  371. )
  372. elif app_mode in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.COMPLETION}:
  373. # Initialize model config
  374. model_config = data.get("model_config")
  375. if not model_config or not isinstance(model_config, dict):
  376. raise ValueError("Missing model_config for chat/agent-chat/completion app")
  377. # Initialize or update model config
  378. if not app.app_model_config:
  379. app_model_config = AppModelConfig().from_model_config_dict(model_config)
  380. app_model_config.id = str(uuid4())
  381. app_model_config.app_id = app.id
  382. app_model_config.created_by = account.id
  383. app_model_config.updated_by = account.id
  384. app.app_model_config_id = app_model_config.id
  385. self._session.add(app_model_config)
  386. app_model_config_was_updated.send(app, app_model_config=app_model_config)
  387. else:
  388. raise ValueError("Invalid app mode")
  389. return app
  390. @classmethod
  391. def export_dsl(cls, app_model: App, include_secret: bool = False) -> str:
  392. """
  393. Export app
  394. :param app_model: App instance
  395. :return:
  396. """
  397. app_mode = AppMode.value_of(app_model.mode)
  398. export_data = {
  399. "version": CURRENT_DSL_VERSION,
  400. "kind": "app",
  401. "app": {
  402. "name": app_model.name,
  403. "mode": app_model.mode,
  404. "icon": "🤖" if app_model.icon_type == "image" else app_model.icon,
  405. "icon_background": "#FFEAD5" if app_model.icon_type == "image" else app_model.icon_background,
  406. "description": app_model.description,
  407. "use_icon_as_answer_icon": app_model.use_icon_as_answer_icon,
  408. },
  409. }
  410. if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
  411. cls._append_workflow_export_data(
  412. export_data=export_data, app_model=app_model, include_secret=include_secret
  413. )
  414. else:
  415. cls._append_model_config_export_data(export_data, app_model)
  416. return yaml.dump(export_data, allow_unicode=True) # type: ignore
  417. @classmethod
  418. def _append_workflow_export_data(cls, *, export_data: dict, app_model: App, include_secret: bool) -> None:
  419. """
  420. Append workflow export data
  421. :param export_data: export data
  422. :param app_model: App instance
  423. """
  424. workflow_service = WorkflowService()
  425. workflow = workflow_service.get_draft_workflow(app_model)
  426. if not workflow:
  427. raise ValueError("Missing draft workflow configuration, please check.")
  428. export_data["workflow"] = workflow.to_dict(include_secret=include_secret)
  429. @classmethod
  430. def _append_model_config_export_data(cls, export_data: dict, app_model: App) -> None:
  431. """
  432. Append model config export data
  433. :param export_data: export data
  434. :param app_model: App instance
  435. """
  436. app_model_config = app_model.app_model_config
  437. if not app_model_config:
  438. raise ValueError("Missing app configuration, please check.")
  439. export_data["model_config"] = app_model_config.to_dict()