| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 | 
							- from typing import Any, Optional
 
- from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
 
- from core.memory.token_buffer_memory import TokenBufferMemory
 
- from core.model_manager import ModelInstance
 
- from core.model_runtime.entities.message_entities import PromptMessage
 
- from core.model_runtime.entities.model_entities import ModelPropertyKey
 
- from core.prompt.entities.advanced_prompt_entities import MemoryConfig
 
- class PromptTransform:
 
-     def _append_chat_histories(
 
-         self,
 
-         memory: TokenBufferMemory,
 
-         memory_config: MemoryConfig,
 
-         prompt_messages: list[PromptMessage],
 
-         model_config: ModelConfigWithCredentialsEntity,
 
-     ) -> list[PromptMessage]:
 
-         rest_tokens = self._calculate_rest_token(prompt_messages, model_config)
 
-         histories = self._get_history_messages_list_from_memory(memory, memory_config, rest_tokens)
 
-         prompt_messages.extend(histories)
 
-         return prompt_messages
 
-     def _calculate_rest_token(
 
-         self, prompt_messages: list[PromptMessage], model_config: ModelConfigWithCredentialsEntity
 
-     ) -> int:
 
-         rest_tokens = 2000
 
-         model_context_tokens = model_config.model_schema.model_properties.get(ModelPropertyKey.CONTEXT_SIZE)
 
-         if model_context_tokens:
 
-             model_instance = ModelInstance(
 
-                 provider_model_bundle=model_config.provider_model_bundle, model=model_config.model
 
-             )
 
-             curr_message_tokens = model_instance.get_llm_num_tokens(prompt_messages)
 
-             max_tokens = 0
 
-             for parameter_rule in model_config.model_schema.parameter_rules:
 
-                 if parameter_rule.name == "max_tokens" or (
 
-                     parameter_rule.use_template and parameter_rule.use_template == "max_tokens"
 
-                 ):
 
-                     max_tokens = (
 
-                         model_config.parameters.get(parameter_rule.name)
 
-                         or model_config.parameters.get(parameter_rule.use_template or "")
 
-                     ) or 0
 
-             rest_tokens = model_context_tokens - max_tokens - curr_message_tokens
 
-             rest_tokens = max(rest_tokens, 0)
 
-         return rest_tokens
 
-     def _get_history_messages_from_memory(
 
-         self,
 
-         memory: TokenBufferMemory,
 
-         memory_config: MemoryConfig,
 
-         max_token_limit: int,
 
-         human_prefix: Optional[str] = None,
 
-         ai_prefix: Optional[str] = None,
 
-     ) -> str:
 
-         """Get memory messages."""
 
-         kwargs: dict[str, Any] = {"max_token_limit": max_token_limit}
 
-         if human_prefix:
 
-             kwargs["human_prefix"] = human_prefix
 
-         if ai_prefix:
 
-             kwargs["ai_prefix"] = ai_prefix
 
-         if memory_config.window.enabled and memory_config.window.size is not None and memory_config.window.size > 0:
 
-             kwargs["message_limit"] = memory_config.window.size
 
-         return memory.get_history_prompt_text(**kwargs)
 
-     def _get_history_messages_list_from_memory(
 
-         self, memory: TokenBufferMemory, memory_config: MemoryConfig, max_token_limit: int
 
-     ) -> list[PromptMessage]:
 
-         """Get memory messages."""
 
-         return list(
 
-             memory.get_history_prompt_messages(
 
-                 max_token_limit=max_token_limit,
 
-                 message_limit=memory_config.window.size
 
-                 if (
 
-                     memory_config.window.enabled
 
-                     and memory_config.window.size is not None
 
-                     and memory_config.window.size > 0
 
-                 )
 
-                 else None,
 
-             )
 
-         )
 
 
  |