')
if len(chunks) > 1:
new_bot_message = ''
for idx, chunk in enumerate(chunks):
new_bot_message += chunk
if idx % 2 == 0:
if idx != len(chunks) - 1:
new_bot_message += '<|startofthink|>'
else:
new_bot_message += '<|endofthink|>'
bot_message = new_bot_message
start_pos = 0
result = ''
find_json_pattern = re.compile(r'{[\s\S]+}')
START_OF_THINK_TAG, END_OF_THINK_TAG = '<|startofthink|>', '<|endofthink|>'
START_OF_EXEC_TAG, END_OF_EXEC_TAG = '<|startofexec|>', '<|endofexec|>'
while start_pos < len(bot_message):
try:
start_of_think_pos = bot_message.index(START_OF_THINK_TAG,
start_pos)
end_of_think_pos = bot_message.index(END_OF_THINK_TAG,
start_pos)
if start_pos < start_of_think_pos:
result += self.convert_markdown(
bot_message[start_pos:start_of_think_pos])
think_content = bot_message[start_of_think_pos
+ len(START_OF_THINK_TAG
):end_of_think_pos].strip()
json_content = find_json_pattern.search(think_content)
think_content = json_content.group(
) if json_content else think_content
try:
think_node = json.loads(
think_content.replace('\n', ''), strict=False)
plugin_name = think_node.get(
'plugin_name',
think_node.get('plugin',
think_node.get('api_name', 'unknown')))
summary = f'选择插件【{plugin_name}】'
think_node.pop('url', None)
detail = f'```json\n\n{json.dumps(think_node,indent=3,ensure_ascii=False)}\n\n```'
except Exception:
traceback.print_exc()
summary = '思考中...'
detail = think_content
# detail += traceback.format_exc()
result += ' ' + summary + '
' + self.convert_markdown(
detail) + ' '
start_pos = end_of_think_pos + len(END_OF_THINK_TAG)
except Exception:
# result += traceback.format_exc()
break
try:
start_of_exec_pos = bot_message.index(START_OF_EXEC_TAG,
start_pos)
end_of_exec_pos = bot_message.index(END_OF_EXEC_TAG, start_pos)
if start_pos < start_of_exec_pos:
result += self.convert_markdown(
bot_message[start_pos:start_of_think_pos])
exec_content = bot_message[start_of_exec_pos
+ len(START_OF_EXEC_TAG
):end_of_exec_pos].strip()
exec_content = self.process_exec_result(exec_content)
# result += self.convert_markdown(exec_content)
summary = '执行结果'
result += ' ' + summary + '
' + self.convert_markdown(
exec_content) + ' '
start_pos = end_of_exec_pos + len(END_OF_EXEC_TAG)
except Exception:
# traceback.print_exc()
break
if start_pos < len(bot_message):
result += self.convert_markdown(
bot_message[start_pos:], remove_media=True)
result += ALREADY_CONVERTED_MARK
return result
def postprocess(
self, message_pairs: List[Tuple[str | None, str | None]]
) -> List[Tuple[str | None, str | None]]:
"""
Parameters:
y: List of tuples representing the message and response pairs.
Each message and response should be a string, which may be in Markdown format.
Returns:
List of tuples representing the message and response. Each message and response will be a string of HTML.
"""
if not message_pairs:
return []
user_message, bot_message = message_pairs[-1]
if not user_message.endswith(ALREADY_CONVERTED_MARK):
user_message = f"{self.convert_markdown(html.escape(user_message))}
"\
+ ALREADY_CONVERTED_MARK
if not bot_message.endswith(ALREADY_CONVERTED_MARK):
bot_message = self.convert_bot_message(bot_message)
message_pairs[-1] = (user_message, bot_message)
return message_pairs
def process_exec_result(self, exec_result: str):
exec_result = exec_result.replace("{'result': ", '')
exec_result = exec_result[:-1]
exec_result = exec_result.replace("'", "\"")
try:
exec_result = json.loads(
exec_result.replace('\n', ''), strict=False)
final_result = f'```json\n\n{exec_result}\n\n```'
return final_result
except Exception:
match_image = re.search(r'!\[IMAGEGEN\]\((.*?)\)', exec_result)
if match_image:
img_path = match_image.group(1)
gr_img_path = self.transform_to_gr_file(img_path)
final_result = exec_result.replace(img_path, gr_img_path)
return final_result
match_audio = re.search(
r'