123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- from core.model_runtime.entities.llm_entities import LLMResult
- from core.model_runtime.entities.message_entities import PromptMessage, SystemPromptMessage, UserPromptMessage
- from core.tools.__base.tool import Tool
- from core.tools.__base.tool_runtime import ToolRuntime
- from core.tools.entities.tool_entities import ToolProviderType
- from core.tools.utils.model_invocation_utils import ModelInvocationUtils
- _SUMMARY_PROMPT = """You are a professional language researcher, you are interested in the language
- and you can quickly aimed at the main point of an webpage and reproduce it in your own words but
- retain the original meaning and keep the key points.
- however, the text you got is too long, what you got is possible a part of the text.
- Please summarize the text you got.
- """
- class BuiltinTool(Tool):
- """
- Builtin tool
- :param meta: the meta data of a tool call processing
- """
- provider: str
- def __init__(self, provider: str, **kwargs):
- super().__init__(**kwargs)
- self.provider = provider
- def fork_tool_runtime(self, runtime: ToolRuntime) -> "BuiltinTool":
- """
- fork a new tool with meta data
- :param meta: the meta data of a tool call processing, tenant_id is required
- :return: the new tool
- """
- return self.__class__(
- entity=self.entity.model_copy(),
- runtime=runtime,
- provider=self.provider,
- )
- def invoke_model(self, user_id: str, prompt_messages: list[PromptMessage], stop: list[str]) -> LLMResult:
- """
- invoke model
- :param model_config: the model config
- :param prompt_messages: the prompt messages
- :param stop: the stop words
- :return: the model result
- """
- # invoke model
- return ModelInvocationUtils.invoke(
- user_id=user_id,
- tenant_id=self.runtime.tenant_id or "",
- tool_type="builtin",
- tool_name=self.entity.identity.name,
- prompt_messages=prompt_messages,
- )
- def tool_provider_type(self) -> ToolProviderType:
- return ToolProviderType.BUILT_IN
- def get_max_tokens(self) -> int:
- """
- get max tokens
- :param model_config: the model config
- :return: the max tokens
- """
- if self.runtime is None:
- raise ValueError("runtime is required")
- return ModelInvocationUtils.get_max_llm_context_tokens(
- tenant_id=self.runtime.tenant_id or "",
- )
- def get_prompt_tokens(self, prompt_messages: list[PromptMessage]) -> int:
- """
- get prompt tokens
- :param prompt_messages: the prompt messages
- :return: the tokens
- """
- if self.runtime is None:
- raise ValueError("runtime is required")
- return ModelInvocationUtils.calculate_tokens(
- tenant_id=self.runtime.tenant_id or "", prompt_messages=prompt_messages
- )
- def summary(self, user_id: str, content: str) -> str:
- max_tokens = self.get_max_tokens()
- if self.get_prompt_tokens(prompt_messages=[UserPromptMessage(content=content)]) < max_tokens * 0.6:
- return content
- def get_prompt_tokens(content: str) -> int:
- return self.get_prompt_tokens(
- prompt_messages=[SystemPromptMessage(content=_SUMMARY_PROMPT), UserPromptMessage(content=content)]
- )
- def summarize(content: str) -> str:
- summary = self.invoke_model(
- user_id=user_id,
- prompt_messages=[SystemPromptMessage(content=_SUMMARY_PROMPT), UserPromptMessage(content=content)],
- stop=[],
- )
- assert isinstance(summary.message.content, str)
- return summary.message.content
- lines = content.split("\n")
- new_lines = []
- # split long line into multiple lines
- for i in range(len(lines)):
- line = lines[i]
- if not line.strip():
- continue
- if len(line) < max_tokens * 0.5:
- new_lines.append(line)
- elif get_prompt_tokens(line) > max_tokens * 0.7:
- while get_prompt_tokens(line) > max_tokens * 0.7:
- new_lines.append(line[: int(max_tokens * 0.5)])
- line = line[int(max_tokens * 0.5) :]
- new_lines.append(line)
- else:
- new_lines.append(line)
- # merge lines into messages with max tokens
- messages: list[str] = []
- for j in new_lines:
- if len(messages) == 0:
- messages.append(j)
- else:
- if len(messages[-1]) + len(j) < max_tokens * 0.5:
- messages[-1] += j
- if get_prompt_tokens(messages[-1] + j) > max_tokens * 0.7:
- messages.append(j)
- else:
- messages[-1] += j
- summaries = []
- for i in range(len(messages)):
- message = messages[i]
- summary = summarize(message)
- summaries.append(summary)
- result = "\n".join(summaries)
- if self.get_prompt_tokens(prompt_messages=[UserPromptMessage(content=result)]) > max_tokens * 0.7:
- return self.summary(user_id=user_id, content=result)
- return result
|