Spaces:
Running
on
Zero
Running
on
Zero
import os | |
import re | |
import random | |
from http import HTTPStatus | |
from typing import Dict, List, Optional, Tuple | |
import base64 | |
import anthropic | |
import openai | |
import asyncio | |
import time | |
from functools import partial | |
import json | |
import gradio as gr | |
import modelscope_studio.components.base as ms | |
import modelscope_studio.components.legacy as legacy | |
import modelscope_studio.components.antd as antd | |
import html | |
import urllib.parse | |
from huggingface_hub import HfApi, create_repo | |
import string | |
import requests | |
from selenium import webdriver | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.webdriver.common.by import By | |
from selenium.common.exceptions import WebDriverException, TimeoutException | |
from PIL import Image | |
from io import BytesIO | |
from datetime import datetime | |
# System prompt for the code generator, defined inline in this module.
# It is stored under the "system" key of the Gradio `setting` state and is
# forwarded to the model as the system message on every generation request.
SystemPrompt = """You are 'MOUSE-I', an advanced AI visualization expert. Your mission is to transform every response into a visually stunning and highly informative presentation.
Core Capabilities:
- Transform text responses into rich visual experiences
- Create interactive data visualizations and charts
- Design beautiful and intuitive user interfaces
- Utilize engaging animations and transitions
- Present information in a clear, structured manner
Visual Elements to Include:
- Charts & Graphs (using Chart.js, D3.js)
- Interactive Data Visualizations
- Modern UI Components
- Engaging Animations
- Informative Icons & Emojis
- Color-coded Information Blocks
- Progress Indicators
- Timeline Visualizations
- Statistical Representations
- Comparison Tables
Technical Requirements:
- Modern HTML5/CSS3/JavaScript
- Responsive Design
- Interactive Elements
- Clean Typography
- Professional Color Schemes
- Smooth Animations
- Cross-browser Compatibility
Libraries Available:
- Chart.js for Data Visualization
- D3.js for Complex Graphics
- Bootstrap for Layout
- jQuery for Interactions
- Three.js for 3D Elements
Design Principles:
- Visual Hierarchy
- Clear Information Flow
- Consistent Styling
- Intuitive Navigation
- Engaging User Experience
- Accessibility Compliance
Remember to:
- Present data in the most visually appealing way
- Use appropriate charts for different data types
- Include interactive elements where relevant
- Maintain a professional and modern aesthetic
- Ensure responsive design for all devices
Return only HTML code wrapped in code blocks, focusing on creating visually stunning and informative presentations.
"""
from config import DEMO_LIST | |
class Role:
    """Names of the chat roles used as the 'role' value of message dicts."""

    # Plain string constants; they are compared against and written into
    # message dictionaries elsewhere in this module.
    SYSTEM, USER, ASSISTANT = "system", "user", "assistant"
# One entry per chat turn: index 0 is the user text, index 1 the assistant text.
History = List[Tuple[str, str]]
# Chat messages as dictionaries with 'role' and 'content' keys.
Messages = List[Dict[str, str]]
# In-memory image cache: maps an image path to its base64-encoded contents.
IMAGE_CACHE = {}
# Prompt enhancement helpers: boost_prompt and handle_boost.
def boost_prompt(prompt: str) -> str:
    """Ask an LLM to expand *prompt* into a more detailed requirement text.

    Claude is tried first; on any failure the request is retried with OpenAI.
    Both calls receive the same enhancement instructions as system prompt.

    Args:
        prompt: The user's original prompt.

    Returns:
        The enhanced prompt, an empty string when *prompt* is empty/None, or
        the unchanged *prompt* when both providers fail.
    """
    if not prompt:
        return ""

    # System prompt describing how the enhancement should be performed.
    boost_system_prompt = """
๋น์ ์ ์น ๊ฐ๋ฐ ํ๋กฌํํธ ์ ๋ฌธ๊ฐ์ ๋๋ค.
์ฃผ์ด์ง ํ๋กฌํํธ๋ฅผ ๋ถ์ํ์ฌ ๋ ์์ธํ๊ณ ์ ๋ฌธ์ ์ธ ์๊ตฌ์ฌํญ์ผ๋ก ํ์ฅํ๋,
์๋ ์๋์ ๋ชฉ์ ์ ๊ทธ๋๋ก ์ ์งํ๋ฉด์ ๋ค์ ๊ด์ ๋ค์ ๊ณ ๋ คํ์ฌ ์ฆ๊ฐํ์ญ์์ค:
1. ๊ธฐ์ ์ ๊ตฌํ ์์ธ
2. UI/UX ๋์์ธ ์์
3. ์ฌ์ฉ์ ๊ฒฝํ ์ต์ ํ
4. ์ฑ๋ฅ๊ณผ ๋ณด์
5. ์ ๊ทผ์ฑ๊ณผ ํธํ์ฑ
๊ธฐ์กด SystemPrompt์ ๋ชจ๋ ๊ท์น์ ์ค์ํ๋ฉด์ ์ฆ๊ฐ๋ ํ๋กฌํํธ๋ฅผ ์์ฑํ์ญ์์ค.
"""
    user_message = f"๋ค์ ํ๋กฌํํธ๋ฅผ ๋ถ์ํ๊ณ ์ฆ๊ฐํ์์ค: {prompt}"

    try:
        # First attempt: Claude.
        try:
            response = claude_client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2000,
                # Previously omitted: without it Claude never saw the
                # enhancement instructions that the OpenAI fallback receives.
                system=boost_system_prompt,
                messages=[{
                    "role": "user",
                    "content": user_message
                }]
            )
            if hasattr(response, 'content') and len(response.content) > 0:
                return response.content[0].text
            raise Exception("Claude API ์๋ต ํ์ ์ค๋ฅ")
        except Exception as claude_error:
            print(f"Claude API ์๋ฌ, OpenAI๋ก ์ ํ: {str(claude_error)}")
            # Second attempt: OpenAI.
            completion = openai_client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": boost_system_prompt},
                    {"role": "user", "content": user_message}
                ],
                max_tokens=2000,
                temperature=0.7
            )
            if completion.choices and len(completion.choices) > 0:
                return completion.choices[0].message.content
            raise Exception("OpenAI API ์๋ต ํ์ ์ค๋ฅ")
    except Exception as e:
        print(f"ํ๋กฌํํธ ์ฆ๊ฐ ์ค ์ค๋ฅ ๋ฐ์: {str(e)}")
        # Best effort: fall back to the original prompt on any error.
        return prompt
# Event handler for the Boost ("Enhance") button.
def handle_boost(prompt: str):
    """Return the enhanced prompt together with a tab update.

    The second element switches the state tab to the "empty" key. If
    enhancement raises, the original *prompt* is returned instead.
    """
    try:
        outcome = boost_prompt(prompt), gr.update(active_key="empty")
    except Exception as err:
        print(f"Boost ์ฒ๋ฆฌ ์ค ์ค๋ฅ: {str(err)}")
        outcome = prompt, gr.update(active_key="empty")
    return outcome
def get_image_base64(image_path):
    """Return the base64-encoded contents of the file at *image_path*.

    Successful reads are memoised in IMAGE_CACHE. When the file cannot be
    read, the cached entry for 'default.png' is returned, or '' if there is
    none.
    """
    if image_path in IMAGE_CACHE:
        return IMAGE_CACHE[image_path]
    try:
        with open(image_path, "rb") as image_file:
            encoded_string = base64.b64encode(image_file.read()).decode()
    except (OSError, TypeError, ValueError) as err:
        # Narrowed from a bare `except:` so that KeyboardInterrupt/SystemExit
        # are no longer swallowed; the failure is now also reported.
        print(f"Could not load image {image_path!r}: {err}")
        return IMAGE_CACHE.get('default.png', '')
    IMAGE_CACHE[image_path] = encoded_string
    return encoded_string
def history_to_messages(history: History, system: str) -> Messages:
    """Flatten *history* into a message list that starts with the system message.

    Each history entry contributes a user message (element 0) followed by an
    assistant message (element 1).
    """
    conversation = [{'role': Role.SYSTEM, 'content': system}]
    for turn in history:
        conversation.extend((
            {'role': Role.USER, 'content': turn[0]},
            {'role': Role.ASSISTANT, 'content': turn[1]},
        ))
    return conversation
def messages_to_history(messages: Messages) -> History:
    """Pair up the messages after the leading system message.

    Messages are grouped two at a time into [first_content, second_content]
    lists; a trailing unpaired message is ignored. The first message must
    have the system role.
    """
    assert messages[0]['role'] == Role.SYSTEM
    turns = messages[1:]
    return [
        [turns[i]['content'], turns[i + 1]['content']]
        for i in range(0, len(turns) - 1, 2)
    ]
# Initialise the API clients.
# Keys come from the environment; an empty string is used when a key is unset.
YOUR_ANTHROPIC_TOKEN = os.getenv('ANTHROPIC_API_KEY', '')
YOUR_OPENAI_TOKEN = os.getenv('OPENAI_API_KEY', '')

# Validate the API keys (warning only; start-up continues).
if not YOUR_ANTHROPIC_TOKEN or not YOUR_OPENAI_TOKEN:
    print("Warning: API keys not found in environment variables")

# Each client is created in its own try block so that a failure to build one
# does not also disable the other; the callers rely on having a fallback.
try:
    claude_client = anthropic.Anthropic(api_key=YOUR_ANTHROPIC_TOKEN)
except Exception as e:
    print(f"Error initializing API clients: {str(e)}")
    claude_client = None

try:
    openai_client = openai.OpenAI(api_key=YOUR_OPENAI_TOKEN)
except Exception as e:
    print(f"Error initializing API clients: {str(e)}")
    openai_client = None
# Streaming call to the Claude API.
async def try_claude_api(system_message, claude_messages, timeout=15):
    """Stream a Claude completion, yielding the accumulated text so far.

    Args:
        system_message: System prompt passed as the `system` argument.
        claude_messages: Conversation messages passed as `messages`.
        timeout: Maximum number of seconds allowed between two consecutive
            stream events before TimeoutError is raised.

    Yields:
        The full text collected up to and including the latest text delta.

    Raises:
        TimeoutError: When the gap between two stream events exceeds *timeout*.
        Exception: Any other error is logged and re-raised unchanged.

    NOTE(review): the stream is iterated synchronously, so the timeout is
    only evaluated when an event arrives; a fully stalled connection is not
    interrupted by this check.
    """
    try:
        start_time = time.time()
        with claude_client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=7800,
            system=system_message,
            messages=claude_messages
        ) as stream:
            collected_content = ""
            for chunk in stream:
                current_time = time.time()
                if current_time - start_time > timeout:
                    print(f"Claude API response time: {current_time - start_time:.2f} seconds")
                    raise TimeoutError("Claude API timeout")
                if chunk.type == "content_block_delta":
                    # Skip deltas that carry no `text` attribute instead of
                    # failing with AttributeError.
                    piece = getattr(chunk.delta, "text", None)
                    if piece is not None:
                        collected_content += piece
                        yield collected_content
                        await asyncio.sleep(0)
                # Restart the timer: the limit applies per event gap.
                start_time = current_time
    except Exception as e:
        print(f"Claude API error: {str(e)}")
        # Bare raise keeps the original traceback intact.
        raise
async def try_openai_api(openai_messages):
    """Stream an OpenAI chat completion, yielding the accumulated text so far.

    Args:
        openai_messages: Chat messages, including the system message.

    Yields:
        The full text collected up to and including the latest content delta.

    Raises:
        Exception: Any error from the API call is logged and re-raised.
    """
    try:
        stream = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=openai_messages,
            stream=True,
            max_tokens=4096,
            temperature=0.7
        )
        collected_content = ""
        for chunk in stream:
            # A chunk with an empty `choices` list would make
            # `chunk.choices[0]` raise IndexError; skip it.
            if not chunk.choices:
                continue
            piece = chunk.choices[0].delta.content
            if piece is not None:
                collected_content += piece
                yield collected_content
    except Exception as e:
        print(f"OpenAI API error: {str(e)}")
        # Bare raise keeps the original traceback intact.
        raise
class Demo:
    """Callbacks behind the Generate and Clear buttons."""

    def __init__(self):
        pass

    @staticmethod
    def _loading_frame(content, history):
        """Build the output list shown while generation is still running."""
        return [
            content,
            history,
            None,
            gr.update(active_key="loading"),
            gr.update(open=True)
        ]

    async def generation_code(self, query: Optional[str], _setting: Dict[str, str], _history: Optional[History]):
        """Generate HTML for *query*, streaming progress to the UI.

        Claude is tried first; if it raises, the request is retried with
        OpenAI. Every yielded list has five items, in the order the Generate
        button's outputs are wired: markdown text, history, sandbox HTML,
        state-tab update, code-drawer update.

        Args:
            query: User prompt. When empty, a random DEMO_LIST description
                is used instead.
            _setting: Settings dict; its 'system' entry is the system prompt.
            _history: Previous turns, or None for a fresh conversation.

        Raises:
            ValueError: When neither API produced content or any step failed;
                the underlying exception is attached as the cause.
        """
        if not query or not query.strip():
            query = random.choice(DEMO_LIST)['description']
        if _history is None:
            _history = []

        messages = history_to_messages(_history, _setting['system'])
        system_message = messages[0]['content']

        # Claude takes the system prompt separately and gets no empty messages.
        claude_messages = [
            {"role": msg["role"] if msg["role"] != "system" else "user", "content": msg["content"]}
            for msg in messages[1:] + [{'role': Role.USER, 'content': query}]
            if msg["content"].strip() != ''
        ]

        # OpenAI receives the system prompt as the first chat message.
        openai_messages = [{"role": "system", "content": system_message}]
        openai_messages.extend(
            {"role": msg["role"], "content": msg["content"]}
            for msg in messages[1:]
        )
        openai_messages.append({"role": "user", "content": query})

        try:
            yield self._loading_frame("Generating code...", _history)
            await asyncio.sleep(0)

            collected_content = None
            try:
                async for content in try_claude_api(system_message, claude_messages):
                    yield self._loading_frame(content, _history)
                    await asyncio.sleep(0)
                    collected_content = content
            except Exception as claude_error:
                print(f"Falling back to OpenAI API due to Claude error: {str(claude_error)}")
                async for content in try_openai_api(openai_messages):
                    yield self._loading_frame(content, _history)
                    await asyncio.sleep(0)
                    collected_content = content

            if collected_content:
                _history = messages_to_history([
                    {'role': Role.SYSTEM, 'content': system_message}
                ] + claude_messages + [{
                    'role': Role.ASSISTANT,
                    'content': collected_content
                }])
                # Final frame: show the rendered result and close the code drawer.
                yield [
                    collected_content,
                    _history,
                    send_to_sandbox(remove_code_block(collected_content)),
                    gr.update(active_key="render"),
                    gr.update(open=False)
                ]
            else:
                raise ValueError("No content was generated from either API")
        except Exception as e:
            print(f"Error details: {str(e)}")
            # Chain the cause so the original traceback is not lost.
            raise ValueError(f'Error calling APIs: {str(e)}') from e

    def clear_history(self):
        """Return an empty history list."""
        return []
def remove_code_block(text):
    """Extract the HTML inside the first ```html fenced block of *text*.

    Returns:
        - the stripped contents of the first complete ```html ... ``` block;
        - otherwise, when an opening ```html fence exists with no closing
          fence after it (e.g. a response cut off by the token limit), the
          stripped text that follows the opening fence;
        - otherwise *text* stripped of surrounding whitespace.
    """
    fenced = re.search(r'```html\n(.+?)\n```', text, re.DOTALL)
    if fenced:
        return fenced.group(1).strip()

    # Unterminated fence: drop the opening marker so the literal "```html"
    # line does not end up in the rendered page.
    opening = re.search(r'```html\n', text)
    if opening and '```' not in text[opening.end():]:
        return text[opening.end():].strip()

    return text.strip()
def history_render(history: History):
    """Return an update that opens the history drawer, plus *history* unchanged."""
    open_drawer = gr.update(open=True)
    return open_drawer, history
def send_to_sandbox(code):
    """Wrap *code* in an iframe whose source is a base64 `data:` URI.

    The HTML is UTF-8 encoded, base64 encoded, and embedded as a
    `data:text/html` URI in the returned iframe markup.
    """
    payload = str(base64.b64encode(bytes(code, 'utf-8')), 'utf-8')
    data_uri = "data:text/html;charset=utf-8;base64," + payload
    return f"""
<iframe
src="{data_uri}"
style="width:100%; height:800px; border:none;"
frameborder="0"
></iframe>
"""
# Deployment-related helpers.
def generate_space_name():
    """Generate a random six-letter lowercase ASCII name."""
    picked = []
    for _ in range(6):
        picked.append(random.choice(string.ascii_lowercase))
    return ''.join(picked)
def deploy_to_vercel(code: str):
    """Deploy *code* as `index.html` of a new Vercel project.

    The API token is read from the `VERCEL_TOKEN` environment variable.

    SECURITY NOTE: earlier revisions embedded a literal Vercel token in this
    function. That token must be treated as leaked and rotated; credentials
    belong in the environment, not in source.

    Args:
        code: HTML document to publish.

    Returns:
        A message string: an HTML snippet linking to the deployment on
        success, otherwise a description of the failure. Exceptions are
        caught and reported in the returned text rather than raised.
    """
    try:
        token = os.getenv("VERCEL_TOKEN", "")
        if not token:
            return "Vercel ํ ํฐ์ด ์ค์ ๋์ง ์์์ต๋๋ค."

        # Six random lowercase letters, reusing the module's name generator.
        project_name = generate_space_name()

        # Vercel deployments endpoint.
        deploy_url = "https://api.vercel.com/v13/deployments"

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

        # Minimal package.json whose build step copies index.html into dist/.
        package_json = {
            "name": project_name,
            "version": "1.0.0",
            "private": True,
            "dependencies": {
                "vite": "^5.0.0"
            },
            "scripts": {
                "dev": "vite",
                "build": "echo 'No build needed' && mkdir -p dist && cp index.html dist/",
                "preview": "vite preview"
            }
        }

        # Files uploaded with the deployment.
        files = [
            {
                "file": "index.html",
                "data": code
            },
            {
                "file": "package.json",
                "data": json.dumps(package_json, indent=2)
            }
        ]

        project_settings = {
            "buildCommand": "npm run build",
            "outputDirectory": "dist",
            "installCommand": "npm install",
            "framework": None
        }

        deploy_data = {
            "name": project_name,
            "files": files,
            "target": "production",
            "projectSettings": project_settings
        }

        # A timeout keeps a stalled request from blocking the UI callback
        # forever; a timeout error is reported by the handler below.
        deploy_response = requests.post(
            deploy_url, headers=headers, json=deploy_data, timeout=30
        )
        if deploy_response.status_code != 200:
            return f"๋ฐฐํฌ ์คํจ: {deploy_response.text}"

        # The public URL is reported as <project_name>.vercel.app.
        deployment_url = f"{project_name}.vercel.app"
        time.sleep(5)
        return f"""๋ฐฐํฌ ์๋ฃ! <a href="https://{deployment_url}" target="_blank" style="color: #1890ff; text-decoration: underline; cursor: pointer;">https://{deployment_url}</a>"""
    except Exception as e:
        return f"๋ฐฐํฌ ์ค ์ค๋ฅ ๋ฐ์: {str(e)}"
theme = gr.themes.Soft()


def create_main_interface():
    """Build the Gradio Blocks application and return it (not yet launched).

    The layout has a left column (header, prompt box, action buttons, deploy
    result) and a right column (history button, preview area with
    empty/loading/render states), plus two drawers for generated code and
    chat history.

    NOTE(review): the source this was restored from had lost its indentation,
    so the nesting of the layout containers below is a reconstruction;
    confirm it against the running UI.
    """

    def execute_code(query: str):
        """Render *query* (raw HTML or a fenced html block) in the sandbox."""
        if not query or query.strip() == '':
            return None, gr.update(active_key="empty")
        try:
            if '```html' in query:
                code = remove_code_block(query)
            else:
                code = query.strip()
            return send_to_sandbox(code), gr.update(active_key="render")
        except Exception as e:
            print(f"Error executing code: {str(e)}")
            return None, gr.update(active_key="empty")

    # Use the module-level `demo_instance` when the script entry point has
    # created one; otherwise build a Demo so this function also works when
    # the module is imported rather than run as a script.
    generator = globals().get("demo_instance") or Demo()

    # Apply the contents of the CSS file directly.
    with open('app.css', 'r', encoding='utf-8') as f:
        custom_css = f.read()

    demo = gr.Blocks(css=custom_css, theme=theme)
    with demo:
        with gr.Tabs(elem_classes="main-tabs") as tabs:
            # MOUSE tab
            with gr.Tab("Visual AI Assistant", elem_id="mouse-tab", elem_classes="mouse-tab"):
                history = gr.State([])
                setting = gr.State({
                    "system": SystemPrompt,
                })
                with ms.Application() as app:
                    with antd.ConfigProvider():
                        # Drawer components
                        with antd.Drawer(open=False, title="Thinking", placement="left", width="750px") as code_drawer:
                            code_output = legacy.Markdown()
                        with antd.Drawer(open=False, title="history", placement="left", width="900px") as history_drawer:
                            history_output = legacy.Chatbot(show_label=False, flushing=False, height=960, elem_classes="history_chatbot")
                        # Row holding the main content
                        with antd.Row(gutter=[32, 12]) as layout:
                            # Left panel
                            with antd.Col(span=24, md=8):
                                with antd.Flex(vertical=True, gap="middle", wrap=True):
                                    # Header
                                    header = gr.HTML(f"""
<div class="left_header">
<img src="data:image/gif;base64,{get_image_base64('mouse.gif')}" width="360px" />
<h1 style="font-size: 18px;">MOUSE-Chat: Visual AI Assistant</h1>
<h1 style="font-size: 10px;">Transform your questions into stunning visual presentations. Every response is crafted with beautiful graphics, charts, and interactive elements.</h1>
</div>
""")
                                    # Prompt input (named to avoid shadowing
                                    # the `input` builtin).
                                    prompt_input = antd.InputTextarea(
                                        size="large",
                                        allow_clear=True,
                                        placeholder=random.choice(DEMO_LIST)['description']
                                    )
                                    # Button group
                                    with antd.Flex(gap="small", justify="space-between"):
                                        btn = antd.Button("Generate", type="primary", size="large")
                                        boost_btn = antd.Button("Enhance", type="default", size="large")
                                        deploy_btn = antd.Button("Share", type="default", size="large")
                                        clear_btn = antd.Button("Clear", type="default", size="large")
                                    # Previously bound to the same name as the
                                    # right-panel button, which left this one
                                    # without a click handler.
                                    left_history_btn = antd.Button("๐ History", type="default")
                                    deploy_result = gr.HTML(label="๋ฐฐํฌ ๊ฒฐ๊ณผ")
                            # Right panel
                            with antd.Col(span=24, md=16):
                                with ms.Div(elem_classes="right_panel"):
                                    # Top buttons
                                    with antd.Flex(gap="small", elem_classes="setting-buttons"):
                                        right_history_btn = antd.Button("๐ ํ์คํ ๋ฆฌ", type="default")
                                    gr.HTML('<div class="render_header"><span class="header_btn"></span><span class="header_btn"></span><span class="header_btn"></span></div>')
                                    # Tab contents: one item per preview state
                                    with antd.Tabs(active_key="empty", render_tab_bar="() => null") as state_tab:
                                        with antd.Tabs.Item(key="empty"):
                                            empty = antd.Empty(description="Enter your question to begin", elem_classes="right_content")
                                        with antd.Tabs.Item(key="loading"):
                                            loading = antd.Spin(True, tip="Creating visual presentation...", size="large", elem_classes="right_content")
                                        with antd.Tabs.Item(key="render"):
                                            sandbox = gr.HTML(elem_classes="html_content")

                # Event handler wiring. Both history buttons open the drawer.
                for history_button in (left_history_btn, right_history_btn):
                    history_button.click(
                        history_render,
                        inputs=[history],
                        outputs=[history_drawer, history_output]
                    )
                history_drawer.close(
                    lambda: gr.update(open=False),
                    inputs=[],
                    outputs=[history_drawer]
                )
                btn.click(
                    generator.generation_code,
                    inputs=[prompt_input, setting, history],
                    outputs=[code_output, history, sandbox, state_tab, code_drawer]
                )
                clear_btn.click(
                    generator.clear_history,
                    inputs=[],
                    outputs=[history]
                )
                boost_btn.click(
                    fn=handle_boost,
                    inputs=[prompt_input],
                    outputs=[prompt_input, state_tab]
                )
                deploy_btn.click(
                    fn=lambda code: deploy_to_vercel(remove_code_block(code)) if code else "์ฝ๋๊ฐ ์์ต๋๋ค.",
                    inputs=[code_output],
                    outputs=[deploy_result]
                )
    return demo
# Script entry point.
if __name__ == "__main__":
    try:
        # create_main_interface() reads this module-level instance.
        demo_instance = Demo()
        demo = create_main_interface()
        # Enable the request queue, then serve on all interfaces, port 7860.
        queued_app = demo.queue(default_concurrency_limit=20)
        queued_app.launch(server_name="0.0.0.0", server_port=7860)
    except Exception as e:
        print(f"Initialization error: {e}")
        raise