from typing import Optional, cast

from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_runtime.entities.message_entities import PromptMessage
from core.model_runtime.entities.model_entities import ModelPropertyKey
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.prompt.entities.advanced_prompt_entities import MemoryConfig


class PromptTransform:
    def _append_chat_histories(self, memory: TokenBufferMemory,
                               memory_config: MemoryConfig,
                               prompt_messages: list[PromptMessage],
                               model_config: ModelConfigWithCredentialsEntity) -> list[PromptMessage]:
        """Append chat history messages from memory to the given prompt messages."""
        rest_tokens = self._calculate_rest_token(prompt_messages, model_config)
        histories = self._get_history_messages_list_from_memory(memory, memory_config, rest_tokens)
        prompt_messages.extend(histories)

        return prompt_messages

    def _calculate_rest_token(self, prompt_messages: list[PromptMessage],
                              model_config: ModelConfigWithCredentialsEntity) -> int:
        """Calculate the token budget left for chat history:
        context size minus reserved completion tokens minus current prompt tokens.
        """
        rest_tokens = 2000  # fallback when the model does not declare a context size

        model_context_tokens = model_config.model_schema.model_properties.get(ModelPropertyKey.CONTEXT_SIZE)
        if model_context_tokens:
            model_type_instance = model_config.provider_model_bundle.model_type_instance
            model_type_instance = cast(LargeLanguageModel, model_type_instance)

            curr_message_tokens = model_type_instance.get_num_tokens(
                model_config.model,
                model_config.credentials,
                prompt_messages
            )

            # reserve the tokens configured for completion (max_tokens), if any
            max_tokens = 0
            for parameter_rule in model_config.model_schema.parameter_rules:
                if (parameter_rule.name == 'max_tokens'
                        or (parameter_rule.use_template and parameter_rule.use_template == 'max_tokens')):
                    max_tokens = (model_config.parameters.get(parameter_rule.name)
                                  or model_config.parameters.get(parameter_rule.use_template)) or 0

            rest_tokens = model_context_tokens - max_tokens - curr_message_tokens
            rest_tokens = max(rest_tokens, 0)

        return rest_tokens

    def _get_history_messages_from_memory(self, memory: TokenBufferMemory,
                                          memory_config: MemoryConfig,
                                          max_token_limit: int,
                                          human_prefix: Optional[str] = None,
                                          ai_prefix: Optional[str] = None) -> str:
        """Get memory messages as a single prompt text."""
        kwargs = {
            "max_token_limit": max_token_limit
        }

        if human_prefix:
            kwargs['human_prefix'] = human_prefix

        if ai_prefix:
            kwargs['ai_prefix'] = ai_prefix

        if memory_config.window.enabled and memory_config.window.size is not None and memory_config.window.size > 0:
            kwargs['message_limit'] = memory_config.window.size

        return memory.get_history_prompt_text(
            **kwargs
        )

    def _get_history_messages_list_from_memory(self, memory: TokenBufferMemory,
                                               memory_config: MemoryConfig,
                                               max_token_limit: int) -> list[PromptMessage]:
        """Get memory messages as a list of prompt messages."""
        return memory.get_history_prompt_messages(
            max_token_limit=max_token_limit,
            message_limit=memory_config.window.size
            if (memory_config.window.enabled
                and memory_config.window.size is not None
                and memory_config.window.size > 0)
            else 10
        )