|
from typing import Optional

from core.model_runtime.entities.llm_entities import LLMResult
from core.model_runtime.entities.message_entities import PromptMessage, SystemPromptMessage, UserPromptMessage
from core.tools.entities.tool_entities import ToolProviderType
from core.tools.tool.tool import Tool
from core.tools.utils.model_invocation_utils import ModelInvocationUtils
from core.tools.utils.web_reader_tool import get_url
|
_SUMMARY_PROMPT = """You are a professional language researcher. You are interested in language,
and you can quickly identify the main points of a webpage and reproduce them in your own words
while retaining the original meaning and keeping the key points.
However, the text you have been given is too long and may be only part of the original text.
Please summarize the text you have been given.
"""
|
|
class BuiltinTool(Tool):
    """
    Builtin tool

    :param meta: the metadata of the tool call processing
    """
|
    def invoke_model(self, user_id: str, prompt_messages: list[PromptMessage], stop: list[str]) -> LLMResult:
        """
        invoke model

        :param user_id: the id of the user invoking the model
        :param prompt_messages: the prompt messages
        :param stop: the stop words (not forwarded to the model by the current implementation)
        :return: the model result
        """
        return ModelInvocationUtils.invoke(
            user_id=user_id,
            tenant_id=self.runtime.tenant_id,
            tool_type="builtin",
            tool_name=self.identity.name,
            prompt_messages=prompt_messages,
        )
|
def tool_provider_type(self) -> ToolProviderType: |
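        """
        get the tool provider type

        :return: the tool provider type, always ToolProviderType.BUILT_IN for builtin tools
        """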
|
        return ToolProviderType.BUILT_IN
|
    def get_max_tokens(self) -> int:
        """
        get max tokens

        :return: the maximum number of LLM context tokens for the current tenant
        """
        return ModelInvocationUtils.get_max_llm_context_tokens(
            tenant_id=self.runtime.tenant_id,
        )
|
    def get_prompt_tokens(self, prompt_messages: list[PromptMessage]) -> int:
        """
        get prompt tokens

        :param prompt_messages: the prompt messages
        :return: the tokens
        """
        return ModelInvocationUtils.calculate_tokens(tenant_id=self.runtime.tenant_id, prompt_messages=prompt_messages)
|
def summary(self, user_id: str, content: str) -> str: |
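        """
        summarize content that may be too long to fit into the model context

        the content is split into chunks, each chunk is summarized separately,
        and the joined summaries are summarized again if they are still too long

        :param user_id: the id of the user requesting the summary
        :param content: the content to summarize
        :return: the summarized content
        """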
|
        max_tokens = self.get_max_tokens()

        # content is already short enough, return it as is
        if self.get_prompt_tokens(prompt_messages=[UserPromptMessage(content=content)]) < max_tokens * 0.6:
            return content
|
        def get_prompt_tokens(content: str) -> int:
            return self.get_prompt_tokens(
                prompt_messages=[SystemPromptMessage(content=_SUMMARY_PROMPT), UserPromptMessage(content=content)]
            )
|
        def summarize(content: str) -> str:
            summary = self.invoke_model(
                user_id=user_id,
                prompt_messages=[SystemPromptMessage(content=_SUMMARY_PROMPT), UserPromptMessage(content=content)],
                stop=[],
            )

            return summary.message.content
|
        # split the content into lines and break up any line that is too long for a
        # single summarization call; lengths are compared against the token budget
        # with a rough character-count heuristic
        lines = content.split("\n")
        new_lines = []

        for line in lines:
            if not line.strip():
                continue
            if len(line) < max_tokens * 0.5:
                new_lines.append(line)
            elif get_prompt_tokens(line) > max_tokens * 0.7:
                # slice the line into fixed-size pieces until the remainder fits
                while get_prompt_tokens(line) > max_tokens * 0.7:
                    new_lines.append(line[: int(max_tokens * 0.5)])
                    line = line[int(max_tokens * 0.5) :]
                new_lines.append(line)
            else:
                new_lines.append(line)
|
        # merge the pieces into chunks that stay within the token budget
        messages: list[str] = []
        for piece in new_lines:
            if len(messages) == 0:
                messages.append(piece)
            elif len(messages[-1]) + len(piece) < max_tokens * 0.5:
                # still small by the character-count heuristic, keep merging
                messages[-1] += piece
            elif get_prompt_tokens(messages[-1] + piece) > max_tokens * 0.7:
                # merging would exceed the token budget, start a new chunk
                messages.append(piece)
            else:
                messages[-1] += piece
|
        # summarize each chunk separately
        summaries = []
        for message in messages:
            summary = summarize(message)
            summaries.append(summary)
|
        result = "\n".join(summaries)

        # if the joined summaries are still too long, summarize them recursively
        if self.get_prompt_tokens(prompt_messages=[UserPromptMessage(content=result)]) > max_tokens * 0.7:
            return self.summary(user_id=user_id, content=result)

        return result
|
    def get_url(self, url: str, user_agent: Optional[str] = None) -> str:
        """
        fetch a url and return its readable text content
        """
        return get_url(url, user_agent=user_agent)
|