AI-Agent / app.py
kevinwang676's picture
Update app.py
c49d190
raw
history blame
No virus
26.9 kB
import os, sys
sys.path.append("./agentfabric/")
import random
import re
import shutil
import traceback
import gradio as gr
import json
import yaml
from builder_core import beauty_output, init_builder_chatbot_agent
from config_utils import (DEFAULT_AGENT_DIR, Config, get_avatar_image,
get_ci_dir, get_user_cfg_file, get_user_dir,
is_valid_plugin_configuration, parse_configuration,
save_avatar_image, save_builder_configuration,
save_plugin_configuration)
from gradio_utils import ChatBot, format_cover_html, format_goto_publish_html
from i18n import I18n
from publish_util import pop_user_info_from_config, prepare_agent_zip
from user_core import init_user_chatbot_agent
def init_user(uuid_str, state):
try:
seed = state.get('session_seed', random.randint(0, 1000000000))
user_agent = init_user_chatbot_agent(uuid_str)
user_agent.seed = seed
state['user_agent'] = user_agent
except Exception as e:
error = traceback.format_exc()
print(f'Error:{e}, with detail: {error}')
return state
def init_builder(uuid_str, state):
try:
builder_agent = init_builder_chatbot_agent(uuid_str)
state['builder_agent'] = builder_agent
except Exception as e:
error = traceback.format_exc()
print(f'Error:{e}, with detail: {error}')
return state
def update_builder(uuid_str, state):
builder_agent = state['builder_agent']
try:
builder_cfg_file = get_user_cfg_file(uuid_str=uuid_str)
with open(builder_cfg_file, 'r') as f:
config = json.load(f)
builder_agent.update_config_to_history(config)
except Exception as e:
error = traceback.format_exc()
print(f'Error:{e}, with detail: {error}')
return state
def check_uuid(uuid_str):
if not uuid_str or uuid_str == '':
if os.getenv('MODELSCOPE_ENVIRONMENT') == 'studio':
raise gr.Error('请登陆后使用! (Please login first)')
else:
uuid_str = 'local_user'
return uuid_str
def process_configuration(uuid_str, bot_avatar, name, description,
instructions, model, suggestions, knowledge_files,
capabilities_checkboxes, openapi_schema,
openapi_auth, openapi_auth_apikey,
openapi_auth_apikey_type, openapi_privacy_policy,
state):
uuid_str = check_uuid(uuid_str)
tool_cfg = state['tool_cfg']
capabilities = state['capabilities']
bot_avatar, bot_avatar_path = save_avatar_image(bot_avatar, uuid_str)
suggestions_filtered = [row for row in suggestions if row[0]]
user_dir = get_user_dir(uuid_str)
if knowledge_files is not None:
new_knowledge_files = [
os.path.join(user_dir, os.path.basename((f.name)))
for f in knowledge_files
]
for src_file, dst_file in zip(knowledge_files, new_knowledge_files):
if not os.path.exists(dst_file):
shutil.copy(src_file.name, dst_file)
else:
new_knowledge_files = []
builder_cfg = {
'name': name,
'avatar': bot_avatar,
'description': description,
'instruction': instructions,
'prompt_recommend': [row[0] for row in suggestions_filtered],
'knowledge': new_knowledge_files,
'tools': {
capability: dict(
name=tool_cfg[capability]['name'],
is_active=tool_cfg[capability]['is_active'],
use=True if capability in capabilities_checkboxes else False)
for capability in map(lambda item: item[1], capabilities)
},
'model': model,
}
try:
try:
schema_dict = json.loads(openapi_schema)
except json.decoder.JSONDecodeError:
schema_dict = yaml.safe_load(openapi_schema)
except Exception as e:
raise gr.Error(
f'OpenAPI schema format error, should be one of json and yaml: {e}'
)
openapi_plugin_cfg = {
'schema': schema_dict,
'auth': {
'type': openapi_auth,
'apikey': openapi_auth_apikey,
'apikey_type': openapi_auth_apikey_type
},
'privacy_policy': openapi_privacy_policy
}
if is_valid_plugin_configuration(openapi_plugin_cfg):
save_plugin_configuration(openapi_plugin_cfg, uuid_str)
except Exception as e:
error = traceback.format_exc()
print(f'Error:{e}, with detail: {error}')
save_builder_configuration(builder_cfg, uuid_str)
update_builder(uuid_str, state)
init_user(uuid_str, state)
return [
gr.HTML.update(
visible=True,
value=format_cover_html(builder_cfg, bot_avatar_path)),
gr.Chatbot.update(
visible=False,
avatar_images=get_avatar_image(bot_avatar, uuid_str)),
gr.Dataset.update(samples=suggestions_filtered),
gr.DataFrame.update(value=suggestions_filtered)
]
# 创建 Gradio 界面
demo = gr.Blocks(css='assets/app.css')
with demo:
uuid_str = gr.Textbox(label='modelscope_uuid', visible=False)
draw_seed = random.randint(0, 1000000000)
state = gr.State({'session_seed': draw_seed})
i18n = I18n('zh-cn')
with gr.Row():
with gr.Column(scale=5):
header = gr.Markdown(i18n.get('header'))
with gr.Column(scale=1):
language = gr.Dropdown(
choices=[('中文', 'zh-cn'), ('English', 'en')],
show_label=False,
container=False,
value='zh-cn',
interactive=True)
with gr.Row():
with gr.Column():
with gr.Tabs() as tabs:
with gr.Tab(i18n.get_whole('create'), id=0) as create_tab:
with gr.Column():
# "Create" 标签页的 Chatbot 组件
start_text = '欢迎使用agent创建助手。我可以帮助您创建一个定制agent。'\
'您希望您的agent主要用于什么领域或任务?比如,您可以说,我想做一个RPG游戏agent'
create_chatbot = gr.Chatbot(
show_label=False, value=[[None, start_text]])
create_chat_input = gr.Textbox(
label=i18n.get('message'),
placeholder=i18n.get('message_placeholder'))
create_send_button = gr.Button(
i18n.get('sendOnLoading'), interactive=False)
configure_tab = gr.Tab(i18n.get_whole('configure'), id=1)
with configure_tab:
with gr.Column():
# "Configure" 标签页的配置输入字段
with gr.Row():
bot_avatar_comp = gr.Image(
label=i18n.get('form_avatar'),
placeholder='Chatbot avatar image',
source='upload',
interactive=True,
type='filepath',
scale=1,
width=182,
height=182,
)
with gr.Column(scale=4):
name_input = gr.Textbox(
label=i18n.get('form_name'),
placeholder=i18n.get(
'form_name_placeholder'))
description_input = gr.Textbox(
label=i18n.get('form_description'),
placeholder=i18n.get(
'form_description_placeholder'))
instructions_input = gr.Textbox(
label=i18n.get('form_instructions'),
placeholder=i18n.get(
'form_instructions_placeholder'),
lines=3)
model_selector = model_selector = gr.Dropdown(
label=i18n.get('form_model'))
suggestion_input = gr.Dataframe(
show_label=False,
value=[['']],
datatype=['str'],
headers=[i18n.get_whole('form_prompt_suggestion')],
type='array',
col_count=(1, 'fixed'),
interactive=True)
knowledge_input = gr.File(
label=i18n.get('form_knowledge'),
file_count='multiple',
file_types=['text', '.json', '.csv', '.pdf'])
capabilities_checkboxes = gr.CheckboxGroup(
label=i18n.get('form_capabilities'))
with gr.Accordion(
i18n.get('open_api_accordion'),
open=False) as open_api_accordion:
openapi_schema = gr.Textbox(
label='Schema',
placeholder=
'Enter your OpenAPI schema here, JSON or YAML format only'
)
with gr.Group():
openapi_auth_type = gr.Radio(
label='Authentication Type',
choices=['None', 'API Key'],
value='None')
openapi_auth_apikey = gr.Textbox(
label='API Key',
placeholder='Enter your API Key here')
openapi_auth_apikey_type = gr.Radio(
label='API Key type', choices=['Bearer'])
openapi_privacy_policy = gr.Textbox(
label='Privacy Policy',
placeholder='Enter privacy policy URL')
configure_button = gr.Button(
i18n.get('form_update_button'))
with gr.Column():
# Preview
preview_header = gr.HTML(
f"""<div class="preview_header">{i18n.get('preview')}<div>""")
user_chat_bot_cover = gr.HTML(format_cover_html({}, None))
user_chatbot = ChatBot(
value=[[None, None]],
elem_id='user_chatbot',
elem_classes=['markdown-body'],
avatar_images=get_avatar_image('', uuid_str),
height=650,
latex_delimiters=[],
show_label=False,
visible=False)
preview_chat_input = gr.Textbox(
label=i18n.get('message'),
placeholder=i18n.get('message_placeholder'))
user_chat_bot_suggest = gr.Dataset(
label=i18n.get('prompt_suggestion'),
components=[preview_chat_input],
samples=[])
# preview_send_button = gr.Button('Send')
with gr.Row():
upload_button = gr.UploadButton(
i18n.get('upload_btn'),
file_types=[
'.csv', '.doc', '.docx', '.xls', '.xlsx', '.txt',
'.md', '.pdf', '.jpeg', '.png', '.jpg', '.gif'
],
file_count='multiple')
preview_send_button = gr.Button(
i18n.get('sendOnLoading'), interactive=False)
user_chat_bot_suggest.select(
lambda evt: evt[0],
inputs=[user_chat_bot_suggest],
outputs=[preview_chat_input])
with gr.Accordion(
label=i18n.get('publish'),
open=False) as publish_accordion:
with gr.Row():
with gr.Column():
publish_button = gr.Button(i18n.get_whole('build'))
gr.Markdown(f'#### 1.{i18n.get_whole("build_hint")}')
with gr.Column():
publish_link = gr.HTML(
value=format_goto_publish_html(
i18n.get_whole('publish'), '', {}, True))
gr.Markdown(f'#### 2.{i18n.get_whole("publish_hint")}')
configure_updated_outputs = [
state,
# config form
bot_avatar_comp,
name_input,
description_input,
instructions_input,
model_selector,
suggestion_input,
knowledge_input,
capabilities_checkboxes,
# bot
user_chat_bot_cover,
user_chat_bot_suggest,
preview_send_button,
create_send_button,
]
# 初始化表单
def init_ui_config(uuid_str, _state, builder_cfg, model_cfg, tool_cfg):
print('builder_cfg:', builder_cfg)
# available models
models = list(model_cfg.keys())
capabilities = [(tool_cfg[tool_key]['name'], tool_key)
for tool_key in tool_cfg.keys()
if tool_cfg[tool_key].get('is_active', False)]
_state['model_cfg'] = model_cfg
_state['tool_cfg'] = tool_cfg
_state['capabilities'] = capabilities
bot_avatar = get_avatar_image(builder_cfg.get('avatar', ''),
uuid_str)[1]
suggests = builder_cfg.get('prompt_recommend', [])
return {
state:
_state,
bot_avatar_comp:
gr.Image.update(value=bot_avatar),
name_input:
builder_cfg.get('name', ''),
description_input:
builder_cfg.get('description'),
instructions_input:
builder_cfg.get('instruction'),
model_selector:
gr.Dropdown.update(
value=builder_cfg.get('model', models[0]), choices=models),
suggestion_input: [[str] for str in suggests],
knowledge_input:
builder_cfg.get('knowledge', [])
if len(builder_cfg['knowledge']) > 0 else None,
capabilities_checkboxes:
gr.CheckboxGroup.update(
value=[
tool for tool in builder_cfg.get('tools', {}).keys()
if builder_cfg.get('tools').get(tool).get('use', False)
],
choices=capabilities),
# bot
user_chat_bot_cover:
format_cover_html(builder_cfg, bot_avatar),
user_chat_bot_suggest:
gr.Dataset.update(samples=[[item] for item in suggests]),
}
# tab 切换的事件处理
def on_congifure_tab_select(_state, uuid_str):
uuid_str = check_uuid(uuid_str)
configure_updated = _state.get('configure_updated', False)
if configure_updated:
builder_cfg, model_cfg, tool_cfg, available_tool_list, _, _ = parse_configuration(
uuid_str)
_state['configure_updated'] = False
return init_ui_config(uuid_str, _state, builder_cfg, model_cfg,
tool_cfg)
else:
return {state: _state}
configure_tab.select(
on_congifure_tab_select,
inputs=[state, uuid_str],
outputs=configure_updated_outputs)
# 配置 "Create" 标签页的消息发送功能
def format_message_with_builder_cfg(_state, chatbot, builder_cfg,
uuid_str):
uuid_str = check_uuid(uuid_str)
bot_avatar = builder_cfg.get('avatar', '')
prompt_recommend = builder_cfg.get('prompt_recommend', [])
suggestion = [[row] for row in prompt_recommend]
bot_avatar_path = get_avatar_image(bot_avatar, uuid_str)[1]
save_builder_configuration(builder_cfg, uuid_str)
_state['configure_updated'] = True
return {
create_chatbot:
chatbot,
user_chat_bot_cover:
gr.HTML.update(
visible=True,
value=format_cover_html(builder_cfg, bot_avatar_path)),
user_chatbot:
gr.Chatbot.update(
visible=False,
avatar_images=get_avatar_image(bot_avatar, uuid_str)),
user_chat_bot_suggest:
gr.Dataset.update(samples=suggestion)
}
def create_send_message(chatbot, input, _state, uuid_str):
uuid_str = check_uuid(uuid_str)
# 将发送的消息添加到聊天历史
builder_agent = _state['builder_agent']
chatbot.append((input, ''))
yield {
create_chatbot: chatbot,
create_chat_input: gr.Textbox.update(value=''),
}
response = ''
for frame in builder_agent.stream_run(
input, print_info=True, uuid_str=uuid_str):
llm_result = frame.get('llm_text', '')
exec_result = frame.get('exec_result', '')
step_result = frame.get('step', '')
print(frame)
if len(exec_result) != 0:
if isinstance(exec_result, dict):
exec_result = exec_result['result']
assert isinstance(exec_result, Config)
yield format_message_with_builder_cfg(
_state,
chatbot,
exec_result.to_dict(),
uuid_str=uuid_str)
else:
# llm result
if isinstance(llm_result, dict):
content = llm_result['content']
else:
content = llm_result
frame_text = content
response = beauty_output(f'{response}{frame_text}',
step_result)
chatbot[-1] = (input, response)
yield {
create_chatbot: chatbot,
}
create_send_button.click(
create_send_message,
inputs=[create_chatbot, create_chat_input, state, uuid_str],
outputs=[
create_chatbot, user_chat_bot_cover, user_chatbot,
user_chat_bot_suggest, create_chat_input
])
# 配置 "Configure" 标签页的提交按钮功能
configure_button.click(
process_configuration,
inputs=[
uuid_str, bot_avatar_comp, name_input, description_input,
instructions_input, model_selector, suggestion_input,
knowledge_input, capabilities_checkboxes, openapi_schema,
openapi_auth_type, openapi_auth_apikey, openapi_auth_apikey_type,
openapi_privacy_policy, state
],
outputs=[
user_chat_bot_cover, user_chatbot, user_chat_bot_suggest,
suggestion_input
])
# 配置 "Preview" 的消息发送功能
def preview_send_message(chatbot, input, _state):
# 将发送的消息添加到聊天历史
user_agent = _state['user_agent']
if 'new_file_paths' in _state:
new_file_paths = _state['new_file_paths']
else:
new_file_paths = []
_state['new_file_paths'] = []
chatbot.append((input, ''))
yield {
user_chatbot: gr.Chatbot.update(visible=True, value=chatbot),
user_chat_bot_cover: gr.HTML.update(visible=False),
preview_chat_input: gr.Textbox.update(value='')
}
response = ''
try:
for frame in user_agent.stream_run(
input,
print_info=True,
remote=False,
append_files=new_file_paths):
llm_result = frame.get('llm_text', '')
exec_result = frame.get('exec_result', '')
if len(exec_result) != 0:
# action_exec_result
if isinstance(exec_result, dict):
exec_result = str(exec_result['result'])
frame_text = f'<result>{exec_result}</result>'
else:
# llm result
frame_text = llm_result
# important! do not change this
response += frame_text
chatbot[-1] = (input, response)
yield {user_chatbot: chatbot}
except Exception as e:
if 'dashscope.common.error.AuthenticationError' in str(e):
msg = 'DASHSCOPE_API_KEY should be set via environment variable. You can acquire this in ' \
'https://help.aliyun.com/zh/dashscope/developer-reference/activate-dashscope-and-create-an-api-key'
else:
msg = str(e)
chatbot[-1] = (input, msg)
yield {user_chatbot: chatbot}
preview_send_button.click(
preview_send_message,
inputs=[user_chatbot, preview_chat_input, state],
outputs=[user_chatbot, user_chat_bot_cover, preview_chat_input])
def upload_file(chatbot, upload_button, _state, uuid_str):
uuid_str = check_uuid(uuid_str)
new_file_paths = []
if 'file_paths' in _state:
file_paths = _state['file_paths']
else:
file_paths = []
for file in upload_button:
file_name = os.path.basename(file.name)
# covert xxx.json to xxx_uuid_str.json
file_name = file_name.replace('.', f'_{uuid_str}.')
file_path = os.path.join(get_ci_dir(), file_name)
if not os.path.exists(file_path):
# make sure file path's directory exists
os.makedirs(os.path.dirname(file_path), exist_ok=True)
shutil.copy(file.name, file_path)
file_paths.append(file_path)
new_file_paths.append(file_path)
chatbot.append((None, f'上传文件{file_name},成功'))
yield {
user_chatbot: gr.Chatbot.update(visible=True, value=chatbot),
user_chat_bot_cover: gr.HTML.update(visible=False),
preview_chat_input: gr.Textbox.update(value='')
}
_state['file_paths'] = file_paths
_state['new_file_paths'] = new_file_paths
upload_button.upload(
upload_file,
inputs=[user_chatbot, upload_button, state, uuid_str],
outputs=[user_chatbot, user_chat_bot_cover, preview_chat_input])
# configuration for publish
def publish_agent(name, uuid_str, state):
uuid_str = check_uuid(uuid_str)
user_info = pop_user_info_from_config(DEFAULT_AGENT_DIR, uuid_str)
output_url = prepare_agent_zip(name, DEFAULT_AGENT_DIR, uuid_str,
state)
# output_url = "https://test.url"
return format_goto_publish_html(
i18n.get_whole('publish'), output_url, user_info)
publish_button.click(
publish_agent,
inputs=[name_input, uuid_str, state],
outputs=[publish_link],
)
def change_lang(language):
i18n = I18n(language)
return {
bot_avatar_comp:
gr.Image(label=i18n.get('form_avatar')),
name_input:
gr.Textbox(
label=i18n.get('form_name'),
placeholder=i18n.get('form_name_placeholder')),
description_input:
gr.Textbox(
label=i18n.get('form_description'),
placeholder=i18n.get('form_description_placeholder')),
instructions_input:
gr.Textbox(
label=i18n.get('form_instructions'),
placeholder=i18n.get('form_instructions_placeholder')),
model_selector:
gr.Dropdown(label=i18n.get('form_model')),
knowledge_input:
gr.File(label=i18n.get('form_knowledge')),
capabilities_checkboxes:
gr.CheckboxGroup(label=i18n.get('form_capabilities')),
open_api_accordion:
gr.Accordion(label=i18n.get('open_api_accordion')),
configure_button:
gr.Button(i18n.get('form_update_button')),
preview_header:
gr.HTML(
f"""<div class="preview_header">{i18n.get('preview')}<div>"""),
preview_send_button:
gr.Button.update(value=i18n.get('send')),
create_chat_input:
gr.Textbox(
label=i18n.get('message'),
placeholder=i18n.get('message_placeholder')),
create_send_button:
gr.Button.update(value=i18n.get('send')),
user_chat_bot_suggest:
gr.Dataset(label=i18n.get('prompt_suggestion')),
preview_chat_input:
gr.Textbox(
label=i18n.get('message'),
placeholder=i18n.get('message_placeholder')),
publish_accordion:
gr.Accordion(label=i18n.get('publish')),
upload_button:
gr.UploadButton(i18n.get('upload_btn')),
header:
gr.Markdown(i18n.get('header')),
}
language.select(
change_lang,
inputs=[language],
outputs=configure_updated_outputs + [
configure_button, create_chat_input, open_api_accordion,
preview_header, preview_chat_input, publish_accordion,
upload_button, header
])
def init_all(uuid_str, _state):
uuid_str = check_uuid(uuid_str)
builder_cfg, model_cfg, tool_cfg, available_tool_list, _, _ = parse_configuration(
uuid_str)
ret = init_ui_config(uuid_str, _state, builder_cfg, model_cfg,
tool_cfg)
yield ret
init_user(uuid_str, _state)
init_builder(uuid_str, _state)
yield {
state:
_state,
preview_send_button:
gr.Button.update(value=i18n.get('send'), interactive=True),
create_send_button:
gr.Button.update(value=i18n.get('send'), interactive=True),
}
demo.load(
init_all, inputs=[uuid_str, state], outputs=configure_updated_outputs)
demo.queue(concurrency_count=10)
demo.launch(share=True, show_error=True)