app_dsl_service.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. import logging
  2. import uuid
  3. from enum import StrEnum
  4. from typing import Optional
  5. from uuid import uuid4
  6. import yaml
  7. from packaging import version
  8. from pydantic import BaseModel, Field
  9. from sqlalchemy import select
  10. from sqlalchemy.orm import Session
  11. from core.helper import ssrf_proxy
  12. from core.model_runtime.utils.encoders import jsonable_encoder
  13. from core.plugin.entities.plugin import PluginDependency
  14. from core.workflow.nodes.enums import NodeType
  15. from core.workflow.nodes.knowledge_retrieval.entities import KnowledgeRetrievalNodeData
  16. from core.workflow.nodes.llm.entities import LLMNodeData
  17. from core.workflow.nodes.parameter_extractor.entities import ParameterExtractorNodeData
  18. from core.workflow.nodes.question_classifier.entities import QuestionClassifierNodeData
  19. from core.workflow.nodes.tool.entities import ToolNodeData
  20. from events.app_event import app_model_config_was_updated, app_was_created
  21. from extensions.ext_redis import redis_client
  22. from factories import variable_factory
  23. from models import Account, App, AppMode
  24. from models.model import AppModelConfig
  25. from models.workflow import Workflow
  26. from services.plugin.dependencies_analysis import DependenciesAnalysisService
  27. from services.workflow_service import WorkflowService
  28. logger = logging.getLogger(__name__)
  29. IMPORT_INFO_REDIS_KEY_PREFIX = "app_import_info:"
  30. IMPORT_INFO_REDIS_EXPIRY = 2 * 60 * 60 # 2 hours
  31. CURRENT_DSL_VERSION = "0.1.3"
  32. DSL_MAX_SIZE = 10 * 1024 * 1024 # 10MB
  33. class ImportMode(StrEnum):
  34. YAML_CONTENT = "yaml-content"
  35. YAML_URL = "yaml-url"
  36. class ImportStatus(StrEnum):
  37. COMPLETED = "completed"
  38. COMPLETED_WITH_WARNINGS = "completed-with-warnings"
  39. PENDING = "pending"
  40. FAILED = "failed"
  41. class Import(BaseModel):
  42. id: str
  43. status: ImportStatus
  44. app_id: Optional[str] = None
  45. current_dsl_version: str = CURRENT_DSL_VERSION
  46. imported_dsl_version: str = ""
  47. leaked_dependencies: list[PluginDependency] = Field(default_factory=list)
  48. error: str = ""
  49. def _check_version_compatibility(imported_version: str) -> ImportStatus:
  50. """Determine import status based on version comparison"""
  51. try:
  52. current_ver = version.parse(CURRENT_DSL_VERSION)
  53. imported_ver = version.parse(imported_version)
  54. except version.InvalidVersion:
  55. return ImportStatus.FAILED
  56. # Compare major version and minor version
  57. if current_ver.major != imported_ver.major or current_ver.minor != imported_ver.minor:
  58. return ImportStatus.PENDING
  59. if current_ver.micro != imported_ver.micro:
  60. return ImportStatus.COMPLETED_WITH_WARNINGS
  61. return ImportStatus.COMPLETED
  62. class PendingData(BaseModel):
  63. import_mode: str
  64. yaml_content: str
  65. name: str | None
  66. description: str | None
  67. icon_type: str | None
  68. icon: str | None
  69. icon_background: str | None
  70. app_id: str | None
  71. class AppDslService:
  72. def __init__(self, session: Session):
  73. self._session = session
  74. def import_app(
  75. self,
  76. *,
  77. account: Account,
  78. import_mode: str,
  79. yaml_content: Optional[str] = None,
  80. yaml_url: Optional[str] = None,
  81. name: Optional[str] = None,
  82. description: Optional[str] = None,
  83. icon_type: Optional[str] = None,
  84. icon: Optional[str] = None,
  85. icon_background: Optional[str] = None,
  86. app_id: Optional[str] = None,
  87. ) -> Import:
  88. """Import an app from YAML content or URL."""
  89. import_id = str(uuid.uuid4())
  90. # Validate import mode
  91. try:
  92. mode = ImportMode(import_mode)
  93. except ValueError:
  94. raise ValueError(f"Invalid import_mode: {import_mode}")
  95. # Get YAML content
  96. content = ""
  97. if mode == ImportMode.YAML_URL:
  98. if not yaml_url:
  99. return Import(
  100. id=import_id,
  101. status=ImportStatus.FAILED,
  102. error="yaml_url is required when import_mode is yaml-url",
  103. )
  104. try:
  105. response = ssrf_proxy.get(yaml_url.strip(), follow_redirects=True, timeout=(10, 10))
  106. response.raise_for_status()
  107. content = response.content
  108. if len(content) > DSL_MAX_SIZE:
  109. return Import(
  110. id=import_id,
  111. status=ImportStatus.FAILED,
  112. error="File size exceeds the limit of 10MB",
  113. )
  114. if not content:
  115. return Import(
  116. id=import_id,
  117. status=ImportStatus.FAILED,
  118. error="Empty content from url",
  119. )
  120. try:
  121. content = content.decode("utf-8")
  122. except UnicodeDecodeError as e:
  123. return Import(
  124. id=import_id,
  125. status=ImportStatus.FAILED,
  126. error=f"Error decoding content: {e}",
  127. )
  128. except Exception as e:
  129. return Import(
  130. id=import_id,
  131. status=ImportStatus.FAILED,
  132. error=f"Error fetching YAML from URL: {str(e)}",
  133. )
  134. elif mode == ImportMode.YAML_CONTENT:
  135. if not yaml_content:
  136. return Import(
  137. id=import_id,
  138. status=ImportStatus.FAILED,
  139. error="yaml_content is required when import_mode is yaml-content",
  140. )
  141. content = yaml_content
  142. # Process YAML content
  143. try:
  144. # Parse YAML to validate format
  145. data = yaml.safe_load(content)
  146. if not isinstance(data, dict):
  147. return Import(
  148. id=import_id,
  149. status=ImportStatus.FAILED,
  150. error="Invalid YAML format: content must be a mapping",
  151. )
  152. # Validate and fix DSL version
  153. if not data.get("version"):
  154. data["version"] = "0.1.0"
  155. if not data.get("kind") or data.get("kind") != "app":
  156. data["kind"] = "app"
  157. imported_version = data.get("version", "0.1.0")
  158. status = _check_version_compatibility(imported_version)
  159. # Extract app data
  160. app_data = data.get("app")
  161. if not app_data:
  162. return Import(
  163. id=import_id,
  164. status=ImportStatus.FAILED,
  165. error="Missing app data in YAML content",
  166. )
  167. # If app_id is provided, check if it exists
  168. app = None
  169. if app_id:
  170. stmt = select(App).where(App.id == app_id, App.tenant_id == account.current_tenant_id)
  171. app = self._session.scalar(stmt)
  172. if not app:
  173. return Import(
  174. id=import_id,
  175. status=ImportStatus.FAILED,
  176. error="App not found",
  177. )
  178. if app.mode not in [AppMode.WORKFLOW.value, AppMode.ADVANCED_CHAT.value]:
  179. return Import(
  180. id=import_id,
  181. status=ImportStatus.FAILED,
  182. error="Only workflow or advanced chat apps can be overwritten",
  183. )
  184. # If major version mismatch, store import info in Redis
  185. if status == ImportStatus.PENDING:
  186. pending_data = PendingData(
  187. import_mode=import_mode,
  188. yaml_content=content,
  189. name=name,
  190. description=description,
  191. icon_type=icon_type,
  192. icon=icon,
  193. icon_background=icon_background,
  194. app_id=app_id,
  195. )
  196. redis_client.setex(
  197. f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}",
  198. IMPORT_INFO_REDIS_EXPIRY,
  199. pending_data.model_dump_json(),
  200. )
  201. return Import(
  202. id=import_id,
  203. status=status,
  204. app_id=app_id,
  205. imported_dsl_version=imported_version,
  206. )
  207. try:
  208. dependencies = self.get_leaked_dependencies(account.current_tenant_id, data.get("dependencies", []))
  209. except Exception as e:
  210. return Import(
  211. id=import_id,
  212. status=ImportStatus.FAILED,
  213. error=str(e),
  214. )
  215. if len(dependencies) > 0:
  216. return Import(
  217. id=import_id,
  218. status=ImportStatus.PENDING,
  219. app_id=app_id,
  220. imported_dsl_version=imported_version,
  221. leaked_dependencies=dependencies,
  222. )
  223. # Create or update app
  224. app = self._create_or_update_app(
  225. app=app,
  226. data=data,
  227. account=account,
  228. name=name,
  229. description=description,
  230. icon_type=icon_type,
  231. icon=icon,
  232. icon_background=icon_background,
  233. )
  234. return Import(
  235. id=import_id,
  236. status=status,
  237. app_id=app.id,
  238. imported_dsl_version=imported_version,
  239. )
  240. except yaml.YAMLError as e:
  241. return Import(
  242. id=import_id,
  243. status=ImportStatus.FAILED,
  244. error=f"Invalid YAML format: {str(e)}",
  245. )
  246. except Exception as e:
  247. logger.exception("Failed to import app")
  248. return Import(
  249. id=import_id,
  250. status=ImportStatus.FAILED,
  251. error=str(e),
  252. )
  253. def confirm_import(self, *, import_id: str, account: Account) -> Import:
  254. """
  255. Confirm an import that requires confirmation
  256. """
  257. redis_key = f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}"
  258. pending_data = redis_client.get(redis_key)
  259. if not pending_data:
  260. return Import(
  261. id=import_id,
  262. status=ImportStatus.FAILED,
  263. error="Import information expired or does not exist",
  264. )
  265. try:
  266. if not isinstance(pending_data, str | bytes):
  267. return Import(
  268. id=import_id,
  269. status=ImportStatus.FAILED,
  270. error="Invalid import information",
  271. )
  272. pending_data = PendingData.model_validate_json(pending_data)
  273. data = yaml.safe_load(pending_data.yaml_content)
  274. app = None
  275. if pending_data.app_id:
  276. stmt = select(App).where(App.id == pending_data.app_id, App.tenant_id == account.current_tenant_id)
  277. app = self._session.scalar(stmt)
  278. # Create or update app
  279. app = self._create_or_update_app(
  280. app=app,
  281. data=data,
  282. account=account,
  283. name=pending_data.name,
  284. description=pending_data.description,
  285. icon_type=pending_data.icon_type,
  286. icon=pending_data.icon,
  287. icon_background=pending_data.icon_background,
  288. )
  289. # Delete import info from Redis
  290. redis_client.delete(redis_key)
  291. return Import(
  292. id=import_id,
  293. status=ImportStatus.COMPLETED,
  294. app_id=app.id,
  295. current_dsl_version=CURRENT_DSL_VERSION,
  296. imported_dsl_version=data.get("version", "0.1.0"),
  297. )
  298. except Exception as e:
  299. logger.exception("Error confirming import")
  300. return Import(
  301. id=import_id,
  302. status=ImportStatus.FAILED,
  303. error=str(e),
  304. )
  305. def _create_or_update_app(
  306. self,
  307. *,
  308. app: Optional[App],
  309. data: dict,
  310. account: Account,
  311. name: Optional[str] = None,
  312. description: Optional[str] = None,
  313. icon_type: Optional[str] = None,
  314. icon: Optional[str] = None,
  315. icon_background: Optional[str] = None,
  316. ) -> App:
  317. """Create a new app or update an existing one."""
  318. app_data = data.get("app", {})
  319. app_mode = AppMode(app_data["mode"])
  320. # Set icon type
  321. icon_type_value = icon_type or app_data.get("icon_type")
  322. if icon_type_value in ["emoji", "link"]:
  323. icon_type = icon_type_value
  324. else:
  325. icon_type = "emoji"
  326. icon = icon or str(app_data.get("icon", ""))
  327. if app:
  328. # Update existing app
  329. app.name = name or app_data.get("name", app.name)
  330. app.description = description or app_data.get("description", app.description)
  331. app.icon_type = icon_type
  332. app.icon = icon
  333. app.icon_background = icon_background or app_data.get("icon_background", app.icon_background)
  334. app.updated_by = account.id
  335. else:
  336. # Create new app
  337. app = App()
  338. app.id = str(uuid4())
  339. app.tenant_id = account.current_tenant_id
  340. app.mode = app_mode.value
  341. app.name = name or app_data.get("name", "")
  342. app.description = description or app_data.get("description", "")
  343. app.icon_type = icon_type
  344. app.icon = icon
  345. app.icon_background = icon_background or app_data.get("icon_background", "#FFFFFF")
  346. app.enable_site = True
  347. app.enable_api = True
  348. app.use_icon_as_answer_icon = app_data.get("use_icon_as_answer_icon", False)
  349. app.created_by = account.id
  350. app.updated_by = account.id
  351. self._session.add(app)
  352. self._session.commit()
  353. app_was_created.send(app, account=account)
  354. # Initialize app based on mode
  355. if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
  356. workflow_data = data.get("workflow")
  357. if not workflow_data or not isinstance(workflow_data, dict):
  358. raise ValueError("Missing workflow data for workflow/advanced chat app")
  359. environment_variables_list = workflow_data.get("environment_variables", [])
  360. environment_variables = [
  361. variable_factory.build_variable_from_mapping(obj) for obj in environment_variables_list
  362. ]
  363. conversation_variables_list = workflow_data.get("conversation_variables", [])
  364. conversation_variables = [
  365. variable_factory.build_variable_from_mapping(obj) for obj in conversation_variables_list
  366. ]
  367. workflow_service = WorkflowService()
  368. current_draft_workflow = workflow_service.get_draft_workflow(app_model=app)
  369. if current_draft_workflow:
  370. unique_hash = current_draft_workflow.unique_hash
  371. else:
  372. unique_hash = None
  373. workflow_service.sync_draft_workflow(
  374. app_model=app,
  375. graph=workflow_data.get("graph", {}),
  376. features=workflow_data.get("features", {}),
  377. unique_hash=unique_hash,
  378. account=account,
  379. environment_variables=environment_variables,
  380. conversation_variables=conversation_variables,
  381. )
  382. elif app_mode in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.COMPLETION}:
  383. # Initialize model config
  384. model_config = data.get("model_config")
  385. if not model_config or not isinstance(model_config, dict):
  386. raise ValueError("Missing model_config for chat/agent-chat/completion app")
  387. # Initialize or update model config
  388. if not app.app_model_config:
  389. app_model_config = AppModelConfig().from_model_config_dict(model_config)
  390. app_model_config.id = str(uuid4())
  391. app_model_config.app_id = app.id
  392. app_model_config.created_by = account.id
  393. app_model_config.updated_by = account.id
  394. app.app_model_config_id = app_model_config.id
  395. self._session.add(app_model_config)
  396. app_model_config_was_updated.send(app, app_model_config=app_model_config)
  397. else:
  398. raise ValueError("Invalid app mode")
  399. return app
  400. @classmethod
  401. def export_dsl(cls, app_model: App, include_secret: bool = False) -> str:
  402. """
  403. Export app
  404. :param app_model: App instance
  405. :return:
  406. """
  407. app_mode = AppMode.value_of(app_model.mode)
  408. export_data = {
  409. "version": CURRENT_DSL_VERSION,
  410. "kind": "app",
  411. "app": {
  412. "name": app_model.name,
  413. "mode": app_model.mode,
  414. "icon": "🤖" if app_model.icon_type == "image" else app_model.icon,
  415. "icon_background": "#FFEAD5" if app_model.icon_type == "image" else app_model.icon_background,
  416. "description": app_model.description,
  417. "use_icon_as_answer_icon": app_model.use_icon_as_answer_icon,
  418. },
  419. }
  420. if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
  421. cls._append_workflow_export_data(
  422. export_data=export_data, app_model=app_model, include_secret=include_secret
  423. )
  424. else:
  425. cls._append_model_config_export_data(export_data, app_model)
  426. return yaml.dump(export_data, allow_unicode=True)
  427. @classmethod
  428. def _append_workflow_export_data(cls, *, export_data: dict, app_model: App, include_secret: bool) -> None:
  429. """
  430. Append workflow export data
  431. :param export_data: export data
  432. :param app_model: App instance
  433. """
  434. workflow_service = WorkflowService()
  435. workflow = workflow_service.get_draft_workflow(app_model)
  436. if not workflow:
  437. raise ValueError("Missing draft workflow configuration, please check.")
  438. export_data["workflow"] = workflow.to_dict(include_secret=include_secret)
  439. dependencies = cls._extract_dependencies_from_workflow(workflow)
  440. export_data["dependencies"] = [
  441. jsonable_encoder(d.model_dump())
  442. for d in DependenciesAnalysisService.generate_dependencies(
  443. tenant_id=app_model.tenant_id, dependencies=dependencies
  444. )
  445. ]
  446. @classmethod
  447. def _append_model_config_export_data(cls, export_data: dict, app_model: App) -> None:
  448. """
  449. Append model config export data
  450. :param export_data: export data
  451. :param app_model: App instance
  452. """
  453. app_model_config = app_model.app_model_config
  454. if not app_model_config:
  455. raise ValueError("Missing app configuration, please check.")
  456. export_data["model_config"] = app_model_config.to_dict()
  457. dependencies = cls._extract_dependencies_from_model_config(app_model_config)
  458. export_data["dependencies"] = [
  459. jsonable_encoder(d.model_dump())
  460. for d in DependenciesAnalysisService.generate_dependencies(
  461. tenant_id=app_model.tenant_id, dependencies=dependencies
  462. )
  463. ]
  464. @classmethod
  465. def _extract_dependencies_from_workflow(cls, workflow: Workflow) -> list[str]:
  466. """
  467. Extract dependencies from workflow
  468. :param workflow: Workflow instance
  469. :return: dependencies list format like ["langgenius/google"]
  470. """
  471. graph = workflow.graph_dict
  472. dependencies = []
  473. for node in graph.get("nodes", []):
  474. try:
  475. typ = node.get("data", {}).get("type")
  476. match typ:
  477. case NodeType.TOOL.value:
  478. tool_entity = ToolNodeData(**node["data"])
  479. dependencies.append(
  480. DependenciesAnalysisService.analyze_tool_dependency(tool_entity.provider_id),
  481. )
  482. case NodeType.LLM.value:
  483. llm_entity = LLMNodeData(**node["data"])
  484. dependencies.append(
  485. DependenciesAnalysisService.analyze_model_provider_dependency(llm_entity.model.provider),
  486. )
  487. case NodeType.QUESTION_CLASSIFIER.value:
  488. question_classifier_entity = QuestionClassifierNodeData(**node["data"])
  489. dependencies.append(
  490. DependenciesAnalysisService.analyze_model_provider_dependency(
  491. question_classifier_entity.model.provider
  492. ),
  493. )
  494. case NodeType.PARAMETER_EXTRACTOR.value:
  495. parameter_extractor_entity = ParameterExtractorNodeData(**node["data"])
  496. dependencies.append(
  497. DependenciesAnalysisService.analyze_model_provider_dependency(
  498. parameter_extractor_entity.model.provider
  499. ),
  500. )
  501. case NodeType.KNOWLEDGE_RETRIEVAL.value:
  502. knowledge_retrieval_entity = KnowledgeRetrievalNodeData(**node["data"])
  503. if knowledge_retrieval_entity.retrieval_mode == "multiple":
  504. if knowledge_retrieval_entity.multiple_retrieval_config:
  505. if (
  506. knowledge_retrieval_entity.multiple_retrieval_config.reranking_mode
  507. == "reranking_model"
  508. ):
  509. if knowledge_retrieval_entity.multiple_retrieval_config.reranking_model:
  510. dependencies.append(
  511. DependenciesAnalysisService.analyze_model_provider_dependency(
  512. knowledge_retrieval_entity.multiple_retrieval_config.reranking_model.provider
  513. ),
  514. )
  515. elif (
  516. knowledge_retrieval_entity.multiple_retrieval_config.reranking_mode
  517. == "weighted_score"
  518. ):
  519. if knowledge_retrieval_entity.multiple_retrieval_config.weights:
  520. vector_setting = (
  521. knowledge_retrieval_entity.multiple_retrieval_config.weights.vector_setting
  522. )
  523. dependencies.append(
  524. DependenciesAnalysisService.analyze_model_provider_dependency(
  525. vector_setting.embedding_provider_name
  526. ),
  527. )
  528. elif knowledge_retrieval_entity.retrieval_mode == "single":
  529. model_config = knowledge_retrieval_entity.single_retrieval_config
  530. if model_config:
  531. dependencies.append(
  532. DependenciesAnalysisService.analyze_model_provider_dependency(
  533. model_config.model.provider
  534. ),
  535. )
  536. case _:
  537. # TODO: Handle default case or unknown node types
  538. pass
  539. except Exception as e:
  540. logger.exception("Error extracting node dependency", exc_info=e)
  541. return dependencies
  542. @classmethod
  543. def _extract_dependencies_from_model_config(cls, model_config: AppModelConfig) -> list[str]:
  544. """
  545. Extract dependencies from model config
  546. :param model_config: AppModelConfig instance
  547. :return: dependencies list format like ["langgenius/google:1.0.0@abcdef1234567890"]
  548. """
  549. dependencies = []
  550. try:
  551. # completion model
  552. model_dict = model_config.model_dict
  553. if model_dict:
  554. dependencies.append(
  555. DependenciesAnalysisService.analyze_model_provider_dependency(model_dict.get("provider", ""))
  556. )
  557. # reranking model
  558. dataset_configs = model_config.dataset_configs_dict
  559. if dataset_configs:
  560. for dataset_config in dataset_configs.get("datasets", {}).get("datasets", []):
  561. if dataset_config.get("reranking_model"):
  562. dependencies.append(
  563. DependenciesAnalysisService.analyze_model_provider_dependency(
  564. dataset_config.get("reranking_model", {})
  565. .get("reranking_provider_name", {})
  566. .get("provider")
  567. )
  568. )
  569. # tools
  570. agent_configs = model_config.agent_mode_dict
  571. if agent_configs:
  572. for agent_config in agent_configs.get("tools", []):
  573. dependencies.append(
  574. DependenciesAnalysisService.analyze_tool_dependency(agent_config.get("provider_id"))
  575. )
  576. except Exception as e:
  577. logger.exception("Error extracting model config dependency", exc_info=e)
  578. return dependencies
  579. @classmethod
  580. def get_leaked_dependencies(cls, tenant_id: str, dsl_dependencies: list[dict]) -> list[PluginDependency]:
  581. """
  582. Returns the leaked dependencies in current workspace
  583. """
  584. dependencies = [PluginDependency(**dep) for dep in dsl_dependencies]
  585. if not dependencies:
  586. return []
  587. return DependenciesAnalysisService.get_leaked_dependencies(tenant_id=tenant_id, dependencies=dependencies)