Spaces:
Running
Running
from __future__ import annotations | |
import base64 | |
import html | |
import io | |
import os | |
import re | |
from urllib import parse | |
import json | |
import markdown | |
from gradio.components import Chatbot as ChatBotBase | |
from modelscope_agent.output_parser import MRKLOutputParser | |
from PIL import Image | |
ALREADY_CONVERTED_MARK = '<!-- ALREADY CONVERTED BY PARSER. -->' | |
# 图片本地路径转换为 base64 格式 | |
def covert_image_to_base64(image_path): | |
# 获得文件后缀名 | |
ext = image_path.split('.')[-1] | |
if ext not in ['gif', 'jpeg', 'png']: | |
ext = 'jpeg' | |
with open(image_path, 'rb') as image_file: | |
# Read the file | |
encoded_string = base64.b64encode(image_file.read()) | |
# Convert bytes to string | |
base64_data = encoded_string.decode('utf-8') | |
# 生成base64编码的地址 | |
base64_url = f'data:image/{ext};base64,{base64_data}' | |
return base64_url | |
def convert_url(text, new_filename): | |
# Define the pattern to search for | |
# This pattern captures the text inside the square brackets, the path, and the filename | |
pattern = r'!\[([^\]]+)\]\(([^)]+)\)' | |
# Define the replacement pattern | |
# \1 is a backreference to the text captured by the first group ([^\]]+) | |
replacement = rf'![\1]({new_filename})' | |
# Replace the pattern in the text with the replacement | |
return re.sub(pattern, replacement, text) | |
def format_cover_html(configuration, bot_avatar_path): | |
if bot_avatar_path: | |
image_src = covert_image_to_base64(bot_avatar_path) | |
else: | |
image_src = '//img.alicdn.com/imgextra/i3/O1CN01YPqZFO1YNZerQfSBk_!!6000000003047-0-tps-225-225.jpg' | |
return f""" | |
<div class="bot_cover"> | |
<div class="bot_avatar"> | |
<img src={image_src} /> | |
</div> | |
<div class="bot_name">{configuration.get("name", "")}</div> | |
<div class="bot_desp">{configuration.get("description", "")}</div> | |
</div> | |
""" | |
def format_goto_publish_html(label, zip_url, agent_user_params, disable=False): | |
if disable: | |
return f"""<div class="publish_link_container"> | |
<a class="disabled">{label}</a> | |
</div> | |
""" | |
else: | |
params = {'AGENT_URL': zip_url} | |
params.update(agent_user_params) | |
template = 'modelscope/agent_template' | |
params_str = json.dumps(params) | |
link_url = f'https://www.modelscope.cn/studios/fork?target={template}&overwriteEnv={parse.quote(params_str)}' | |
return f""" | |
<div class="publish_link_container"> | |
<a href="{link_url}" target="_blank">{label}</a> | |
</div> | |
""" | |
class ChatBot(ChatBotBase): | |
def normalize_markdown(self, bot_message): | |
lines = bot_message.split('\n') | |
normalized_lines = [] | |
inside_list = False | |
for i, line in enumerate(lines): | |
if re.match(r'^(\d+\.|-|\*|\+)\s', line.strip()): | |
if not inside_list and i > 0 and lines[i - 1].strip() != '': | |
normalized_lines.append('') | |
inside_list = True | |
normalized_lines.append(line) | |
elif inside_list and line.strip() == '': | |
if i < len(lines) - 1 and not re.match(r'^(\d+\.|-|\*|\+)\s', | |
lines[i + 1].strip()): | |
normalized_lines.append(line) | |
continue | |
else: | |
inside_list = False | |
normalized_lines.append(line) | |
return '\n'.join(normalized_lines) | |
def convert_markdown(self, bot_message): | |
if bot_message.count('```') % 2 != 0: | |
bot_message += '\n```' | |
bot_message = self.normalize_markdown(bot_message) | |
result = markdown.markdown( | |
bot_message, | |
extensions=[ | |
'toc', 'extra', 'tables', 'markdown_katex', 'codehilite', | |
'markdown_cjk_spacing.cjk_spacing', 'pymdownx.magiclink' | |
], | |
extension_configs={ | |
'markdown_katex': { | |
'no_inline_svg': True, # fix for WeasyPrint | |
'insert_fonts_css': True, | |
}, | |
'codehilite': { | |
'linenums': False, | |
'guess_lang': True | |
}, | |
'mdx_truly_sane_lists': { | |
'nested_indent': 2, | |
'truly_sane': True, | |
} | |
}) | |
result = ''.join(result) | |
return result | |
def prompt_parse(message): | |
output = '' | |
if 'Thought' in message: | |
if 'Action' in message or 'Action Input:' in message: | |
re_pattern_thought = re.compile( | |
pattern=r'([\s\S]+)Thought:([\s\S]+)Action:') | |
res = re_pattern_thought.search(message) | |
if res is None: | |
re_pattern_thought_only = re.compile( | |
pattern=r'Thought:([\s\S]+)Action:') | |
res = re_pattern_thought_only.search(message) | |
llm_result = '' | |
else: | |
llm_result = res.group(1).strip() | |
action_thought_result = res.group(2).strip() | |
re_pattern_action = re.compile( | |
pattern= | |
r'Action:([\s\S]+)Action Input:([\s\S]+)<\|startofexec\|>') | |
res = re_pattern_action.search(message) | |
if res is None: | |
action, action_parameters = MRKLOutputParser( | |
).parse_response(message) | |
else: | |
action = res.group(1).strip() | |
action_parameters = res.group(2) | |
action_result = json.dumps({ | |
'api_name': action, | |
'parameters': action_parameters | |
}) | |
output += f'{llm_result}\n{action_thought_result}\n<|startofthink|>\n{action_result}\n<|endofthink|>\n' | |
if '<|startofexec|>' in message: | |
re_pattern3 = re.compile( | |
pattern=r'<\|startofexec\|>([\s\S]+)<\|endofexec\|>') | |
res3 = re_pattern3.search(message) | |
observation = res3.group(1).strip() | |
output += f'\n<|startofexec|>\n{observation}\n<|endofexec|>\n' | |
if 'Final Answer' in message: | |
re_pattern2 = re.compile( | |
pattern=r'Thought:([\s\S]+)Final Answer:([\s\S]+)') | |
res2 = re_pattern2.search(message) | |
# final_thought_result = res2.group(1).strip() | |
final_answer_result = res2.group(2).strip() | |
output += f'{final_answer_result}\n' | |
if output == '': | |
return message | |
print(output) | |
return output | |
else: | |
return message | |
def convert_bot_message(self, bot_message): | |
bot_message = ChatBot.prompt_parse(bot_message) | |
# print('processed bot message----------') | |
# print(bot_message) | |
# print('processed bot message done') | |
start_pos = 0 | |
result = '' | |
find_json_pattern = re.compile(r'{[\s\S]+}') | |
START_OF_THINK_TAG, END_OF_THINK_TAG = '<|startofthink|>', '<|endofthink|>' | |
START_OF_EXEC_TAG, END_OF_EXEC_TAG = '<|startofexec|>', '<|endofexec|>' | |
while start_pos < len(bot_message): | |
try: | |
start_of_think_pos = bot_message.index(START_OF_THINK_TAG, | |
start_pos) | |
end_of_think_pos = bot_message.index(END_OF_THINK_TAG, | |
start_pos) | |
if start_pos < start_of_think_pos: | |
result += self.convert_markdown( | |
bot_message[start_pos:start_of_think_pos]) | |
think_content = bot_message[start_of_think_pos | |
+ len(START_OF_THINK_TAG | |
):end_of_think_pos].strip() | |
json_content = find_json_pattern.search(think_content) | |
think_content = json_content.group( | |
) if json_content else think_content | |
try: | |
think_node = json.loads(think_content) | |
plugin_name = think_node.get( | |
'plugin_name', | |
think_node.get('plugin', | |
think_node.get('api_name', 'unknown'))) | |
summary = f'选择插件【{plugin_name}】,调用处理中...' | |
del think_node['url'] | |
# think_node.pop('url', None) | |
detail = f'```json\n\n{json.dumps(think_node, indent=3, ensure_ascii=False)}\n\n```' | |
except Exception: | |
summary = '思考中...' | |
detail = think_content | |
# traceback.print_exc() | |
# detail += traceback.format_exc() | |
result += '<details> <summary>' + summary + '</summary>' + self.convert_markdown( | |
detail) + '</details>' | |
# print(f'detail:{detail}') | |
start_pos = end_of_think_pos + len(END_OF_THINK_TAG) | |
except Exception: | |
# result += traceback.format_exc() | |
break | |
# continue | |
try: | |
start_of_exec_pos = bot_message.index(START_OF_EXEC_TAG, | |
start_pos) | |
end_of_exec_pos = bot_message.index(END_OF_EXEC_TAG, start_pos) | |
# print(start_of_exec_pos) | |
# print(end_of_exec_pos) | |
# print(bot_message[start_of_exec_pos:end_of_exec_pos]) | |
# print('------------------------') | |
if start_pos < start_of_exec_pos: | |
result += self.convert_markdown( | |
bot_message[start_pos:start_of_think_pos]) | |
exec_content = bot_message[start_of_exec_pos | |
+ len(START_OF_EXEC_TAG | |
):end_of_exec_pos].strip() | |
try: | |
summary = '完成插件调用.' | |
detail = f'```json\n\n{exec_content}\n\n```' | |
except Exception: | |
pass | |
result += '<details> <summary>' + summary + '</summary>' + self.convert_markdown( | |
detail) + '</details>' | |
start_pos = end_of_exec_pos + len(END_OF_EXEC_TAG) | |
except Exception: | |
# result += traceback.format_exc() | |
continue | |
if start_pos < len(bot_message): | |
result += self.convert_markdown(bot_message[start_pos:]) | |
result += ALREADY_CONVERTED_MARK | |
return result | |
def convert_bot_message_for_qwen(self, bot_message): | |
start_pos = 0 | |
result = '' | |
find_json_pattern = re.compile(r'{[\s\S]+}') | |
ACTION = 'Action:' | |
ACTION_INPUT = 'Action Input' | |
OBSERVATION = 'Observation' | |
RESULT_START = '<result>' | |
RESULT_END = '</result>' | |
while start_pos < len(bot_message): | |
try: | |
action_pos = bot_message.index(ACTION, start_pos) | |
action_input_pos = bot_message.index(ACTION_INPUT, start_pos) | |
result += self.convert_markdown( | |
bot_message[start_pos:action_pos]) | |
# Action: image_gen | |
# Action Input | |
# {"text": "金庸武侠 世界", "resolution": "1280x720"} | |
# Observation: <result>![IMAGEGEN](https://dashscope-result-sh.oss-cn-shanghai.aliyuncs.com/1d/e9/20231116/723609ee/d046d2d9-0c95-420b-9467-f0e831f5e2b7-1.png?Expires=1700227460&OSSAccessKeyId=LTAI5tQZd8AEcZX6KZV4G8qL&Signature=R0PlEazQF9uBD%2Fh9tkzOkJMGyg8%3D)<result> # noqa E501 | |
action_name = bot_message[action_pos | |
+ len(ACTION | |
):action_input_pos].strip() | |
# action_start action_end 使用 Action Input 到 Observation 之间 | |
action_input_end = bot_message[action_input_pos:].index( | |
OBSERVATION) - 1 | |
action_input = bot_message[action_input_pos:action_input_pos | |
+ action_input_end].strip() | |
is_json = find_json_pattern.search(action_input) | |
if is_json: | |
action_input = is_json.group() | |
else: | |
action_input = re.sub(r'^Action Input[:]?[\s]*', '', | |
action_input) | |
summary = f'调用工具 {action_name}' | |
if is_json: | |
detail = f'```json\n\n{json.dumps(json.loads(action_input), indent=4, ensure_ascii=False)}\n\n```' | |
else: | |
detail = action_input | |
result += '<details> <summary>' + summary + '</summary>' + self.convert_markdown( | |
detail) + '</details>' | |
start_pos = action_input_pos + action_input_end + 1 | |
try: | |
observation_pos = bot_message.index(OBSERVATION, start_pos) | |
idx = observation_pos + len(OBSERVATION) | |
obs_message = bot_message[idx:] | |
observation_start_id = obs_message.index( | |
RESULT_START) + len(RESULT_START) | |
observation_end_idx = obs_message.index(RESULT_END) | |
summary = '完成调用' | |
exec_content = obs_message[ | |
observation_start_id:observation_end_idx] | |
detail = f'```\n\n{exec_content}\n\n```' | |
start_pos = idx + observation_end_idx + len(RESULT_END) | |
except Exception: | |
summary = '执行中...' | |
detail = '' | |
exec_content = None | |
result += '<details> <summary>' + summary + '</summary>' + self.convert_markdown( | |
detail) + '</details>' | |
if exec_content is not None and '[IMAGEGEN]' in exec_content: | |
# convert local file to base64 | |
re_pattern = re.compile(pattern=r'!\[[^\]]+\]\(([^)]+)\)') | |
res = re_pattern.search(exec_content) | |
if res: | |
image_path = res.group(1).strip() | |
if os.path.isfile(image_path): | |
exec_content = convert_url( | |
exec_content, | |
covert_image_to_base64(image_path)) | |
result += self.convert_markdown(f'{exec_content}') | |
except Exception: | |
# import traceback; traceback.print_exc() | |
result += self.convert_markdown(bot_message[start_pos:]) | |
start_pos = len(bot_message[start_pos:]) | |
break | |
result += ALREADY_CONVERTED_MARK | |
return result | |
def postprocess( | |
self, | |
message_pairs: list[list[str | tuple[str] | tuple[str, str] | None] | |
| tuple], | |
) -> list[list[str | dict | None]]: | |
""" | |
Parameters: | |
message_pairs: List of lists representing the message and response pairs. | |
Each message and response should be a string, which may be in Markdown format. | |
It can also be a tuple whose first element is a string or pathlib. | |
Path filepath or URL to an image/video/audio, and second (optional) element is the alt text, | |
in which case the media file is displayed. It can also be None, in which case that message is not displayed. | |
Returns: | |
List of lists representing the message and response. Each message and response will be a string of HTML, | |
or a dictionary with media information. Or None if the message is not to be displayed. | |
""" | |
if message_pairs is None: | |
return [] | |
processed_messages = [] | |
for message_pair in message_pairs: | |
assert isinstance( | |
message_pair, (tuple, list) | |
), f'Expected a list of lists or list of tuples. Received: {message_pair}' | |
assert ( | |
len(message_pair) == 2 | |
), f'Expected a list of lists of length 2 or list of tuples of length 2. Received: {message_pair}' | |
if isinstance(message_pair[0], tuple) or isinstance( | |
message_pair[1], tuple): | |
processed_messages.append([ | |
self._postprocess_chat_messages(message_pair[0]), | |
self._postprocess_chat_messages(message_pair[1]), | |
]) | |
else: | |
# 处理不是元组的情况 | |
user_message, bot_message = message_pair | |
if user_message and not user_message.endswith( | |
ALREADY_CONVERTED_MARK): | |
convert_md = self.convert_markdown( | |
html.escape(user_message)) | |
user_message = f'{convert_md}' + ALREADY_CONVERTED_MARK | |
if bot_message and not bot_message.endswith( | |
ALREADY_CONVERTED_MARK): | |
# bot_message = self.convert_bot_message(bot_message) | |
bot_message = self.convert_bot_message_for_qwen( | |
bot_message) | |
processed_messages.append([ | |
user_message, | |
bot_message, | |
]) | |
return processed_messages | |