Spaces:
Runtime error
Runtime error
import threading | |
import time | |
import gradio as gr | |
import logging | |
import json | |
import re | |
import torch | |
import tempfile | |
import subprocess | |
import ast | |
import os | |
import dataclasses | |
from pathlib import Path | |
from typing import Dict, List, Tuple, Optional, Any, Union | |
from dataclasses import dataclass, field | |
from enum import Enum | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
from sentence_transformers import SentenceTransformer | |
import faiss | |
import numpy as np | |
from PIL import Image | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.StreamHandler(), | |
logging.FileHandler('gradio_builder.log') | |
] | |
) | |
logger = logging.getLogger(__name__) | |
# Constants | |
DEFAULT_PORT = 7860 | |
MODEL_CACHE_DIR = Path("model_cache") | |
TEMPLATE_DIR = Path("templates") | |
TEMP_DIR = Path("temp") | |
DATABASE_PATH = Path("code_database.json") #Path for our simple database | |
# Ensure directories exist | |
for directory in [MODEL_CACHE_DIR, TEMPLATE_DIR, TEMP_DIR]: | |
directory.mkdir(exist_ok=True, parents=True) | |
class Template: | |
code: str | |
description: str | |
components: List[str] | |
metadata: Dict[str, Any] = field(default_factory=dict) | |
version: str = "1.0" | |
class TemplateManager: | |
# ... (TemplateManager remains the same) ... | |
class RAGSystem: | |
def __init__(self, model_name: str = "gpt2", device: str = "cuda" if torch.cuda.is_available() else "cpu", embedding_model="all-mpnet-base-v2"): | |
try: | |
self.tokenizer = AutoTokenizer.from_pretrained(model_name) | |
self.model = AutoModelForCausalLM.from_pretrained(model_name).to(device) | |
self.device = device | |
self.pipe = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer, device=self.device) | |
self.embedding_model = SentenceTransformer(embedding_model) | |
self.load_database() | |
except Exception as e: | |
logger.error(f"Error loading language model or embedding model: {e}. Falling back to placeholder generation.") | |
self.pipe = None | |
self.embedding_model = None | |
self.code_embeddings = None | |
def load_database(self): | |
"""Loads or creates the code database""" | |
if DATABASE_PATH.exists(): | |
try: | |
with open(DATABASE_PATH, 'r', encoding='utf-8') as f: | |
self.database = json.load(f) | |
self.code_embeddings = np.array(self.database['embeddings']) | |
logger.info("Loaded code database from file") | |
except (json.JSONDecodeError, KeyError) as e: | |
logger.error(f"Error loading code database: {e}. Creating new database.") | |
self.database = {'codes': [], 'embeddings': []} | |
self.code_embeddings = np.array([]) | |
else: | |
logger.info("Code database does not exist. Creating new database.") | |
self.database = {'codes': [], 'embeddings': []} | |
self.code_embeddings = np.array([]) | |
if self.embedding_model and len(self.database['codes']) != len(self.database['embeddings']): | |
logger.warning("Mismatch between number of codes and embeddings, rebuilding embeddings") | |
self.rebuild_embeddings() | |
elif self.embedding_model is None: | |
logger.warning("Embeddings are not supported in this context. ") | |
#Index the embeddings for efficient searching | |
if len(self.code_embeddings) > 0 and self.embedding_model: | |
self.index = faiss.IndexFlatL2(self.code_embeddings.shape[1]) #L2 distance | |
self.index.add(self.code_embeddings) | |
def add_to_database(self, code: str): | |
"""Adds a code snippet to the database""" | |
try: | |
embedding = self.embedding_model.encode(code) | |
self.database['codes'].append(code) | |
self.database['embeddings'].append(embedding.tolist()) | |
self.code_embeddings = np.vstack((self.code_embeddings, embedding)) | |
self.index.add(np.array([embedding])) # update FAISS index | |
self.save_database() | |
logger.info(f"Added code snippet to database. Total size:{len(self.database['codes'])}") | |
except Exception as e: | |
logger.error(f"Error adding to database: {e}") | |
def save_database(self): | |
"""Saves the database to a file""" | |
try: | |
with open(DATABASE_PATH, 'w', encoding='utf-8') as f: | |
json.dump(self.database, f, indent=2) | |
logger.info(f"Saved database to {DATABASE_PATH}") | |
except Exception as e: | |
logger.error(f"Error saving database: {e}") | |
def rebuild_embeddings(self): | |
"""rebuilds embeddings from the codes""" | |
try: | |
embeddings = self.embedding_model.encode(self.database['codes']) | |
self.code_embeddings = embeddings | |
self.database['embeddings'] = embeddings.tolist() | |
self.index = faiss.IndexFlatL2(embeddings.shape[1]) #L2 distance | |
self.index.add(embeddings) | |
self.save_database() | |
logger.info("Rebuilt and saved embeddings to the database") | |
except Exception as e: | |
logger.error(f"Error rebuilding embeddings: {e}") | |
def retrieve_similar_code(self, description: str, top_k: int = 3) -> List[str]: | |
"""Retrieves similar code snippets from the database""" | |
if self.embedding_model is None: | |
return [] | |
try: | |
embedding = self.embedding_model.encode(description) | |
D, I = self.index.search(np.array([embedding]), top_k) | |
return [self.database['codes'][i] for i in I[0]] | |
except Exception as e: | |
logger.error(f"Error retrieving similar code: {e}") | |
return [] | |
def generate_code(self, description: str, template_code: str) -> str: | |
retrieved_codes = self.retrieve_similar_code(description) | |
prompt = f"Description: {description}\nRetrieved Code Snippets:\n{''.join([f'```python\n{code}\n```\n' for code in retrieved_codes])}\nTemplate:\n```python\n{template_code}\n```\nGenerated Code:\n```python\n" | |
if self.pipe: | |
try: | |
generated_text = self.pipe(prompt, max_length=500, num_return_sequences=1)[0]['generated_text'] | |
generated_code = generated_text.split("Generated Code:")[1].strip().split('```')[0] | |
return generated_code | |
except Exception as e: | |
logger.error(f"Error generating code with language model: {e}. Returning template code.") | |
return template_code | |
else: | |
return f"# Placeholder code generation. Description: {description}\n{template_code}" | |
def generate_interface(self, screenshot: Optional[Image.Image], description: str) -> str: | |
retrieved_codes = self.retrieve_similar_code(description) | |
prompt = f"Create a Gradio interface based on this description: {description}\nRetrieved Code Snippets:\n{''.join([f'```python\n{code}\n```\n' for code in retrieved_codes])}" | |
if screenshot: | |
prompt += "\nThe interface should resemble the provided screenshot." | |
prompt += "\n```python\n" | |
if self.pipe: | |
try: | |
generated_text = self.pipe(prompt, max_length=500, num_return_sequences=1)[0]['generated_text'] | |
generated_code = generated_text.split("```")[1].strip() | |
return generated_code | |
except Exception as e: | |
logger.error(f"Error generating interface with language model: {e}. Returning placeholder.") | |
return "import gradio as gr\n\ndemo = gr.Interface(fn=lambda x:x, inputs='text', outputs='text')\ndemo.launch()" | |
else: | |
return "import gradio as gr\n\ndemo = gr.Interface(fn=lambda x:x, inputs='text', outputs='text')\ndemo.launch()" | |
class PreviewManager: | |
# ... (PreviewManager remains largely the same) ... | |
class GradioInterface: | |
def __init__(self): | |
self.template_manager = TemplateManager(TEMPLATE_DIR) | |
self.template_manager.load_templates() | |
self.current_code = "" | |
self.rag_system = RAGSystem() | |
self.preview_manager = PreviewManager() | |
def _extract_components(self, code: str) -> List[str]: | |
"""Extract components from the code.""" | |
# This logic should analyze the code and extract components. | |
# For example, you might look for function definitions, classes, etc. | |
components = [] | |
# Simple regex to find function definitions | |
function_matches = re.findall(r'def (\w+)', code) | |
components.extend(function_matches) | |
# Simple regex to find class definitions | |
class_matches = re.findall(r'class (\w+)', code) | |
components.extend(class_matches) | |
# You can add more sophisticated logic here as needed | |
return components | |
def _get_template_choices(self) -> List[str]: | |
"""Get available template choices.""" | |
return list(self.template_manager.templates.keys()) | |
def launch(self, **kwargs): | |
with gr.Blocks() as interface: | |
gr.Markdown("## Code Generation Interface") | |
description_input = gr.Textbox(label="Description", placeholder="Enter a description for the code you want to generate.") | |
code_output = gr.Textbox(label="Generated Code", interactive=False) | |
generate_button = gr.Button("Generate Code") | |
template_choice = gr.Dropdown(label="Select Template", choices=self._get_template_choices(), value=None) | |
save_button = gr.Button("Save as Template") | |
# Generate code button action | |
generate_button.click( | |
fn=self.generate_code, | |
inputs=description_input, | |
outputs=code_output | |
) | |
# Save template button action | |
save_button.click( | |
fn=self.save_template, | |
inputs=[code_output, template_choice, description_input], | |
outputs=code_output | |
) | |
# Additional UI elements can be added here | |
gr.Markdown("### Preview") | |
preview_output = gr.Textbox(label="Preview", interactive=False) | |
self.preview_manager.update_preview(code_output) # Update preview with generated code | |
# Update preview when code is generated | |
generate_button.click( | |
fn=lambda code: self.preview_manager.update_preview(code), | |
inputs=code_output, | |
outputs=preview_output | |
) | |
interface.launch(**kwargs) | |
def generate_code(self, description: str) -> str: | |
"""Generate code based on the description.""" | |
template_code = "" # Placeholder for template code | |
return self.rag_system.generate_code(description, template_code) | |
def save_template(self, code: str, name: str, description: str) -> str: | |
"""Save the generated code as a template.""" | |
try: | |
components = self._extract_components(code) | |
template = Template(code=code, description=description, components=components) | |
if self.template_manager.save_template(name, template): | |
self.rag_system.add_to_database(code) # Add code to the database | |
return f"✅ Template '{name}' saved successfully." | |
else: | |
return "❌ Failed to save template." | |
except Exception as e: | |
logger.error(f"Error saving template: {e}") | |
return f"❌ Error saving template: {str(e)}" | |
def main(): | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.StreamHandler(), | |
logging.FileHandler('gradio_builder.log') | |
] | |
) | |
logger = logging.getLogger(__name__) | |
logger.info("=== Application Startup ===") | |
try: | |
# Initialize and launch interface | |
interface = GradioInterface() | |
interface.launch( | |
server_port=DEFAULT_PORT, | |
share=False, | |
debug=True | |
) | |
except Exception as e: | |
logger.error(f"Application error: {e}") | |
raise | |
finally: | |
logger.info("=== Application Shutdown ===") | |
if __name__ == "__main__": | |
main() |