123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631 |
- import json
- from typing import Any, Optional
- from core.app.app_config.entities import (
- DatasetEntity,
- DatasetRetrieveConfigEntity,
- EasyUIBasedAppConfig,
- ExternalDataVariableEntity,
- ModelConfigEntity,
- PromptTemplateEntity,
- VariableEntity,
- )
- from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfigManager
- from core.app.apps.chat.app_config_manager import ChatAppConfigManager
- from core.app.apps.completion.app_config_manager import CompletionAppConfigManager
- from core.file.models import FileUploadConfig
- from core.helper import encrypter
- from core.model_runtime.entities.llm_entities import LLMMode
- from core.model_runtime.utils.encoders import jsonable_encoder
- from core.prompt.simple_prompt_transform import SimplePromptTransform
- from core.workflow.nodes import NodeType
- from events.app_event import app_was_created
- from extensions.ext_database import db
- from models.account import Account
- from models.api_based_extension import APIBasedExtension, APIBasedExtensionPoint
- from models.model import App, AppMode, AppModelConfig
- from models.workflow import Workflow, WorkflowType
class WorkflowConverter:
    """
    App Convert to Workflow Mode

    Turns the model config of a chat / completion app into a linear workflow graph
    (start -> [http-request -> code]* -> [knowledge-retrieval] -> llm -> end | answer)
    and creates a new App that owns the resulting draft workflow.
    """

    # Feature keys copied from the app model config into the workflow ``features`` payload.
    # Order is preserved because the payload is serialized with ``json.dumps``.
    _ADVANCED_CHAT_FEATURE_KEYS = (
        "opening_statement",
        "suggested_questions",
        "suggested_questions_after_answer",
        "speech_to_text",
        "text_to_speech",
        "file_upload",
        "sensitive_word_avoidance",
        "retriever_resource",
    )
    _WORKFLOW_FEATURE_KEYS = (
        "text_to_speech",
        "file_upload",
        "sensitive_word_avoidance",
    )

    def convert_to_workflow(
        self, app_model: App, account: Account, name: str, icon_type: str, icon: str, icon_background: str
    ) -> App:
        """
        Convert app to workflow

        - basic mode of chatbot app
        - expert mode of chatbot app
        - completion app

        :param app_model: App instance
        :param account: Account
        :param name: new app name (falls back to the original name + "(workflow)")
        :param icon_type: new app icon type (falls back to the original app's)
        :param icon: new app icon (falls back to the original app's)
        :param icon_background: new app icon background (falls back to the original app's)
        :return: new App instance
        :raises ValueError: if the app has no model config
        """
        app_model_config = app_model.app_model_config
        if not app_model_config:
            raise ValueError("App model config is required")

        # Resolve the target mode BEFORE converting: ``_convert_to_app_config`` may rewrite
        # ``app_model.mode`` for agent apps, and the new app must use the same mode the
        # workflow graph / workflow type were built for.
        new_app_mode = self._get_new_app_mode(app_model)

        # convert app model config
        workflow = self.convert_app_model_config_to_workflow(
            app_model=app_model, app_model_config=app_model_config, account_id=account.id
        )

        # create new app
        new_app = App()
        new_app.tenant_id = app_model.tenant_id
        new_app.name = name or app_model.name + "(workflow)"
        new_app.mode = new_app_mode.value
        new_app.icon_type = icon_type or app_model.icon_type
        new_app.icon = icon or app_model.icon
        new_app.icon_background = icon_background or app_model.icon_background
        new_app.enable_site = app_model.enable_site
        new_app.enable_api = app_model.enable_api
        new_app.api_rpm = app_model.api_rpm
        new_app.api_rph = app_model.api_rph
        new_app.is_demo = False
        new_app.is_public = app_model.is_public
        new_app.created_by = account.id
        new_app.updated_by = account.id

        # persist the new app first so that its id can be attached to the workflow
        db.session.add(new_app)
        db.session.flush()
        db.session.commit()

        # the workflow was created against the original app; re-point it to the new one
        workflow.app_id = new_app.id
        db.session.commit()

        app_was_created.send(new_app, account=account)

        return new_app

    def convert_app_model_config_to_workflow(
        self, app_model: App, app_model_config: AppModelConfig, account_id: str
    ) -> Workflow:
        """
        Convert app model config to workflow mode

        Builds the graph, derives the feature payload and stores a draft Workflow record
        (added to the session and committed).

        :param app_model: App instance
        :param app_model_config: AppModelConfig instance
        :param account_id: Account ID
        :return: the created draft Workflow
        """
        # get new app mode
        new_app_mode = self._get_new_app_mode(app_model)

        # convert app model config
        app_config = self._convert_to_app_config(app_model=app_model, app_model_config=app_model_config)

        # Convert list:
        # - variables -> start
        # - model_config -> llm
        # - prompt_template -> llm
        # - file_upload -> llm
        # - external_data_variables -> http-request
        # - dataset -> knowledge-retrieval
        # - show_retrieve_source -> knowledge-retrieval

        # init workflow graph with the start node
        graph: dict[str, Any] = {
            "nodes": [self._convert_to_start_node(variables=app_config.variables)],
            "edges": [],
        }

        # convert to http request node
        external_data_variable_node_mapping: dict[str, str] = {}
        if app_config.external_data_variables:
            http_request_nodes, external_data_variable_node_mapping = self._convert_to_http_request_node(
                app_model=app_model,
                variables=app_config.variables,
                external_data_variables=app_config.external_data_variables,
            )
            for http_request_node in http_request_nodes:
                graph = self._append_node(graph, http_request_node)

        # convert to knowledge retrieval node
        if app_config.dataset:
            knowledge_retrieval_node = self._convert_to_knowledge_retrieval_node(
                new_app_mode=new_app_mode, dataset_config=app_config.dataset, model_config=app_config.model
            )
            if knowledge_retrieval_node:
                graph = self._append_node(graph, knowledge_retrieval_node)

        # convert to llm node
        additional_features = app_config.additional_features
        llm_node = self._convert_to_llm_node(
            # read after _convert_to_app_config, which may have rewritten app_model.mode
            original_app_mode=AppMode.value_of(app_model.mode),
            new_app_mode=new_app_mode,
            graph=graph,
            model_config=app_config.model,
            prompt_template=app_config.prompt_template,
            # guard against a missing additional-features section
            file_upload=additional_features.file_upload if additional_features else None,
            external_data_variable_node_mapping=external_data_variable_node_mapping,
        )
        graph = self._append_node(graph, llm_node)

        # convert to end / answer node by app mode
        if new_app_mode == AppMode.WORKFLOW:
            terminal_node = self._convert_to_end_node()
            feature_keys = self._WORKFLOW_FEATURE_KEYS
        else:
            terminal_node = self._convert_to_answer_node()
            feature_keys = (
                self._ADVANCED_CHAT_FEATURE_KEYS
                if new_app_mode == AppMode.ADVANCED_CHAT
                else self._WORKFLOW_FEATURE_KEYS
            )
        graph = self._append_node(graph, terminal_node)

        # features
        app_model_config_dict = app_config.app_model_config_dict
        features = {key: app_model_config_dict.get(key) for key in feature_keys}

        # create workflow record
        workflow = Workflow(
            tenant_id=app_model.tenant_id,
            app_id=app_model.id,
            type=WorkflowType.from_app_mode(new_app_mode).value,
            version="draft",
            graph=json.dumps(graph),
            features=json.dumps(features),
            created_by=account_id,
            environment_variables=[],
            conversation_variables=[],
        )

        db.session.add(workflow)
        db.session.commit()

        return workflow

    def _convert_to_app_config(self, app_model: App, app_model_config: AppModelConfig) -> EasyUIBasedAppConfig:
        """
        Build the app config entity with the config manager matching the app mode.

        Note: for agent apps this sets ``app_model.mode`` to the agent-chat mode as a side effect.

        :param app_model: App instance
        :param app_model_config: AppModelConfig instance
        :return: app config
        :raises ValueError: if the app mode is not agent-chat, chat or completion
        """
        app_mode_enum = AppMode.value_of(app_model.mode)

        if app_mode_enum == AppMode.AGENT_CHAT or app_model.is_agent:
            app_model.mode = AppMode.AGENT_CHAT.value
            return AgentChatAppConfigManager.get_app_config(app_model=app_model, app_model_config=app_model_config)

        if app_mode_enum == AppMode.CHAT:
            return ChatAppConfigManager.get_app_config(app_model=app_model, app_model_config=app_model_config)

        if app_mode_enum == AppMode.COMPLETION:
            return CompletionAppConfigManager.get_app_config(app_model=app_model, app_model_config=app_model_config)

        raise ValueError("Invalid app mode")

    def _convert_to_start_node(self, variables: list[VariableEntity]) -> dict:
        """
        Convert to Start Node

        :param variables: list of variables
        :return: start node carrying the JSON-encoded variables
        """
        return {
            "id": "start",
            "position": None,
            "data": {
                "title": "START",
                "type": NodeType.START.value,
                "variables": [jsonable_encoder(v) for v in variables],
            },
        }

    def _convert_to_http_request_node(
        self, app_model: App, variables: list[VariableEntity], external_data_variables: list[ExternalDataVariableEntity]
    ) -> tuple[list[dict], dict[str, str]]:
        """
        Convert API Based Extension to HTTP Request Node

        Every external data variable of type "api" that references an API based extension
        yields two nodes: an HTTP request node calling the extension and a code node that
        extracts ``result`` from the response body.

        :param app_model: App instance
        :param variables: list of variables
        :param external_data_variables: list of external data variables
        :return: (nodes in graph order, mapping of external data variable name -> code node id)
        :raises ValueError: if a referenced API based extension does not exist
        """
        nodes: list[dict] = []
        external_data_variable_node_mapping: dict[str, str] = {}
        tenant_id = app_model.tenant_id

        # identical for every extension, so built once
        inputs = {v.variable: "{{#start." + v.variable + "#}}" for v in variables}
        query = "{{#sys.query#}}" if app_model.mode == AppMode.CHAT.value else ""

        # only advanced for variables that actually produce nodes, so ids stay contiguous
        index = 1
        for external_data_variable in external_data_variables:
            if external_data_variable.type != "api":
                continue

            tool_variable = external_data_variable.variable

            # get params from config
            api_based_extension_id = external_data_variable.config.get("api_based_extension_id")
            if not api_based_extension_id:
                continue

            # get api_based_extension
            api_based_extension = self._get_api_based_extension(
                tenant_id=tenant_id, api_based_extension_id=api_based_extension_id
            )

            # decrypt api_key
            api_key = encrypter.decrypt_token(tenant_id=tenant_id, token=api_based_extension.api_key)

            request_body = {
                "point": APIBasedExtensionPoint.APP_EXTERNAL_DATA_TOOL_QUERY.value,
                "params": {
                    "app_id": app_model.id,
                    "tool_variable": tool_variable,
                    "inputs": inputs,
                    "query": query,
                },
            }

            request_body_json = json.dumps(request_body)
            # un-escape template braces so variable references stay in ``{{...}}`` form
            request_body_json = request_body_json.replace(r"\{\{", "{{").replace(r"\}\}", "}}")

            http_request_node = {
                "id": f"http_request_{index}",
                "position": None,
                "data": {
                    "title": f"HTTP REQUEST {api_based_extension.name}",
                    "type": NodeType.HTTP_REQUEST.value,
                    "method": "post",
                    "url": api_based_extension.api_endpoint,
                    "authorization": {"type": "api-key", "config": {"type": "bearer", "api_key": api_key}},
                    "headers": "",
                    "params": "",
                    "body": {"type": "json", "data": request_body_json},
                },
            }
            nodes.append(http_request_node)

            # append code node for response body parsing
            code_node: dict[str, Any] = {
                "id": f"code_{index}",
                "position": None,
                "data": {
                    "title": f"Parse {api_based_extension.name} Response",
                    "type": NodeType.CODE.value,
                    "variables": [{"variable": "response_json", "value_selector": [http_request_node["id"], "body"]}],
                    "code_language": "python3",
                    # snippet is stored verbatim in the code node
                    "code": "import json\n\ndef main(response_json: str) -> str:\n response_body = json.loads("
                    'response_json)\n return {\n "result": response_body["result"]\n }',
                    "outputs": {"result": {"type": "string"}},
                },
            }
            nodes.append(code_node)

            external_data_variable_node_mapping[tool_variable] = code_node["id"]
            index += 1

        return nodes, external_data_variable_node_mapping

    def _convert_to_knowledge_retrieval_node(
        self, new_app_mode: AppMode, dataset_config: DatasetEntity, model_config: ModelConfigEntity
    ) -> Optional[dict]:
        """
        Convert datasets to Knowledge Retrieval Node

        :param new_app_mode: new app mode
        :param dataset_config: dataset
        :param model_config: model config
        :return: knowledge retrieval node, or None when the target is not advanced chat
            and no query variable is configured
        """
        retrieve_config = dataset_config.retrieve_config

        if new_app_mode == AppMode.ADVANCED_CHAT:
            query_variable_selector = ["sys", "query"]
        elif retrieve_config.query_variable:
            # fetch query variable
            query_variable_selector = ["start", retrieve_config.query_variable]
        else:
            return None

        strategy = retrieve_config.retrieve_strategy

        # only the config matching the retrieve strategy is populated
        single_retrieval_config = None
        if strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.SINGLE:
            single_retrieval_config = {
                "model": {
                    "provider": model_config.provider,
                    "name": model_config.model,
                    "mode": model_config.mode,
                    "completion_params": {
                        **model_config.parameters,
                        "stop": model_config.stop,
                    },
                }
            }

        multiple_retrieval_config = None
        if strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.MULTIPLE:
            multiple_retrieval_config = {
                "top_k": retrieve_config.top_k,
                "score_threshold": retrieve_config.score_threshold,
                "reranking_model": retrieve_config.reranking_model,
            }

        return {
            "id": "knowledge_retrieval",
            "position": None,
            "data": {
                "title": "KNOWLEDGE RETRIEVAL",
                "type": NodeType.KNOWLEDGE_RETRIEVAL.value,
                "query_variable_selector": query_variable_selector,
                "dataset_ids": dataset_config.dataset_ids,
                "retrieval_mode": strategy.value,
                "single_retrieval_config": single_retrieval_config,
                "multiple_retrieval_config": multiple_retrieval_config,
            },
        }

    def _get_simple_prompt_template_config(
        self,
        original_app_mode: AppMode,
        model_config: ModelConfigEntity,
        prompt_template: PromptTemplateEntity,
        has_context: bool,
    ) -> Any:
        """
        Look up the simple-mode prompt template config for the given model.

        :param original_app_mode: original app mode
        :param model_config: model config
        :param prompt_template: prompt template (its simple prompt template is used as pre-prompt)
        :param has_context: whether a knowledge retrieval node provides context
        :return: result of ``SimplePromptTransform.get_prompt_template``
        :raises ValueError: if the simple prompt template is empty
        """
        if not prompt_template.simple_prompt_template:
            raise ValueError("Simple prompt template is required")

        return SimplePromptTransform().get_prompt_template(
            app_mode=original_app_mode,
            provider=model_config.provider,
            model=model_config.model,
            pre_prompt=prompt_template.simple_prompt_template,
            has_context=has_context,
            query_in_prompt=False,
        )

    def _convert_to_llm_node(
        self,
        original_app_mode: AppMode,
        new_app_mode: AppMode,
        graph: dict,
        model_config: ModelConfigEntity,
        prompt_template: PromptTemplateEntity,
        file_upload: Optional[FileUploadConfig] = None,
        external_data_variable_node_mapping: dict[str, str] | None = None,
    ) -> dict:
        """
        Convert to LLM Node

        :param original_app_mode: original app mode
        :param new_app_mode: new app mode
        :param graph: graph built so far; must already contain the start node
        :param model_config: model config (not modified)
        :param prompt_template: prompt template
        :param file_upload: file upload config (optional)
        :param external_data_variable_node_mapping: external data variable node mapping
        :return: llm node
        :raises ValueError: if a simple prompt is requested but the simple prompt template is empty
        """
        # fetch start and knowledge retrieval node
        start_node = next(filter(lambda n: n["data"]["type"] == NodeType.START.value, graph["nodes"]))
        knowledge_retrieval_node = next(
            filter(lambda n: n["data"]["type"] == NodeType.KNOWLEDGE_RETRIEVAL.value, graph["nodes"]), None
        )
        has_context = knowledge_retrieval_node is not None
        start_variables = start_node["data"]["variables"]

        def render(text: str) -> str:
            # rewrite ``{{var}}`` placeholders into workflow variable references
            return self._replace_template_variables(
                template=text,
                variables=start_variables,
                external_data_variable_node_mapping=external_data_variable_node_mapping,
            )

        is_chat_model = model_config.mode == LLMMode.CHAT.value
        is_simple_prompt = prompt_template.prompt_type == PromptTemplateEntity.PromptType.SIMPLE

        role_prefix = None
        prompts: Any = None

        if is_chat_model and is_simple_prompt:
            # Chat Model, simple prompt: a single user message (or none when the template is empty)
            prompt_template_config = self._get_simple_prompt_template_config(
                original_app_mode, model_config, prompt_template, has_context
            )
            template = prompt_template_config["prompt_template"].template
            prompts = [{"role": "user", "text": render(template)}] if template else []
        elif is_chat_model:
            # Chat Model, advanced prompt: one entry per configured message
            advanced_chat_prompt_template = prompt_template.advanced_chat_prompt_template
            messages = advanced_chat_prompt_template.messages if advanced_chat_prompt_template else []
            prompts = [{"role": m.role.value, "text": render(m.text)} for m in messages]
        elif is_simple_prompt:
            # Completion Model, simple prompt
            prompt_template_config = self._get_simple_prompt_template_config(
                original_app_mode, model_config, prompt_template, has_context
            )
            prompts = {"text": render(prompt_template_config["prompt_template"].template)}

            prompt_rules = prompt_template_config["prompt_rules"]
            role_prefix = {
                "user": prompt_rules.get("human_prefix", "Human"),
                "assistant": prompt_rules.get("assistant_prefix", "Assistant"),
            }
        else:
            # Completion Model, advanced prompt
            advanced_completion_prompt_template = prompt_template.advanced_completion_prompt_template
            text = render(advanced_completion_prompt_template.prompt) if advanced_completion_prompt_template else ""
            prompts = {"text": text.replace("{{#query#}}", "{{#sys.query#}}")}

            if advanced_completion_prompt_template and advanced_completion_prompt_template.role_prefix:
                role_prefix = {
                    "user": advanced_completion_prompt_template.role_prefix.user,
                    "assistant": advanced_completion_prompt_template.role_prefix.assistant,
                }

        memory = None
        if new_app_mode == AppMode.ADVANCED_CHAT:
            memory = {"role_prefix": role_prefix, "window": {"enabled": False}}

        # build a fresh dict so the caller's model_config.parameters is left untouched
        completion_params = {**model_config.parameters, "stop": model_config.stop}

        has_file_upload = file_upload is not None

        return {
            "id": "llm",
            "position": None,
            "data": {
                "title": "LLM",
                "type": NodeType.LLM.value,
                "model": {
                    "provider": model_config.provider,
                    "name": model_config.model,
                    "mode": model_config.mode,
                    "completion_params": completion_params,
                },
                "prompt_template": prompts,
                "memory": memory,
                "context": {
                    "enabled": has_context,
                    "variable_selector": ["knowledge_retrieval", "result"] if has_context else None,
                },
                "vision": {
                    "enabled": has_file_upload,
                    "variable_selector": ["sys", "files"] if has_file_upload else None,
                    "configs": {"detail": file_upload.image_config.detail}
                    if file_upload is not None and file_upload.image_config is not None
                    else None,
                },
            },
        }

    def _replace_template_variables(
        self, template: str, variables: list[dict], external_data_variable_node_mapping: dict[str, str] | None = None
    ) -> str:
        """
        Replace Template Variables

        ``{{name}}`` becomes ``{{#start.name#}}`` for start variables and
        ``{{#<code node id>.result#}}`` for external data variables. Start variables are
        replaced first; placeholders matching neither are left unchanged.

        :param template: template
        :param variables: list of variables (dicts with a "variable" key)
        :param external_data_variable_node_mapping: external data variable node mapping
        :return: template with workflow variable references
        """
        for v in variables:
            name = v["variable"]
            template = template.replace("{{" + name + "}}", "{{#start." + name + "#}}")

        for variable, code_node_id in (external_data_variable_node_mapping or {}).items():
            template = template.replace("{{" + variable + "}}", "{{#" + code_node_id + ".result#}}")

        return template

    def _convert_to_end_node(self) -> dict:
        """
        Convert to End Node

        :return: end node exposing the llm text as ``result``
        """
        # for original completion app
        return {
            "id": "end",
            "position": None,
            "data": {
                "title": "END",
                "type": NodeType.END.value,
                "outputs": [{"variable": "result", "value_selector": ["llm", "text"]}],
            },
        }

    def _convert_to_answer_node(self) -> dict:
        """
        Convert to Answer Node

        :return: answer node replying with the llm text
        """
        # for original chat app
        return {
            "id": "answer",
            "position": None,
            "data": {"title": "ANSWER", "type": NodeType.ANSWER.value, "answer": "{{#llm.text#}}"},
        }

    def _create_edge(self, source: str, target: str) -> dict:
        """
        Create Edge

        :param source: source node id
        :param target: target node id
        :return: edge whose id is "<source>-<target>"
        """
        return {"id": f"{source}-{target}", "source": source, "target": target}

    def _append_node(self, graph: dict, node: dict) -> dict:
        """
        Append Node to Graph

        The node is linked to the node that was last in the graph. When the graph has no
        nodes yet, the node is appended without creating an edge.

        :param graph: Graph, include: nodes, edges (modified in place)
        :param node: Node to append
        :return: the same graph object
        """
        previous_node = graph["nodes"][-1] if graph["nodes"] else None
        graph["nodes"].append(node)
        if previous_node is not None:
            graph["edges"].append(self._create_edge(previous_node["id"], node["id"]))
        return graph

    def _get_new_app_mode(self, app_model: App) -> AppMode:
        """
        Get new app mode

        :param app_model: App instance
        :return: AppMode.WORKFLOW for completion apps, AppMode.ADVANCED_CHAT otherwise
        """
        if app_model.mode == AppMode.COMPLETION.value:
            return AppMode.WORKFLOW
        return AppMode.ADVANCED_CHAT

    def _get_api_based_extension(self, tenant_id: str, api_based_extension_id: str) -> APIBasedExtension:
        """
        Get API Based Extension

        :param tenant_id: tenant id
        :param api_based_extension_id: api based extension id
        :return: the matching APIBasedExtension of the tenant
        :raises ValueError: if no such extension exists for the tenant
        """
        api_based_extension = (
            db.session.query(APIBasedExtension)
            .filter(APIBasedExtension.tenant_id == tenant_id, APIBasedExtension.id == api_based_extension_id)
            .first()
        )

        if not api_based_extension:
            raise ValueError(f"API Based Extension not found, id: {api_based_extension_id}")

        return api_based_extension
|