# DataMaker / functions.py
# import gradio as gr
import gradio
# import lmdb
# import base64
# import io
import random
import time
import os
import re
import sys
import json
import copy
# import sqlite3
import hashlib
import uuid
from urllib.parse import urljoin
import openai
def get_random_sleep(base_time, random_range):
return (base_time + random.randint(-random_range, random_range))*0.001
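# Example (hypothetical values): get_random_sleep(1000, 200) returns a float in
# [0.8, 1.2]; both arguments are milliseconds and the result is in seconds,
# ready to hand to time.sleep().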
def js_load(txt):
try:
return json.loads(txt)
except Exception as error:
print('')
print('js_load:')
print(str(error))
print('')
return None
def js_dump(thing):
try:
return json.dumps(thing)
except Exception as error:
print('')
print('js_dump:')
print(str(error))
print('')
return None
def filtered_history(history, num=0):
if num > 0:
filtered = list(filter(lambda it:(it['type'] in ['request', 'response']), history))
return filtered[-num:]
return []
def filtered_history_messages(history, num=0):
filtered = filtered_history(history, num)
return list(map(lambda it:{'role': it.get('role'), 'content': it.get('content')}, filtered))
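# Illustrative usage (a sketch; the history entries below are hypothetical): only
# items whose "type" is "request" or "response" survive the filter, trimmed to the
# last `num`, then reduced to the {role, content} shape the chat API expects.
def _demo_filtered_history_messages():
    history = [
        {"role": "user", "content": "hi", "type": "request"},
        {"role": "app", "content": "a tip", "type": "tip"},
        {"role": "assistant", "content": "hello", "type": "response"},
    ]
    return filtered_history_messages(history, num=2)
    # -> [{'role': 'user', 'content': 'hi'}, {'role': 'assistant', 'content': 'hello'}]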
def make_md_line(role, content):
return f"""\n##### `{role}`\n\n{content}\n"""
def make_md_by_history(history):
md = ""
for item in history:
md += make_md_line(item.get('role'), item.get('content'))
return md
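# Example of the markdown this produces (hypothetical history): each entry becomes
# a level-5 heading with the role in backticks, followed by the content.
def _demo_make_md_by_history():
    return make_md_by_history([{"role": "user", "content": "hi"}])
    # -> "\n##### `user`\n\nhi\n"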
def make_history_file_fn(history):
uuid4 = str(uuid.uuid4())
json_file_path = None
md_file_path = None
save_history = []
for item in history:
if item["role"] == "assistant":
info = {
"content": str(item["content"]).replace("\n", "")
}
save_history.append(info)
    try:
        # Create the directory if it does not exist
        os.makedirs('temp_files', exist_ok=True)
        json_file_content = json.dumps(save_history, ensure_ascii=False)
        json_file_path = os.path.join('temp_files', f'save_history[{uuid4}].json')
        with open(json_file_path, 'w', encoding='utf-8') as f:
            f.write(json_file_content)
        md_file_content = make_md_by_history(save_history)
        md_file_path = os.path.join('temp_files', f'save_history[{uuid4}].md')
        with open(md_file_path, 'w', encoding='utf-8') as f:
            f.write(md_file_content)
return json_file_path, md_file_path, gradio.update(visible=True)
except Exception as error:
print(f"\n{error}\n")
return json_file_path, md_file_path, gradio.update(visible=True)
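# Illustrative call (a sketch; the history shape is inferred from the code above).
# Note that only assistant entries are kept, with newlines stripped from content.
def _demo_make_history_file():
    history = [
        {"role": "user", "content": "hi", "type": "request"},
        {"role": "assistant", "content": "line one\nline two", "type": "response"},
    ]
    json_path, md_path, _update = make_history_file_fn(history)
    return json_path, md_path  # files under temp_files/, named with a fresh uuid4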
def make_history_file_fn__(history):
uuid4 = str(uuid.uuid4())
    try:
        os.makedirs('temp_files', exist_ok=True)  # ensure the directory exists
        json_file_content = json.dumps(history, ensure_ascii=False)
        json_file_path = os.path.join('temp_files', f'history[{uuid4}].json')
        with open(json_file_path, 'w', encoding='utf-8') as f:
            f.write(json_file_content)
except Exception as error:
print(f"\n{error}\n")
json_file_path = None
try:
md_file_content = make_md_by_history(history)
md_file_path = os.path.join('temp_files', f'history[{uuid4}].md')
        with open(md_file_path, 'w', encoding='utf-8') as f:
f.write(md_file_content)
except Exception as error:
print(f"\n{error}\n")
md_file_path = None
return json_file_path, md_file_path, gradio.update(visible=True)
def make_user_message_list_fn__(
    user_message_template,  # template applied to every message
    user_message_template_mask,  # the part of the template that gets replaced
    user_message_template_mask_is_regex,  # whether the mask is a regular expression
    user_message_list_text,  # one text blob containing all the user messages
    user_message_list_text_splitter,  # the delimiter used to split user_message_list_text
    user_message_list_text_splitter_is_regex,  # whether the splitter is a regular expression
) -> list:
    # Returns the list of user messages with the template applied.
    # This implementation first splits the message-list text (with a regex or a
    # plain string, depending on the flag) into a list of messages. Then, for
    # each message, it substitutes the message into the template according to
    # user_message_template_mask / user_message_template_mask_is_regex and
    # appends the result to the output list, which is returned at the end.
    # Split the user-message-list text
if user_message_list_text_splitter_is_regex:
user_messages = re.split(user_message_list_text_splitter, user_message_list_text)
else:
user_messages = user_message_list_text.split(user_message_list_text_splitter)
    # Build the list of templated user messages
    user_message_result_list = []
    for message in user_messages:
        # Substitute the message into the template; a callable replacement keeps
        # re.sub from interpreting backslashes or group refs inside `message`
        if user_message_template_mask_is_regex:
            transformed_message = re.sub(user_message_template_mask, lambda _m: message, user_message_template)
        else:
            transformed_message = user_message_template.replace(user_message_template_mask, message)
        user_message_result_list.append(transformed_message)
return user_message_result_list
def make_user_message_list_fn(
user_message_template,
user_message_template_mask,
user_message_template_mask_is_regex,
user_message_list_text,
user_message_list_text_splitter,
user_message_list_text_splitter_is_regex,
) -> list:
    # As long as the patterns are compiled into regex objects before the
    # substitution or split happens, the logic stays the same: whenever an
    # xxx_is_regex flag is True, the corresponding parameter is compiled, and
    # the replace/split branches below simply call the compiled object's methods.
    # Compile the regular expressions
if user_message_template_mask_is_regex:
user_message_template_mask = re.compile(user_message_template_mask)
if user_message_list_text_splitter_is_regex:
user_message_list_text_splitter = re.compile(user_message_list_text_splitter)
    # Split the user-message-list text
if user_message_list_text_splitter_is_regex:
user_messages = user_message_list_text_splitter.split(user_message_list_text)
else:
user_messages = user_message_list_text.split(user_message_list_text_splitter)
    # Build the list of templated user messages
    user_message_result_list = []
    for message in user_messages:
        # Substitute the message into the template; a callable replacement keeps
        # the regex engine from interpreting backslashes or group refs in `message`
        if user_message_template_mask_is_regex:
            transformed_message = user_message_template_mask.sub(lambda _m: message, user_message_template)
        else:
            transformed_message = user_message_template.replace(user_message_template_mask, message)
        user_message_result_list.append(transformed_message)
return user_message_result_list
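# Illustrative usage (hypothetical template and messages): the splitter cuts the
# blob into individual messages, and the mask marks where each message lands in
# the template.
def _demo_make_user_message_list():
    return make_user_message_list_fn(
        user_message_template="Translate to English: <TEXT>",
        user_message_template_mask="<TEXT>",
        user_message_template_mask_is_regex=False,
        user_message_list_text="bonjour\n---\nmerci",
        user_message_list_text_splitter="\n---\n",
        user_message_list_text_splitter_is_regex=False,
    )
    # -> ['Translate to English: bonjour', 'Translate to English: merci']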
def sequential_chat_once_fn(payload, api_key_text, history, history_md_stable, history_md_stream, tips):
# print("\n\n")
assistant_message = ""
tips = ""
try:
openai.api_key = api_key_text
completion = openai.ChatCompletion.create(**payload)
if payload.get('stream'):
print('assistant:')
# print('->>>')
            is_first = True
for chunk in completion:
if is_first:
is_first = False
continue
if chunk.choices[0].finish_reason is None:
# sys.stdout.write("\r")
print(chunk.choices[0].delta.content or '', end="")
assistant_message += chunk.choices[0].delta.content or ''
# print(f"\033[2K{assistant_message}", end="")
history_md_stream = make_md_line('assistant', assistant_message)
tips = 'streaming'
yield assistant_message, history_md_stream, tips, history
else:
pass
pass
# print('=>>>')
print('')
pass
else:
assistant_message = completion.choices[0].message.content
history_md_stream = make_md_line('assistant', assistant_message)
tips = 'got'
print('assistant:')
print(assistant_message)
yield assistant_message, history_md_stream, tips, history
pass
except Exception as error:
tips = str(error)
history.append({"role": "app", "content": tips})
print(f"\n{tips}\n")
yield assistant_message, history_md_stream, tips, history
pass
# print("\n\n")
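# Minimal driver sketch (assumes a valid API key and network access; never called
# in this module): sequential_chat_once_fn is a generator, so the partial
# assistant message has to be consumed yield by yield.
def _demo_sequential_chat_once(api_key):
    payload = {
        'model': 'gpt-3.5-turbo',  # hypothetical model choice
        'stream': True,
        'messages': [{"role": "user", "content": "hi"}],
    }
    last = ""
    for assistant_message, _md_stream, _tips, _history in sequential_chat_once_fn(
            payload, api_key, [], "", "", ""):
        last = assistant_message  # grows chunk by chunk while streaming
    return last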
def sequential_chat_fn(
history,
system_prompt_enabled,
system_prompt,
user_message_template,
user_message_template_mask,
user_message_template_mask_is_regex,
user_message_list_text,
user_message_list_text_splitter,
user_message_list_text_splitter_is_regex,
history_prompt_num,
api_key_text, token_text,
sleep_base, sleep_rand,
    prop_stream, prop_model, prop_temperature, prop_top_p, prop_choices_num,
    prop_max_tokens, prop_presence_penalty, prop_frequency_penalty, prop_logit_bias,
):
# outputs=[
# history,
# history_md_stable,
# history_md_stream,
# tips,
# file_row,
# ],
history_md_stable = ""
history_md_stream = ""
tips = ""
try:
user_message_list = make_user_message_list_fn(
user_message_template,
user_message_template_mask,
user_message_template_mask_is_regex,
user_message_list_text,
user_message_list_text_splitter,
user_message_list_text_splitter_is_regex,
)
payload = {
'model': prop_model,
'temperature': prop_temperature,
'top_p': prop_top_p,
'n': prop_choices_num,
'stream': prop_stream,
'presence_penalty': prop_presence_penalty,
'frequency_penalty': prop_frequency_penalty,
'user': token_text,
}
        if prop_max_tokens > 0:
payload['max_tokens'] = prop_max_tokens
# if prop_logit_bias is not None:
# payload['logit_bias'] = prop_logit_bias
# headers = {
# "Content-Type": "application/json",
# "Authorization": f"Bearer {api_key_text}"
# }
for user_message in user_message_list:
print('')
print(f'user({token_text}):')
print(user_message)
print('')
            # build the_messages to send
the_messages = []
if system_prompt_enabled:
the_messages.append({"role": "system", "content": system_prompt})
for msg in filtered_history_messages(history, num=history_prompt_num):
the_messages.append(msg)
the_messages.append({"role": "user", "content": user_message})
payload['messages'] = the_messages
history.append({"role": "user", "content": user_message, "type": "request", "payload": payload})
history_md_stable = make_md_by_history(history)
history_md_stream = ""
tips = ""
yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False)
try:
for (assistant_message, history_md_stream, tips, history) in sequential_chat_once_fn(payload, api_key_text, history, history_md_stable, history_md_stream, tips):
yield history, history_md_stable, history_md_stream, tips, gradio.update()
history.append({"role": "assistant", "content": assistant_message, "type": "request"})
history_md_stable += history_md_stream
history_md_stream = ""
tips = "fine"
yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False)
except Exception as error:
tips = f'error: {str(error)}'
history.append({"role": "app", "content": tips})
print(f"\n{tips}\n")
yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False)
time.sleep(get_random_sleep(sleep_base, sleep_rand))
pass
except Exception as error:
tips = str(error)
history.append({"role": "app", "content": tips})
print(f"\n{tips}\n")
yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False)
pass
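# Wiring sketch (hypothetical component names; the tuple order mirrors the
# commented outputs list at the top of sequential_chat_fn):
# start_btn.click(
#     sequential_chat_fn,
#     inputs=[history, system_prompt_enabled, system_prompt, ...],
#     outputs=[history, history_md_stable, history_md_stream, tips, file_row],
# )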
def on_click_send_btn(
global_state_json, api_key_text, chat_input_role, chat_input, prompt_table, chat_use_prompt, chat_use_history, chat_log,
chat_model, temperature, top_p, choices_num, stream, max_tokens, presence_penalty, frequency_penalty, logit_bias,
):
old_state = json.loads(global_state_json or "{}")
print('\n\n\n\n\n')
print(prompt_table)
prompt_table = prompt_table or []
chat_log = chat_log or []
chat_log_md = ''
if chat_use_prompt:
chat_log_md += '<center>(prompt)</center>\n\n'
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)])
chat_log_md += '\n---\n'
if True:
chat_log_md += '<center>(history)</center>\n\n' if chat_use_history else '<center>(not used history)</center>\n\n'
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", chat_log)])
chat_log_md += '\n---\n'
# if chat_input=='':
# return json.dumps(old_state), chat_log, chat_log_md, chat_log_md, None, None, chat_input
print('\n')
print(chat_input)
print('')
try:
logit_bias_json = json.dumps(logit_bias) if logit_bias else None
    except Exception:
        # yield, not return: this function is a generator, so gradio only
        # receives values that are yielded
        yield json.dumps(old_state), chat_log, chat_log_md, chat_log_md, None, None, chat_input
        return
new_state = copy.deepcopy(old_state) or {}
req_hist = copy.deepcopy(prompt_table) if chat_use_prompt else []
if chat_use_history:
for hh in (chat_log or []):
req_hist.append(hh)
if chat_input and chat_input!="":
req_hist.append([(chat_input_role or 'user'), chat_input])
openai.api_key = api_key_text
props = {
'model': chat_model,
'messages': [xx for xx in map(lambda it: {'role':it[0], 'content':it[1]}, req_hist)],
'temperature': temperature,
'top_p': top_p,
'n': choices_num,
'stream': stream,
'presence_penalty': presence_penalty,
'frequency_penalty': frequency_penalty,
}
    if max_tokens > 0:
props['max_tokens'] = max_tokens
if logit_bias_json is not None:
props['logit_bias'] = logit_bias_json
props_json = json.dumps(props)
try:
completion = openai.ChatCompletion.create(**props)
print('')
# print(completion.choices)
# the_response_role = completion.choices[0].message.role
# the_response = completion.choices[0].message.content
# print(the_response)
# print('')
# chat_last_resp = json.dumps(completion.__dict__)
# chat_last_resp_dict = json.loads(chat_last_resp)
# chat_last_resp_dict['api_key'] = "hidden by UI"
# chat_last_resp_dict['organization'] = "hidden by UI"
# chat_last_resp = json.dumps(chat_last_resp_dict)
chat_log_md = ''
if chat_use_prompt:
chat_log_md += '<center>(prompt)</center>\n\n'
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)])
chat_log_md += '\n---\n'
if True:
chat_log_md += '<center>(history)</center>\n\n' if chat_use_history else '<center>(not used history)</center>\n\n'
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", chat_log)])
chat_log_md += '\n---\n'
if chat_input and chat_input!="":
chat_log.append([(chat_input_role or 'user'), chat_input])
chat_log_md += f"##### `{(chat_input_role or 'user')}`\n\n{chat_input}\n\n"
partial_words = ""
        counter = 0
if stream:
the_response = ''
the_response_role = ''
for chunk in completion:
                # The first chunk only carries the role delta; record it and skip
if counter == 0:
the_response_role = chunk.choices[0].delta.role
chat_log_md += f"##### `{the_response_role}`\n\n"
counter += 1
continue
# print(('chunk', chunk))
                if chunk.choices[0].finish_reason is None:
                    the_response_chunk = chunk.choices[0].delta.content or ''
                    the_response += the_response_chunk
                    chat_log_md += f"{the_response_chunk}"
yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, "{}", props_json, ''
else:
chat_log.append([the_response_role, the_response])
chat_log_md += f"\n\n"
                    yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, '{"msg": "raw response is not shown in stream mode"}', props_json, ''
# chat_last_resp = json.dumps(completion.__dict__)
# chat_last_resp_dict = json.loads(chat_last_resp)
# chat_last_resp_dict['api_key'] = "hidden by UI"
# chat_last_resp_dict['organization'] = "hidden by UI"
# chat_last_resp = json.dumps(chat_last_resp_dict)
else:
the_response_role = completion.choices[0].message.role
the_response = completion.choices[0].message.content
print(the_response)
print('')
chat_log.append([the_response_role, the_response])
chat_log_md += f"##### `{the_response_role}`\n\n{the_response}\n\n"
chat_last_resp = json.dumps(completion.__dict__)
chat_last_resp_dict = json.loads(chat_last_resp)
chat_last_resp_dict['api_key'] = "hidden by UI"
chat_last_resp_dict['organization'] = "hidden by UI"
chat_last_resp = json.dumps(chat_last_resp_dict)
            # yield, not return: this function is a generator, so gradio only
            # receives values that are yielded
            yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, chat_last_resp, props_json, ''
            return
# chat_log.append([the_response_role, the_response])
# chat_log_md += f"##### `{the_response_role}`\n\n{the_response}\n\n"
# return json.dumps(new_state), chat_log, chat_log_md, chat_log_md, chat_last_resp, props_json, ''
except Exception as error:
print(error)
print('error!!!!!!')
chat_log_md = ''
if chat_use_prompt:
chat_log_md += '<center>(prompt)</center>\n\n'
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)])
chat_log_md += '\n---\n'
if True:
chat_log_md += '<center>(history)</center>\n\n' if chat_use_history else '<center>(not used history)</center>\n\n'
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", chat_log)])
chat_log_md += '\n---\n'
# chat_log_md = ''
# chat_log_md = "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)]) if chat_use_prompt else ''
# chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", hist)])
chat_log_md += "\n"
chat_log_md += str(error)
        yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, None, props_json, chat_input
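# Wiring sketch (hypothetical component names): on_click_send_btn is a generator,
# so gradio streams each yielded 7-tuple into the outputs while stream=True.
# send_btn.click(
#     on_click_send_btn,
#     inputs=[global_state_json, api_key_text, chat_input_role, chat_input, ...],
#     outputs=[global_state_json, chat_log, chat_log_md, chat_log_md_copy, chat_last_resp, props_json, chat_input],
# )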
def clear_history():
return [], ""
def copy_history(txt):
# print('\n\n copying')
# print(txt)
# print('\n\n')
pass
def update_saved_prompt_titles(global_state_json, selected_saved_prompt_title):
print('')
global_state = json.loads(global_state_json or "{}")
print(global_state)
print(selected_saved_prompt_title)
saved_prompts = global_state.get('saved_prompts') or []
print(saved_prompts)
the_choices = [(it.get('title') or '[untitled]') for it in saved_prompts]
print(the_choices)
print('')
return gradio.Dropdown.update(choices=the_choices)
def save_prompt(global_state_json, saved_prompts, prompt_title, prompt_table):
the_choices = []
global_state = json.loads(global_state_json or "{}")
saved_prompts = global_state.get('saved_prompts') or []
if len(saved_prompts):
the_choices = [it.get('title') or '[untitled]' for it in saved_prompts]
pass
return global_state_json, gradio.Dropdown.update(choices=the_choices, value=prompt_title), prompt_title, prompt_table
def load_saved_prompt(title):
pass