from typing import Any, Optional

from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance
from core.model_runtime.entities.message_entities import PromptMessage
from core.model_runtime.entities.model_entities import ModelPropertyKey
from core.prompt.entities.advanced_prompt_entities import MemoryConfig


class PromptTransform:
    """Helpers for adding conversation history to a prompt within a token budget."""

    # Token budget returned when the model schema reports no (or a falsy) context size.
    _DEFAULT_REST_TOKENS = 2000

    def _append_chat_histories(
        self,
        memory: TokenBufferMemory,
        memory_config: MemoryConfig,
        prompt_messages: list[PromptMessage],
        model_config: ModelConfigWithCredentialsEntity,
    ) -> list[PromptMessage]:
        """Append history messages read from ``memory`` to ``prompt_messages``.

        The history request is capped at the token budget computed by
        ``_calculate_rest_token`` for the current ``prompt_messages``.

        Note that ``prompt_messages`` is extended in place; the returned list is
        the same object that was passed in.
        """
        rest_tokens = self._calculate_rest_token(prompt_messages, model_config)
        histories = self._get_history_messages_list_from_memory(memory, memory_config, rest_tokens)
        prompt_messages.extend(histories)
        return prompt_messages

    def _calculate_rest_token(
        self, prompt_messages: list[PromptMessage], model_config: ModelConfigWithCredentialsEntity
    ) -> int:
        """Return the number of tokens still available after the current prompt.

        The result is ``context size - max_tokens - tokens(prompt_messages)``,
        clamped so it never goes below zero. When the model schema has no
        (or a falsy) context size, ``_DEFAULT_REST_TOKENS`` is returned and no
        token counting takes place.
        """
        model_context_tokens = model_config.model_schema.model_properties.get(ModelPropertyKey.CONTEXT_SIZE)
        if not model_context_tokens:
            return self._DEFAULT_REST_TOKENS

        model_instance = ModelInstance(
            provider_model_bundle=model_config.provider_model_bundle, model=model_config.model
        )
        curr_message_tokens = model_instance.get_llm_num_tokens(prompt_messages)

        # Look up the configured completion budget. A rule qualifies either by
        # its own name or by the template it is based on. There is no early
        # exit, so if several rules qualify the last one decides the value.
        max_tokens = 0
        for parameter_rule in model_config.model_schema.parameter_rules:
            if parameter_rule.name == "max_tokens" or parameter_rule.use_template == "max_tokens":
                max_tokens = (
                    model_config.parameters.get(parameter_rule.name)
                    or model_config.parameters.get(parameter_rule.use_template or "")
                ) or 0

        return max(model_context_tokens - max_tokens - curr_message_tokens, 0)

    @staticmethod
    def _resolve_message_limit(memory_config: MemoryConfig) -> Optional[int]:
        """Return the configured window size, or ``None`` when no window applies.

        A window applies only when it is enabled and its size is a positive number.
        """
        window = memory_config.window
        if window.enabled and window.size is not None and window.size > 0:
            return window.size
        return None

    def _get_history_messages_from_memory(
        self,
        memory: TokenBufferMemory,
        memory_config: MemoryConfig,
        max_token_limit: int,
        human_prefix: Optional[str] = None,
        ai_prefix: Optional[str] = None,
    ) -> str:
        """Return the result of ``memory.get_history_prompt_text``.

        ``human_prefix``, ``ai_prefix`` and the message limit are forwarded only
        when they are set, so that the memory object's own defaults apply otherwise.
        """
        kwargs: dict[str, Any] = {"max_token_limit": max_token_limit}
        if human_prefix:
            kwargs["human_prefix"] = human_prefix
        if ai_prefix:
            kwargs["ai_prefix"] = ai_prefix

        message_limit = self._resolve_message_limit(memory_config)
        if message_limit is not None:
            kwargs["message_limit"] = message_limit

        return memory.get_history_prompt_text(**kwargs)

    def _get_history_messages_list_from_memory(
        self, memory: TokenBufferMemory, memory_config: MemoryConfig, max_token_limit: int
    ) -> list[PromptMessage]:
        """Return the result of ``memory.get_history_prompt_messages`` as a list.

        ``message_limit`` is always passed; it is ``None`` when no window applies.
        """
        return list(
            memory.get_history_prompt_messages(
                max_token_limit=max_token_limit,
                message_limit=self._resolve_message_limit(memory_config),
            )
        )