from __future__ import annotations import ast import html import re import traceback from typing import List, Tuple import gradio as gr import json import markdown from gradio.components import Chatbot as ChatBotBase ALREADY_CONVERTED_MARK = '' class ChatBot(ChatBotBase): def normalize_markdown(self, bot_message, remove_media=False): if remove_media: media_regex = r'(!\[[^\]]*\]\([^)]+\)|]+>.*?)\n*' # 使用正则表达式进行替换 bot_message = re.sub(media_regex, '', bot_message) lines = bot_message.split('\n') normalized_lines = [] inside_list = False for i, line in enumerate(lines): if re.match(r'^(\d+\.|-|\*|\+)\s', line.strip()): if not inside_list and i > 0 and lines[i - 1].strip() != '': normalized_lines.append('') inside_list = True normalized_lines.append(line) elif inside_list and line.strip() == '': if i < len(lines) - 1 and not re.match(r'^(\d+\.|-|\*|\+)\s', lines[i + 1].strip()): normalized_lines.append(line) continue else: inside_list = False normalized_lines.append(line) return '\n'.join(normalized_lines) def convert_markdown(self, bot_message, remove_media=False): if bot_message.count('```') % 2 != 0: bot_message += '\n```' bot_message = self.normalize_markdown(bot_message, remove_media) result = markdown.markdown( bot_message, extensions=[ 'toc', 'extra', 'tables', 'markdown_katex', 'codehilite', 'mdx_truly_sane_lists', 'markdown_cjk_spacing.cjk_spacing', 'pymdownx.magiclink' ], extension_configs={ 'markdown_katex': { 'no_inline_svg': True, # fix for WeasyPrint 'insert_fonts_css': True, }, 'codehilite': { 'linenums': False, 'guess_lang': True }, 'mdx_truly_sane_lists': { 'nested_indent': 2, 'truly_sane': True, } }) result = ''.join(result) return result def convert_bot_message(self, bot_message): # 兼容老格式 chunks = bot_message.split('') if len(chunks) > 1: new_bot_message = '' for idx, chunk in enumerate(chunks): new_bot_message += chunk if idx % 2 == 0: if idx != len(chunks) - 1: new_bot_message += '<|startofthink|>' else: new_bot_message += '<|endofthink|>' bot_message = new_bot_message start_pos = 0 result = '' find_json_pattern = re.compile(r'{[\s\S]+}') START_OF_THINK_TAG, END_OF_THINK_TAG = '<|startofthink|>', '<|endofthink|>' START_OF_EXEC_TAG, END_OF_EXEC_TAG = '<|startofexec|>', '<|endofexec|>' while start_pos < len(bot_message): try: start_of_think_pos = bot_message.index(START_OF_THINK_TAG, start_pos) end_of_think_pos = bot_message.index(END_OF_THINK_TAG, start_pos) if start_pos < start_of_think_pos: result += self.convert_markdown( bot_message[start_pos:start_of_think_pos]) think_content = bot_message[start_of_think_pos + len(START_OF_THINK_TAG ):end_of_think_pos].strip() json_content = find_json_pattern.search(think_content) think_content = json_content.group( ) if json_content else think_content try: think_node = json.loads( think_content.replace('\n', ''), strict=False) plugin_name = think_node.get( 'plugin_name', think_node.get('plugin', think_node.get('api_name', 'unknown'))) summary = f'选择插件【{plugin_name}】' think_node.pop('url', None) detail = f'```json\n\n{json.dumps(think_node,indent=3,ensure_ascii=False)}\n\n```' except Exception: traceback.print_exc() summary = '思考中...' detail = think_content # detail += traceback.format_exc() result += '
' + summary + '' + self.convert_markdown( detail) + '
' start_pos = end_of_think_pos + len(END_OF_THINK_TAG) except Exception: # result += traceback.format_exc() break try: start_of_exec_pos = bot_message.index(START_OF_EXEC_TAG, start_pos) end_of_exec_pos = bot_message.index(END_OF_EXEC_TAG, start_pos) if start_pos < start_of_exec_pos: result += self.convert_markdown( bot_message[start_pos:start_of_think_pos]) exec_content = bot_message[start_of_exec_pos + len(START_OF_EXEC_TAG ):end_of_exec_pos].strip() exec_content = self.process_exec_result(exec_content) # result += self.convert_markdown(exec_content) summary = '执行结果' result += '
' + summary + '' + self.convert_markdown( exec_content) + '
' start_pos = end_of_exec_pos + len(END_OF_EXEC_TAG) except Exception: # traceback.print_exc() break if start_pos < len(bot_message): result += self.convert_markdown( bot_message[start_pos:], remove_media=True) result += ALREADY_CONVERTED_MARK return result def postprocess( self, message_pairs: List[Tuple[str | None, str | None]] ) -> List[Tuple[str | None, str | None]]: """ Parameters: y: List of tuples representing the message and response pairs. Each message and response should be a string, which may be in Markdown format. Returns: List of tuples representing the message and response. Each message and response will be a string of HTML. """ if not message_pairs: return [] user_message, bot_message = message_pairs[-1] if not user_message.endswith(ALREADY_CONVERTED_MARK): user_message = f"

{self.convert_markdown(html.escape(user_message))}

"\ + ALREADY_CONVERTED_MARK if not bot_message.endswith(ALREADY_CONVERTED_MARK): bot_message = self.convert_bot_message(bot_message) message_pairs[-1] = (user_message, bot_message) return message_pairs def process_exec_result(self, exec_result: str): exec_result = exec_result.replace("{'result': ", '') exec_result = exec_result[:-1] exec_result = exec_result.replace("'", "\"") try: exec_result = json.loads( exec_result.replace('\n', ''), strict=False) final_result = f'```json\n\n{exec_result}\n\n```' return final_result except Exception: match_image = re.search(r'!\[IMAGEGEN\]\((.*?)\)', exec_result) if match_image: img_path = match_image.group(1) gr_img_path = self.transform_to_gr_file(img_path) final_result = exec_result.replace(img_path, gr_img_path) return final_result match_audio = re.search( r'