# Page header captured together with the file listing:
# openfree's picture
# Update app.py
# a931219 verified
# raw
# history blame
# 22.6 kB
import os
import re
import random
from http import HTTPStatus
from typing import Dict, List, Optional, Tuple
import base64
import anthropic
import openai
import asyncio
import time
from functools import partial
import json
import gradio as gr
import modelscope_studio.components.base as ms
import modelscope_studio.components.legacy as legacy
import modelscope_studio.components.antd as antd
import html
import urllib.parse
from huggingface_hub import HfApi, create_repo
import string
import requests
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import WebDriverException, TimeoutException
from PIL import Image
from io import BytesIO
from datetime import datetime
# System prompt defined directly in this module. It is stored under the
# "system" key of the `setting` state and sent as the system message of
# every generation request.
SystemPrompt = """You are 'MOUSE-I', an advanced AI visualization expert. Your mission is to transform every response into a visually stunning and highly informative presentation.
Core Capabilities:
- Transform text responses into rich visual experiences
- Create interactive data visualizations and charts
- Design beautiful and intuitive user interfaces
- Utilize engaging animations and transitions
- Present information in a clear, structured manner
Visual Elements to Include:
- Charts & Graphs (using Chart.js, D3.js)
- Interactive Data Visualizations
- Modern UI Components
- Engaging Animations
- Informative Icons & Emojis
- Color-coded Information Blocks
- Progress Indicators
- Timeline Visualizations
- Statistical Representations
- Comparison Tables
Technical Requirements:
- Modern HTML5/CSS3/JavaScript
- Responsive Design
- Interactive Elements
- Clean Typography
- Professional Color Schemes
- Smooth Animations
- Cross-browser Compatibility
Libraries Available:
- Chart.js for Data Visualization
- D3.js for Complex Graphics
- Bootstrap for Layout
- jQuery for Interactions
- Three.js for 3D Elements
Design Principles:
- Visual Hierarchy
- Clear Information Flow
- Consistent Styling
- Intuitive Navigation
- Engaging User Experience
- Accessibility Compliance
Remember to:
- Present data in the most visually appealing way
- Use appropriate charts for different data types
- Include interactive elements where relevant
- Maintain a professional and modern aesthetic
- Ensure responsive design for all devices
Return only HTML code wrapped in code blocks, focusing on creating visually stunning and informative presentations.
"""
from config import DEMO_LIST
class Role:
    """Names of the chat roles used in the ``'role'`` field of a message."""

    SYSTEM = 'system'
    USER = 'user'
    ASSISTANT = 'assistant'
# A conversation as a list of (user text, assistant text) pairs.
History = List[Tuple[str, str]]
# Chat messages as dicts; this file reads and writes the keys 'role' and 'content'.
Messages = List[Dict[str, str]]
# In-memory cache of base64-encoded image data, keyed by image path.
IMAGE_CACHE = {}
# Prompt enhancement ("boost") support.
def boost_prompt(prompt: str) -> str:
    """Expand a user prompt into a more detailed request using an LLM.

    Claude is asked first. If that call raises or returns no content, the
    OpenAI chat completion API is used instead. Both requests carry the same
    enhancement instructions and the same user message.

    Args:
        prompt: The prompt typed by the user.

    Returns:
        The enhanced prompt; ``""`` when ``prompt`` is empty; or ``prompt``
        itself, unchanged, when both API calls fail.
    """
    if not prompt:
        return ""

    # System prompt describing the enhancement task.
    boost_system_prompt = """
๋‹น์‹ ์€ ์›น ๊ฐœ๋ฐœ ํ”„๋กฌํ”„ํŠธ ์ „๋ฌธ๊ฐ€์ž…๋‹ˆ๋‹ค.
์ฃผ์–ด์ง„ ํ”„๋กฌํ”„ํŠธ๋ฅผ ๋ถ„์„ํ•˜์—ฌ ๋” ์ƒ์„ธํ•˜๊ณ  ์ „๋ฌธ์ ์ธ ์š”๊ตฌ์‚ฌํ•ญ์œผ๋กœ ํ™•์žฅํ•˜๋˜,
์›๋ž˜ ์˜๋„์™€ ๋ชฉ์ ์€ ๊ทธ๋Œ€๋กœ ์œ ์ง€ํ•˜๋ฉด์„œ ๋‹ค์Œ ๊ด€์ ๋“ค์„ ๊ณ ๋ คํ•˜์—ฌ ์ฆ๊ฐ•ํ•˜์‹ญ์‹œ์˜ค:
1. ๊ธฐ์ˆ ์  ๊ตฌํ˜„ ์ƒ์„ธ
2. UI/UX ๋””์ž์ธ ์š”์†Œ
3. ์‚ฌ์šฉ์ž ๊ฒฝํ—˜ ์ตœ์ ํ™”
4. ์„ฑ๋Šฅ๊ณผ ๋ณด์•ˆ
5. ์ ‘๊ทผ์„ฑ๊ณผ ํ˜ธํ™˜์„ฑ
๊ธฐ์กด SystemPrompt์˜ ๋ชจ๋“  ๊ทœ์น™์„ ์ค€์ˆ˜ํ•˜๋ฉด์„œ ์ฆ๊ฐ•๋œ ํ”„๋กฌํ”„ํŠธ๋ฅผ ์ƒ์„ฑํ•˜์‹ญ์‹œ์˜ค.
"""
    # The same user message is sent to whichever provider answers.
    user_message = f"๋‹ค์Œ ํ”„๋กฌํ”„ํŠธ๋ฅผ ๋ถ„์„ํ•˜๊ณ  ์ฆ๊ฐ•ํ•˜์‹œ์˜ค: {prompt}"

    try:
        # First choice: Claude.
        try:
            response = claude_client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2000,
                # Send the enhancement instructions to Claude as well; they
                # were previously only included in the OpenAI fallback.
                system=boost_system_prompt,
                messages=[{
                    "role": "user",
                    "content": user_message
                }]
            )
            if hasattr(response, 'content') and len(response.content) > 0:
                return response.content[0].text
            raise Exception("Claude API ์‘๋‹ต ํ˜•์‹ ์˜ค๋ฅ˜")
        except Exception as claude_error:
            print(f"Claude API ์—๋Ÿฌ, OpenAI๋กœ ์ „ํ™˜: {str(claude_error)}")

        # Fallback: OpenAI.
        completion = openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": boost_system_prompt},
                {"role": "user", "content": user_message}
            ],
            max_tokens=2000,
            temperature=0.7
        )
        if completion.choices and len(completion.choices) > 0:
            return completion.choices[0].message.content
        raise Exception("OpenAI API ์‘๋‹ต ํ˜•์‹ ์˜ค๋ฅ˜")
    except Exception as e:
        print(f"ํ”„๋กฌํ”„ํŠธ ์ฆ๊ฐ• ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {str(e)}")
        return prompt  # On any failure, hand back the original prompt.
# Click handler for the "Enhance" (boost) button.
def handle_boost(prompt: str):
    """Return the enhanced prompt plus an update selecting the "empty" tab.

    If enhancing raises, the error is printed and the original prompt is
    returned with the same tab update.
    """
    try:
        return boost_prompt(prompt), gr.update(active_key="empty")
    except Exception as err:
        print(f"Boost ์ฒ˜๋ฆฌ ์ค‘ ์˜ค๋ฅ˜: {str(err)}")
        return prompt, gr.update(active_key="empty")
def get_image_base64(image_path):
    """Return the contents of *image_path* as a base64 string, with caching.

    Successful reads are stored in ``IMAGE_CACHE`` under ``image_path`` and
    served from there on later calls.

    Args:
        image_path: Path of the image file to encode.

    Returns:
        The base64 text of the file. If the file cannot be read, the cached
        entry for ``'default.png'`` is returned, or ``''`` when there is none.
    """
    if image_path in IMAGE_CACHE:
        return IMAGE_CACHE[image_path]
    try:
        with open(image_path, "rb") as image_file:
            encoded_string = base64.b64encode(image_file.read()).decode()
    except (OSError, TypeError, ValueError) as e:
        # Best-effort: a missing or unreadable image must not break the UI,
        # but interrupts and programming errors are no longer swallowed.
        print(f"Error loading image {image_path!r}: {e}")
        return IMAGE_CACHE.get('default.png', '')
    IMAGE_CACHE[image_path] = encoded_string
    return encoded_string
def history_to_messages(history: History, system: str) -> Messages:
    """Flatten *history* into a message list that starts with *system*.

    Each history entry contributes a user message (its first element)
    followed by an assistant message (its second element).
    """
    messages = [{'role': Role.SYSTEM, 'content': system}]
    for turn in history:
        messages.extend((
            {'role': Role.USER, 'content': turn[0]},
            {'role': Role.ASSISTANT, 'content': turn[1]},
        ))
    return messages
def messages_to_history(messages: Messages) -> History:
    """Pair up the messages after the leading system message.

    Messages at odd positions are taken as questions and those at the
    following even positions as replies; each pair becomes
    ``[question_content, reply_content]``.
    """
    assert messages[0]['role'] == Role.SYSTEM
    questions = messages[1::2]
    replies = messages[2::2]
    return [[q['content'], r['content']] for q, r in zip(questions, replies)]
# API credentials are read from the environment; an unset variable yields ''.
YOUR_ANTHROPIC_TOKEN = os.getenv('ANTHROPIC_API_KEY', '')
YOUR_OPENAI_TOKEN = os.getenv('OPENAI_API_KEY', '')

# Warn when either key is missing.
if not (YOUR_ANTHROPIC_TOKEN and YOUR_OPENAI_TOKEN):
    print("Warning: API keys not found in environment variables")

# Create both clients; if either constructor raises, both are set to None.
try:
    claude_client = anthropic.Anthropic(api_key=YOUR_ANTHROPIC_TOKEN)
    openai_client = openai.OpenAI(api_key=YOUR_OPENAI_TOKEN)
except Exception as init_error:
    print(f"Error initializing API clients: {str(init_error)}")
    claude_client = openai_client = None
# Streaming helper for the Claude API.
async def try_claude_api(system_message, claude_messages, timeout=15):
    """Stream a Claude response, yielding the text accumulated so far.

    Args:
        system_message: Text passed as the ``system`` argument of the request.
        claude_messages: Message list passed as ``messages``.
        timeout: Maximum number of seconds allowed between two consecutive
            stream events before ``TimeoutError`` is raised.

    Yields:
        The concatenation of all text deltas received so far, once per
        ``content_block_delta`` event.

    Raises:
        Exception: Any error (including the ``TimeoutError`` above) is
            printed and then re-raised to the caller.
    """
    try:
        start_time = time.time()
        with claude_client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=7800,
            system=system_message,
            messages=claude_messages
        ) as stream:
            collected_content = ""
            # NOTE(review): this is a plain `for` inside an async function;
            # presumably the client is synchronous, so waiting for the next
            # event may block the event loop — confirm.
            for chunk in stream:
                current_time = time.time()
                # The elapsed time is only checked after an event arrives, so
                # this detects a slow gap but cannot interrupt a stalled read.
                if current_time - start_time > timeout:
                    print(f"Claude API response time: {current_time - start_time:.2f} seconds")
                    raise TimeoutError("Claude API timeout")
                if chunk.type == "content_block_delta":
                    collected_content += chunk.delta.text
                    yield collected_content
                    # Give other tasks a chance to run between chunks.
                    await asyncio.sleep(0)
                # Restart the timer so the limit applies per event, not to the whole response.
                start_time = current_time
    except Exception as e:
        print(f"Claude API error: {str(e)}")
        raise e
async def try_openai_api(openai_messages):
    """Stream an OpenAI chat completion, yielding the text accumulated so far.

    A value is yielded each time a chunk carries content. Any error is
    printed and then re-raised.
    """
    try:
        response_stream = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=openai_messages,
            stream=True,
            max_tokens=4096,
            temperature=0.7
        )
        accumulated = ""
        for part in response_stream:
            # Chunks without content carry nothing to append.
            if part.choices[0].delta.content is None:
                continue
            accumulated += part.choices[0].delta.content
            yield accumulated
    except Exception as e:
        print(f"OpenAI API error: {str(e)}")
        raise e
class Demo:
    """Gradio callbacks for generating a response and clearing the history."""

    def __init__(self):
        pass

    @staticmethod
    def _streaming_outputs(content, history):
        """Build the output list used while a response is still in progress."""
        return [
            content,
            history,
            None,
            gr.update(active_key="loading"),
            gr.update(open=True)
        ]

    async def generation_code(self, query: Optional[str], _setting: Dict[str, str], _history: Optional[History]):
        """Stream a generated response, trying Claude first and OpenAI second.

        Args:
            query: The user's request. When empty or blank, the description
                of a random ``DEMO_LIST`` entry is used instead.
            _setting: Settings dict; its ``'system'`` entry is the system prompt.
            _history: Previous conversation turns, or ``None`` for no history.

        Yields:
            Five-item lists: response text, history, sandbox HTML (``None``
            until finished), an update for the state tabs, and an update for
            the code drawer. The final list carries the rendered sandbox,
            the updated history, and closes the drawer.

        Raises:
            ValueError: If both APIs fail or no content was produced. The
                underlying exception is attached as the cause.
        """
        if not query or query.strip() == '':
            query = random.choice(DEMO_LIST)['description']
        if _history is None:
            _history = []

        messages = history_to_messages(_history, _setting['system'])
        system_message = messages[0]['content']

        # Claude takes the system prompt separately; blank messages are dropped.
        claude_messages = [
            {"role": msg["role"] if msg["role"] != "system" else "user", "content": msg["content"]}
            for msg in messages[1:] + [{'role': Role.USER, 'content': query}]
            if msg["content"].strip() != ''
        ]

        # OpenAI takes the system prompt as the first message.
        openai_messages = [{"role": "system", "content": system_message}]
        openai_messages.extend(
            {"role": msg["role"], "content": msg["content"]}
            for msg in messages[1:]
        )
        openai_messages.append({"role": "user", "content": query})

        try:
            yield self._streaming_outputs("Generating code...", _history)
            await asyncio.sleep(0)

            collected_content = None
            try:
                async for content in try_claude_api(system_message, claude_messages):
                    yield self._streaming_outputs(content, _history)
                    await asyncio.sleep(0)
                    collected_content = content
            except Exception as claude_error:
                print(f"Falling back to OpenAI API due to Claude error: {str(claude_error)}")
                async for content in try_openai_api(openai_messages):
                    yield self._streaming_outputs(content, _history)
                    await asyncio.sleep(0)
                    collected_content = content

            if collected_content:
                _history = messages_to_history([
                    {'role': Role.SYSTEM, 'content': system_message}
                ] + claude_messages + [{
                    'role': Role.ASSISTANT,
                    'content': collected_content
                }])
                # Final update: show the rendered result and close the code drawer.
                yield [
                    collected_content,
                    _history,
                    send_to_sandbox(remove_code_block(collected_content)),
                    gr.update(active_key="render"),
                    gr.update(open=False)
                ]
            else:
                raise ValueError("No content was generated from either API")
        except Exception as e:
            print(f"Error details: {str(e)}")
            # Chain the original exception so its traceback is preserved.
            raise ValueError(f'Error calling APIs: {str(e)}') from e

    def clear_history(self):
        """Return an empty history list."""
        return []
def remove_code_block(text):
    """Extract the HTML inside a Markdown ``html`` code fence.

    The exact form ```` ```html\\n...\\n``` ```` is tried first. If it is
    absent, a lenient match also accepts CRLF line endings, trailing blanks
    or different letter case on the opening fence line, and a block whose
    closing fence is missing (for example a response cut off by a token
    limit).

    Args:
        text: Model output that may contain a fenced HTML block.

    Returns:
        The stripped contents of the first fenced block, or ``text.strip()``
        when no fence is found.
    """
    strict = re.search(r'```html\n(.+?)\n```', text, re.DOTALL)
    if strict:
        return strict.group(1).strip()

    # Fallback for fences the strict pattern misses; ends at the next fence
    # or at the end of the text when the block was never closed.
    lenient = re.search(
        r'```html[ \t]*\r?\n(.*?)(?:```|\Z)',
        text,
        re.DOTALL | re.IGNORECASE,
    )
    if lenient:
        return lenient.group(1).strip()

    return text.strip()
def history_render(history: History):
    """Return an update that opens the history drawer, plus *history* itself."""
    open_drawer = gr.update(open=True)
    return open_drawer, history
def send_to_sandbox(code):
    """Wrap *code* in an ``<iframe>`` whose source is a base64 data URI.

    The HTML is UTF-8 encoded, base64 encoded, and embedded as a
    ``data:text/html`` URI so it can be rendered inside the page.
    """
    payload = base64.b64encode(code.encode('utf-8')).decode('utf-8')
    data_uri = "data:text/html;charset=utf-8;base64," + payload
    # Leading and trailing empty items reproduce the newline at each end.
    markup_lines = [
        "",
        "<iframe",
        f'src="{data_uri}"',
        'style="width:100%; height:800px; border:none;"',
        'frameborder="0"',
        "></iframe>",
        "",
    ]
    return "\n".join(markup_lines)
# ๋ฐฐํฌ ๊ด€๋ จ ํ•จ์ˆ˜ ์ถ”๊ฐ€
def generate_space_name():
"""6์ž๋ฆฌ ๋žœ๋ค ์˜๋ฌธ ์ด๋ฆ„ ์ƒ์„ฑ"""
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(6))
def deploy_to_vercel(code: str, timeout: float = 60.0):
    """Deploy *code* as the ``index.html`` of a new Vercel project.

    The access token is read from the ``VERCEL_TOKEN`` environment variable
    so that no credential is stored in the source file.

    Args:
        code: HTML document to publish.
        timeout: Seconds to wait for the Vercel API before giving up.

    Returns:
        An HTML snippet linking to ``https://<name>.vercel.app`` on success,
        otherwise a message describing why the deployment failed. Exceptions
        are caught and reported through the returned message.
    """
    try:
        token = os.getenv("VERCEL_TOKEN", "")
        if not token:
            return "Vercel ํ† ํฐ์ด ์„ค์ •๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค."

        # Six random lowercase letters serve as the project name.
        project_name = ''.join(random.choice(string.ascii_lowercase) for i in range(6))

        # Vercel API endpoint.
        deploy_url = "https://api.vercel.com/v13/deployments"

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

        # package.json whose build step just copies index.html into dist/.
        package_json = {
            "name": project_name,
            "version": "1.0.0",
            "private": True,
            "dependencies": {
                "vite": "^5.0.0"
            },
            "scripts": {
                "dev": "vite",
                "build": "echo 'No build needed' && mkdir -p dist && cp index.html dist/",
                "preview": "vite preview"
            }
        }

        # Files included in the deployment.
        files = [
            {
                "file": "index.html",
                "data": code
            },
            {
                "file": "package.json",
                "data": json.dumps(package_json, indent=2)
            }
        ]

        # Project build settings.
        project_settings = {
            "buildCommand": "npm run build",
            "outputDirectory": "dist",
            "installCommand": "npm install",
            "framework": None
        }

        # Deployment request payload.
        deploy_data = {
            "name": project_name,
            "files": files,
            "target": "production",
            "projectSettings": project_settings
        }

        # A timeout keeps a stalled connection from hanging the handler.
        deploy_response = requests.post(
            deploy_url, headers=headers, json=deploy_data, timeout=timeout
        )
        if deploy_response.status_code != 200:
            return f"๋ฐฐํฌ ์‹คํŒจ: {deploy_response.text}"

        # The link is reported in the form <project_name>.vercel.app.
        deployment_url = f"{project_name}.vercel.app"
        time.sleep(5)
        return f"""๋ฐฐํฌ ์™„๋ฃŒ! <a href="https://{deployment_url}" target="_blank" style="color: #1890ff; text-decoration: underline; cursor: pointer;">https://{deployment_url}</a>"""
    except Exception as e:
        return f"๋ฐฐํฌ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {str(e)}"
# Gradio theme passed to gr.Blocks in create_main_interface.
theme = gr.themes.Soft()
def create_main_interface():
    """Build the main Gradio interface and return the ``gr.Blocks`` object.

    The layout has a left column (header, prompt input, action buttons,
    deployment result) and a right column whose hidden-tab-bar tabs switch
    between an empty placeholder, a loading spinner, and the rendered
    sandbox. Styles are read from ``app.css``.
    """
    # NOTE(review): execute_code is defined here but not referenced by any
    # handler below — confirm whether it is still needed.
    def execute_code(query: str):
        if not query or query.strip() == '':
            return None, gr.update(active_key="empty")
        try:
            if '```html' in query and '```' in query:
                code = remove_code_block(query)
            else:
                code = query.strip()
            return send_to_sandbox(code), gr.update(active_key="render")
        except Exception as e:
            print(f"Error executing code: {str(e)}")
            return None, gr.update(active_key="empty")

    # Apply the contents of the CSS file directly.
    with open('app.css', 'r', encoding='utf-8') as f:
        custom_css = f.read()

    demo = gr.Blocks(css=custom_css, theme=theme)
    # NOTE(review): the nesting of the layout blocks below should be
    # confirmed against the rendered UI.
    with demo:
        with gr.Tabs(elem_classes="main-tabs") as tabs:
            # MOUSE tab
            with gr.Tab("Visual AI Assistant", elem_id="mouse-tab", elem_classes="mouse-tab"):
                history = gr.State([])
                setting = gr.State({
                    "system": SystemPrompt,
                })
                with ms.Application() as app:
                    with antd.ConfigProvider():
                        # Drawer components
                        with antd.Drawer(open=False, title="Thinking", placement="left", width="750px") as code_drawer:
                            code_output = legacy.Markdown()
                        with antd.Drawer(open=False, title="history", placement="left", width="900px") as history_drawer:
                            history_output = legacy.Chatbot(show_label=False, flushing=False, height=960, elem_classes="history_chatbot")
                        # Row holding the main content
                        with antd.Row(gutter=[32, 12]) as layout:
                            # Left panel
                            with antd.Col(span=24, md=8):
                                with antd.Flex(vertical=True, gap="middle", wrap=True):
                                    # Header
                                    header = gr.HTML(f"""
<div class="left_header">
<img src="data:image/gif;base64,{get_image_base64('mouse.gif')}" width="360px" />
<h1 style="font-size: 18px;">MOUSE-Chat: Visual AI Assistant</h1>
<h1 style="font-size: 10px;">Transform your questions into stunning visual presentations. Every response is crafted with beautiful graphics, charts, and interactive elements.</h1>
</div>
""")
                                    # Input area
                                    # NOTE(review): `input` shadows the builtin of the same name.
                                    input = antd.InputTextarea(
                                        size="large",
                                        allow_clear=True,
                                        placeholder=random.choice(DEMO_LIST)['description']
                                    )
                                    # Button group
                                    with antd.Flex(gap="small", justify="space-between"):
                                        btn = antd.Button("Generate", type="primary", size="large")
                                        boost_btn = antd.Button("Enhance", type="default", size="large")
                                        deploy_btn = antd.Button("Share", type="default", size="large")
                                        clear_btn = antd.Button("Clear", type="default", size="large")
                                        # NOTE(review): this name is rebound in the right panel below,
                                        # so the click handler is attached only to that later button.
                                        historyBtn = antd.Button("๐Ÿ“œ History", type="default")
                                    deploy_result = gr.HTML(label="๋ฐฐํฌ ๊ฒฐ๊ณผ")
                            # Right panel
                            with antd.Col(span=24, md=16):
                                with ms.Div(elem_classes="right_panel"):
                                    # Top buttons
                                    with antd.Flex(gap="small", elem_classes="setting-buttons"):
                                        historyBtn = antd.Button("๐Ÿ“œ ํžˆ์Šคํ† ๋ฆฌ", type="default")
                                    gr.HTML('<div class="render_header"><span class="header_btn"></span><span class="header_btn"></span><span class="header_btn"></span></div>')
                                    # Tab contents; the tab bar itself is not rendered.
                                    with antd.Tabs(active_key="empty", render_tab_bar="() => null") as state_tab:
                                        with antd.Tabs.Item(key="empty"):
                                            empty = antd.Empty(description="Enter your question to begin", elem_classes="right_content")
                                        with antd.Tabs.Item(key="loading"):
                                            loading = antd.Spin(True, tip="Creating visual presentation...", size="large", elem_classes="right_content")
                                        with antd.Tabs.Item(key="render"):
                                            sandbox = gr.HTML(elem_classes="html_content")

        # Event handler wiring
        historyBtn.click(
            history_render,
            inputs=[history],
            outputs=[history_drawer, history_output]
        )
        history_drawer.close(
            lambda: gr.update(open=False),
            inputs=[],
            outputs=[history_drawer]
        )
        # NOTE(review): demo_instance is a module-level name that this file
        # assigns only inside the `__main__` guard — confirm it exists when
        # this function is called from elsewhere.
        btn.click(
            demo_instance.generation_code,
            inputs=[input, setting, history],
            outputs=[code_output, history, sandbox, state_tab, code_drawer]
        )
        clear_btn.click(
            demo_instance.clear_history,
            inputs=[],
            outputs=[history]
        )
        boost_btn.click(
            fn=handle_boost,
            inputs=[input],
            outputs=[input, state_tab]
        )
        # Deploys the code extracted from the generated output, if any.
        deploy_btn.click(
            fn=lambda code: deploy_to_vercel(remove_code_block(code)) if code else "์ฝ”๋“œ๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.",
            inputs=[code_output],
            outputs=[deploy_result]
        )
    return demo
# ๋ฉ”์ธ ์‹คํ–‰ ๋ถ€๋ถ„
if __name__ == "__main__":
try:
demo_instance = Demo() # Demo ์ธ์Šคํ„ด์Šค ์ƒ์„ฑ
demo = create_main_interface() # ์ธํ„ฐํŽ˜์ด์Šค ์ƒ์„ฑ
demo.queue(default_concurrency_limit=20).launch(server_name="0.0.0.0", server_port=7860) # ์„œ๋ฒ„ ์„ค์ • ์ถ”๊ฐ€
except Exception as e:
print(f"Initialization error: {e}")
raise