import threading import time import gradio as gr import logging import json import re import torch import tempfile import subprocess import ast from pathlib import Path from typing import Dict, List, Tuple, Optional, Any, Union from dataclasses import dataclass, field from enum import Enum from transformers import ( AutoTokenizer, AutoModelForCausalLM, pipeline, AutoProcessor, AutoModel ) from sentence_transformers import SentenceTransformer import faiss import numpy as np from PIL import Image from transformers import BlipForConditionalGeneration # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('gradio_builder.log') ] ) logger = logging.getLogger(__name__) # Constants DEFAULT_PORT = 7860 MODEL_CACHE_DIR = Path("model_cache") TEMPLATE_DIR = Path("templates") TEMP_DIR = Path("temp") # Ensure directories exist for directory in [MODEL_CACHE_DIR, TEMPLATE_DIR, TEMP_DIR]: directory.mkdir(exist_ok=True) @dataclass class Template: """Template data structure""" code: str description: str components: List[str] metadata: Dict[str, Any] = field(default_factory=dict) version: str = "1.0" class ComponentType(Enum): """Supported Gradio component types""" IMAGE = "Image" TEXTBOX = "Textbox" BUTTON = "Button" NUMBER = "Number" MARKDOWN = "Markdown" JSON = "JSON" HTML = "HTML" CODE = "Code" DROPDOWN = "Dropdown" SLIDER = "Slider" CHECKBOX = "Checkbox" RADIO = "Radio" AUDIO = "Audio" VIDEO = "Video" FILE = "File" DATAFRAME = "DataFrame" LABEL = "Label" PLOT = "Plot" @dataclass class ComponentConfig: """Configuration for Gradio components""" type: ComponentType label: str properties: Dict[str, Any] = field(default_factory=dict) events: List[str] = field(default_factory=list) class BuilderError(Exception): """Base exception for Gradio Builder errors""" pass class ValidationError(BuilderError): """Raised when validation fails""" pass class 
GenerationError(BuilderError): """Raised when code generation fails""" pass class ModelError(BuilderError): """Raised when model operations fail""" pass def setup_gpu_memory(): """Configure GPU memory usage""" try: if torch.cuda.is_available(): # Enable memory growth torch.cuda.empty_cache() # Set memory fraction torch.cuda.set_per_process_memory_fraction(0.8) logger.info("GPU memory configured successfully") else: logger.info("No GPU available, using CPU") except Exception as e: logger.warning(f"Error configuring GPU memory: {e}") def validate_code(code: str) -> Tuple[bool, str]: """Validate Python code syntax""" try: ast.parse(code) return True, "Code is valid" except SyntaxError as e: line_no = e.lineno offset = e.offset line = e.text if line: pointer = " " * (offset - 1) + "^" error_detail = f"\nLine {line_no}:\n{line}\n{pointer}" else: error_detail = f" at line {line_no}" return False, f"Syntax error: {str(e)}{error_detail}" except Exception as e: return False, f"Validation error: {str(e)}" class CodeFormatter: """Handles code formatting and cleanup""" @staticmethod def format_code(code: str) -> str: """Format code using black""" try: import black return black.format_str(code, mode=black.FileMode()) except ImportError: logger.warning("black not installed, returning unformatted code") return code except Exception as e: logger.error(f"Error formatting code: {e}") return code @staticmethod def cleanup_code(code: str) -> str: """Clean up generated code""" # Remove any potential unsafe imports unsafe_imports = ['os', 'subprocess', 'sys'] lines = code.split('\n') cleaned_lines = [] for line in lines: skip = False for unsafe in unsafe_imports: if f"import {unsafe}" in line or f"from {unsafe}" in line: skip = True break if not skip: cleaned_lines.append(line) return '\n'.join(cleaned_lines) def create_temp_module(code: str) -> str: """Create a temporary module from code""" try: temp_file = TEMP_DIR / f"temp_module_{int(time.time())}.py" with open(temp_file, "w", 
encoding="utf-8") as f: f.write(code) return str(temp_file) except Exception as e: raise BuilderError(f"Failed to create temporary module: {e}") # Initialize GPU configuration setup_gpu_memory() class ModelManager: """Manages AI models and their configurations""" def __init__(self, cache_dir: Path = MODEL_CACHE_DIR): self.cache_dir = cache_dir self.cache_dir.mkdir(exist_ok=True) self.loaded_models = {} self.model_configs = { "code_generator": { "model_id": "bigcode/starcoder", "tokenizer": AutoTokenizer, "model": AutoModelForCausalLM, "kwargs": { "torch_dtype": torch.float16, "device_map": "auto", "cache_dir": str(cache_dir) } }, "image_processor": { "model_id": "Salesforce/blip-image-captioning-base", "processor": AutoProcessor, "model": BlipForConditionalGeneration, "kwargs": { "cache_dir": str(cache_dir), "device_map": "auto" } } } def load_model(self, model_type: str): """Load a model by type""" try: if model_type not in self.model_configs: raise ModelError(f"Unknown model type: {model_type}") if model_type in self.loaded_models: return self.loaded_models[model_type] config = self.model_configs[model_type] logger.info(f"Loading {model_type} model...") if model_type == "code_generator": tokenizer = config["tokenizer"].from_pretrained( config["model_id"], **config["kwargs"] ) model = config["model"].from_pretrained( config["model_id"], **config["kwargs"] ) self.loaded_models[model_type] = (model, tokenizer) elif model_type == "image_processor": try: processor = config["processor"].from_pretrained( config["model_id"], **config["kwargs"] ) model = config["model"].from_pretrained( config["model_id"], **config["kwargs"] ) if torch.cuda.is_available(): model = model.to("cuda") self.loaded_models[model_type] = (model, processor) logger.info(f"{model_type} model loaded successfully") except Exception as e: logger.error(f"Error loading {model_type} model: {e}") raise ModelError(f"Failed to load {model_type} model: {e}") logger.info(f"{model_type} model loaded 
successfully") return self.loaded_models[model_type] except Exception as e: raise ModelError(f"Error loading {model_type} model: {str(e)}") def unload_model(self, model_type: str): """Unload a model to free memory""" if model_type in self.loaded_models: del self.loaded_models[model_type] torch.cuda.empty_cache() logger.info(f"{model_type} model unloaded") class MultimodalRAG: """Multimodal Retrieval-Augmented Generation system""" def __init__(self): """Initialize the multimodal RAG system""" try: self.model_manager = ModelManager() # Load text encoder self.text_encoder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') # Initialize vector store self.vector_store = self._initialize_vector_store() # Load template database self.template_embeddings = {} self._initialize_template_embeddings() except Exception as e: raise ModelError(f"Error initializing MultimodalRAG: {str(e)}") def _initialize_vector_store(self) -> faiss.IndexFlatL2: """Initialize FAISS vector store""" combined_dim = 768 + 384 # BLIP (768) + text (384) return faiss.IndexFlatL2(combined_dim) def _initialize_template_embeddings(self): """Initialize template embeddings""" try: template_path = TEMPLATE_DIR / "template_embeddings.npz" if template_path.exists(): data = np.load(template_path) self.template_embeddings = { name: embedding for name, embedding in data.items() } except Exception as e: logger.error(f"Error loading template embeddings: {e}") def encode_image(self, image: Image.Image) -> np.ndarray: """Encode image using BLIP""" try: model, processor = self.model_manager.load_model("image_processor") # Process image inputs = processor(images=image, return_tensors="pt").to(model.device) # Get image features using the proper method with torch.no_grad(): outputs = model.get_image_features(**inputs) image_features = outputs.last_hidden_state.mean(dim=1) # Average pooling return image_features.cpu().numpy() except Exception as e: logger.error(f"Error encoding image: {str(e)}") raise 
ModelError(f"Error encoding image: {str(e)}") def encode_text(self, text: str) -> np.ndarray: """Encode text using sentence-transformers""" try: return self.text_encoder.encode(text) except Exception as e: raise ModelError(f"Error encoding text: {str(e)}") # ... rest of the MultimodalRAG class methods ... def generate_code(self, description: str, template_code: str) -> str: """Generate code using StarCoder""" try: model, tokenizer = self.model_manager.load_model("code_generator") prompt = f""" # Task: Generate a Gradio interface based on the description # Description: {description} # Base template: {template_code} # Generate a customized version of the template that implements the description. # Only output the Python code, no explanations. ```python """ inputs = tokenizer(prompt, return_tensors="pt").to(model.device) with torch.no_grad(): outputs = model.generate( inputs.input_ids, max_length=2048, temperature=0.2, top_p=0.95, do_sample=True, pad_token_id=tokenizer.eos_token_id ) generated_code = tokenizer.decode(outputs[0], skip_special_tokens=True) # Clean and format the generated code generated_code = self._clean_generated_code(generated_code) return CodeFormatter.format_code(generated_code) except Exception as e: raise GenerationError(f"Error generating code: {str(e)}") def _clean_generated_code(self, code: str) -> str: """Clean and format generated code""" # Extract code between triple backticks if present if "```python" in code: code = code.split("```python")[1].split("```")[0] elif "```" in code: code = code.split("```")[1].split("```")[0] code = code.strip() return CodeFormatter.cleanup_code(code) def find_similar_template( self, screenshot: Optional[Image.Image], description: str ) -> Tuple[str, Template]: """Find most similar template based on image and description""" try: # Get embeddings text_embedding = self.encode_text(description) if screenshot: img_embedding = self.encode_image(screenshot) query_embedding = np.concatenate([ img_embedding.flatten(), 
text_embedding ]) else: # If no image, duplicate text embedding to match dimensions query_embedding = np.concatenate([ text_embedding, text_embedding ]) # Search in vector store D, I = self.vector_store.search( np.array([query_embedding]), k=1 ) # Get template name from index template_names = list(self.template_embeddings.keys()) template_name = template_names[I[0][0]] # Load template template_path = TEMPLATE_DIR / f"{template_name}.json" with open(template_path, 'r') as f: template_data = json.load(f) template = Template(**template_data) return template_name, template except Exception as e: raise ModelError(f"Error finding similar template: {str(e)}") def generate_interface( self, screenshot: Optional[Image.Image], description: str ) -> str: """Generate complete interface based on input""" try: # Find similar template template_name, template = self.find_similar_template( screenshot, description ) # Generate customized code custom_code = self.generate_code( description, template.code ) return custom_code except Exception as e: raise GenerationError(f"Error generating interface: {str(e)}") def cleanup(self): """Cleanup resources""" try: # Save template embeddings self.save_template_embeddings() # Unload models self.model_manager.unload_model("code_generator") self.model_manager.unload_model("image_processor") # Clear CUDA cache torch.cuda.empty_cache() except Exception as e: logger.error(f"Error during cleanup: {e}") class TemplateManager: """Manages Gradio templates and their metadata""" def __init__(self): self.templates = {} self.component_index = {} self.category_index = {} self.load_templates() def load_templates(self): """Load built-in templates""" # Load built-in templates self.templates = { "image_classifier": Template( code=""" import gradio as gr import numpy as np from PIL import Image def classify_image(image): if image is None: return {"error": 1.0} return {"class1": 0.8, "class2": 0.2} with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# 
Image Classifier") with gr.Row(): with gr.Column(): input_image = gr.Image(type="pil") classify_btn = gr.Button("Classify") with gr.Column(): output_labels = gr.Label() classify_btn.click( fn=classify_image, inputs=input_image, outputs=output_labels ) if __name__ == "__main__": demo.launch() """, description="Basic image classification interface", components=["Image", "Button", "Label"], metadata={"category": "computer_vision"} ), "chatbot": Template( code=""" import gradio as gr def respond(message, history): return f"You said: {message}" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# AI Chatbot") chatbot = gr.Chatbot() msg = gr.Textbox(label="Message") clear = gr.Button("Clear") msg.submit(respond, [msg, chatbot], [chatbot]) clear.click(lambda: None, None, chatbot, queue=False) if __name__ == "__main__": demo.launch() """, description="Interactive chatbot interface", components=["Chatbot", "Textbox", "Button"], metadata={"category": "nlp"} ), "audio_processor": Template( code=""" import gradio as gr import numpy as np def process_audio(audio, volume_factor=1.0): if audio is None: return None sr, data = audio return (sr, data * volume_factor) with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Audio Processor") with gr.Row(): with gr.Column(): input_audio = gr.Audio(source="microphone", type="numpy") volume = gr.Slider(minimum=0, maximum=2, value=1, label="Volume") process_btn = gr.Button("Process") with gr.Column(): output_audio = gr.Audio(type="numpy") process_btn.click( fn=process_audio, inputs=[input_audio, volume], outputs=output_audio ) if __name__ == "__main__": demo.launch() """, description="Audio processing interface", components=["Audio", "Slider", "Button"], metadata={"category": "audio"} ), "file_processor": Template( code=""" import gradio as gr def process_file(file): if file is None: return "No file uploaded" return f"Processed file: {file.name}" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# File 
Processor") with gr.Row(): with gr.Column(): file_input = gr.File(label="Upload File") process_btn = gr.Button("Process") with gr.Column(): output = gr.Textbox(label="Results") json_output = gr.JSON(label="Detailed Results") process_btn.click( fn=process_file, inputs=file_input, outputs=[output, json_output] ) if __name__ == "__main__": demo.launch() """, description="File processing interface", components=["File", "Button", "Textbox", "JSON"], metadata={"category": "utility"} ), "data_visualization": Template( code=""" import gradio as gr import pandas as pd import plotly.express as px def visualize_data(data, plot_type): if data is None: return None df = pd.read_csv(data.name) if plot_type == "scatter": fig = px.scatter(df, x=df.columns[0], y=df.columns[1]) elif plot_type == "line": fig = px.line(df, x=df.columns[0], y=df.columns[1]) else: fig = px.bar(df , x=df.columns[0], y=df.columns[1]) return fig with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Data Visualizer") with gr.Row(): with gr.Column(): file_input = gr.File(label="Upload CSV") plot_type = gr.Radio( choices=["scatter", "line", "bar"], label="Plot Type", value="scatter" ) visualize_btn = gr.Button("Visualize") with gr.Column(): plot_output = gr.Plot(label="Visualization") visualize_btn.click( fn=visualize_data, inputs=[file_input, plot_type], outputs=plot_output ) if __name__ == "__main__": demo.launch() """, description="Data visualization interface", components=["File", "Radio", "Button", "Plot"], metadata={"category": "data_science"} ), "form_builder": Template( code=""" import gradio as gr import json def submit_form(name, email, age, interests, subscribe): return json.dumps({ "name": name, "email": email, "age": age, "interests": interests, "subscribe": subscribe }, indent=2) with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Form Builder") with gr.Row(): with gr.Column(): name = gr.Textbox(label="Name") email = gr.Textbox(label="Email") age = gr.Number(label="Age") 
interests = gr.CheckboxGroup( choices=["Sports", "Music", "Art", "Technology"], label="Interests" ) subscribe = gr.Checkbox(label="Subscribe to newsletter") submit_btn = gr.Button("Submit") with gr.Column(): output = gr.JSON(label="Form Data") submit_btn.click( fn=submit_form, inputs=[name, email, age, interests, subscribe], outputs=output ) if __name__ == "__main__": demo.launch() """, description="Form builder interface", components=["Textbox", "Number", "CheckboxGroup", "Checkbox", "Button", "JSON"], metadata={"category": "utility"} ), "text_summarizer": Template( code=""" import gradio as gr from transformers import pipeline summarizer = pipeline("summarization") def summarize_text(text): summary = summarizer(text, max_length=150, min_length=40, do_sample=False) return summary[0]['summary_text'] with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Text Summarizer") with gr.Row(): with gr.Column(): input_text = gr.Textbox(label="Input Text", lines=10, placeholder="Enter text to summarize...") summarize_btn = gr.Button("Summarize") with gr.Column(): summary_output = gr.Textbox(label="Summary", lines=5) summarize_btn.click( fn=summarize_text, inputs=input_text, outputs=summary_output ) if __name__ == "__main__": demo.launch() """, description="Text summarization interface using a transformer model", components=["Textbox", "Button"], metadata={"category": "nlp"} ), "image_captioner": Template( code=""" import gradio as gr from transformers import BlipProcessor, BlipForConditionalGeneration from PIL import Image processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base") model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base") def generate_caption(image): inputs = processor(image, return_tensors="pt") out = model.generate(**inputs) caption = processor.decode(out[0], skip_special_tokens=True) return caption with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Image Caption Generator") 
with gr.Row(): with gr.Column(): input_image = gr.Image(type="pil", label="Upload Image") caption_btn = gr.Button("Generate Caption") with gr.Column(): caption_output = gr.Textbox(label="Generated Caption") caption_btn.click( fn=generate_caption, inputs=input_image, outputs=caption_output ) if __name__ == "__main__": demo.launch() """, description="Image captioning interface using a transformer model", components=["Image", "Button", "Textbox"], metadata={"category": "computer_vision"} ), "style_transfer": Template( code=""" import gradio as gr import tensorflow as tf import tensorflow_hub as hub hub_model = hub.load('https://tfhub.dev/google/magenta/arbitrary-image-stylization-v1-256/2') def apply_style(content_image, style_image): content_image = tf.image.convert_image_dtype(content_image, tf.float32)[tf.newaxis, ...] style_image = tf.image.convert_image_dtype(style_image, tf.float32)[tf.newaxis, ...] stylized_image = hub_model(content_image, style_image)[0] return tf.squeeze(stylized_image).numpy() with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Neural Style Transfer") with gr.Row(): with gr.Column(): content_image = gr.Image(label="Content Image") style_image = gr.Image(label="Style Image") transfer_btn = gr.Button("Transfer Style") with gr.Column(): output_image = gr.Image(label="Stylized Image") transfer_btn.click( fn=apply_style, inputs=[content_image, style_image], outputs=output_image ) if __name__ == "__main__": demo.launch() """, description="Neural style transfer between two images", components=["Image", "Button"], metadata={"category": "computer_vision"} ), "sentiment_analysis": Template( code=""" import gradio as gr from transformers import pipeline sentiment_pipeline = pipeline("sentiment-analysis") def analyze_sentiment(text): result = sentiment_pipeline(text)[0] return f"{result['label']} ({result['score']:.2f})" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Sentiment Analysis") with gr.Row(): with gr.Column(): 
input_text = gr.Textbox(label="Input Text", lines=5, placeholder="Enter text to analyze sentiment...") analyze_btn = gr.Button("Analyze Sentiment") with gr.Column(): sentiment_output = gr.Textbox(label="Sentiment Result") analyze_btn.click( fn=analyze_sentiment, inputs=input_text, outputs=sentiment_output ) if __name__ == "__main__": demo.launch() """, description="Sentiment analysis using transformer model", components=["Textbox", "Button"], metadata={"category": "nlp"} ), "pdf_to_text": Template( code=""" import gradio as gr import PyPDF2 def extract_text_from_pdf(pdf): reader = PyPDF2.PdfFileReader(pdf) text = '' for page_num in range(reader.numPages): page = reader.getPage(page_num) text += page.extract_text() return text with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# PDF to Text Extractor") with gr.Row(): with gr.Column(): pdf_file = gr.File(label="Upload PDF") extract_btn = gr.Button("Extract Text") with gr.Column(): output_text = gr.Textbox(label="Extracted Text", lines=10) extract_btn.click( fn=extract_text_from_pdf, inputs=pdf_file, outputs=output_text ) if __name__ == "__main__": demo.launch() """, description="Extract text from PDF files", components=["File", "Button", "Textbox"], metadata={"category": "utility"} ), "website_monitor": Template( code=""" import gradio as gr import requests from datetime import datetime def monitor_website(url): try: response = requests.get(url) status_code = response.status_code status = "Up" if status_code == 200 else "Down" return { "url": url, "status": status, "response_time": response.elapsed.total_seconds(), "last_checked": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: return {"error": str(e)} with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Website Uptime Monitor") with gr.Row(): with gr.Column(): url_input = gr.Textbox(label="Website URL", placeholder="https://example.com") check_btn = gr.Button("Check Website") with gr.Column(): result_output = 
gr.JSON(label="Monitoring Result") check_btn.click( fn=monitor_website, inputs=url_input, outputs=result_output ```python ) if __name__ == "__main__": demo.launch() """, description="Monitor the uptime and response time of a website", components=["Textbox", "Button", "JSON"], metadata={"category": "web_monitoring"} ), "rss_feed_fetcher": Template( code=""" import gradio as gr import feedparser def fetch_rss_feed(url): feed = feedparser.parse(url) if feed.bozo: return {"error": "Invalid RSS feed URL"} return [{"title": entry.title, "link": entry.link} for entry in feed.entries[:5]] with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# RSS Feed Fetcher") with gr.Row(): with gr.Column(): feed_url = gr.Textbox(label="RSS Feed URL", placeholder="https://example.com/feed") fetch_btn = gr.Button("Fetch Latest Posts") with gr.Column(): feed_output = gr.JSON(label="Latest Feed Entries") fetch_btn.click( fn=fetch_rss_feed, inputs=feed_url, outputs=feed_output ) if __name__ == "__main__": demo.launch() """, description="Fetch the latest entries from an RSS feed", components=["Textbox", "Button", "JSON"], metadata={"category": "web_scraping"} ), "web_scraper": Template( code=""" import gradio as gr from bs4 import BeautifulSoup import requests def scrape_website(url, tag): try: response = requests.get(url) soup = BeautifulSoup(response.text, "html.parser") elements = soup.find_all(tag) return [element.get_text() for element in elements][:5] # Limit to 5 elements except Exception as e: return f"Error: {str(e)}" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Web Scraper") with gr.Row(): with gr.Column(): url_input = gr.Textbox(label="Website URL", placeholder="https://example.com") tag_input = gr.Textbox(label="HTML Tag to Scrape", placeholder="h1, p, div, etc.") scrape_btn = gr.Button("Scrape Website") with gr.Column(): result_output = gr.JSON(label="Scraped Results") scrape_btn.click( fn=scrape_website, inputs=[url_input, tag_input], 
outputs=result_output ) if __name__ == "__main__": demo.launch() """, description="Scrape text from a website based on the specified HTML tag", components=["Textbox", "Button", "JSON"], metadata={"category": "web_scraping"} ), "api_tester": Template( code=""" import gradio as gr import requests def test_api(endpoint, method, payload): try: if method == "GET": response = requests.get(endpoint) elif method == "POST": response = requests.post(endpoint, json=payload) else: return "Unsupported method" return { "status_code": response.status_code, "response_body": response.json() if response.headers.get("Content-Type") == "application/json" else response.text } except Exception as e: return {"error": str(e)} with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# API Tester") with gr.Row(): with gr.Column(): endpoint = gr.Textbox(label="API Endpoint", placeholder="https://api.example.com/endpoint") method = gr.Radio(choices=["GET", "POST"], label="HTTP Method", value="GET") payload = gr.JSON(label="Payload (for POST)", value={}) test_btn = gr.Button("Test API") with gr.Column(): result_output = gr.JSON(label="API Response") test_btn.click( fn=test_api, inputs=[endpoint, method, payload], outputs=result_output ) if __name__ == "__main__": demo.launch() """, description="Test API endpoints with GET and POST requests", components=["Textbox", "Radio", "JSON", "Button"], metadata={"category": "api_testing"} ), "email_scheduler": Template( code=""" import gradio as gr import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() scheduler.start() def send_email(to_email, subject, body): try: sender_email = "your_email@example.com" password = "your_password" msg = MIMEMultipart() msg['From'] = sender_email msg['To'] = to_email msg['Subject'] = subject msg.attach(MIMEText(body, 'plain')) server = smtplib.SMTP('smtp.example.com', 587) 
server.starttls() server.login(sender_email, password) text = msg.as_string() server.sendmail(sender_email, to_email, text) server.quit() return "Email sent successfully" except Exception as e: return f"Error: {str(e)}" def schedule_email(to_email, subject, body, delay): scheduler.add_job(send_email, 'interval', seconds=delay, args=[to_email, subject, body]) return f"Email scheduled to be sent in {delay} seconds" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Email Scheduler") with gr.Row(): with gr.Column(): to_email = gr.Textbox(label="Recipient Email") subject = gr.Textbox(label="Subject") body = gr.Textbox(label="Email Body", lines=5) delay = gr.Slider(label="Delay (seconds)", minimum=10, maximum=300, step=10, value=60) schedule_btn = gr.Button("Schedule Email") with gr.Column(): result_output = gr.Textbox(label="Result") schedule_btn.click( fn=schedule_email, inputs=[to_email, subject, body, delay], outputs=result_output ) if __name__ == "__main__": demo.launch() """, description="Schedule emails to be sent after a delay", components=["Textbox", "Slider", "Button"], metadata={"category": "task_automation"} ), "log_file_analyzer": Template( code=""" import gradio as gr import re def analyze_logs(log_file, filter_text): try: logs = log_file.read().decode("utf-8") if filter_text: filtered_logs = "\n".join([line for line in logs.splitlines() if re.search(filter_text, line)]) else: filtered_logs = logs return filtered_logs except Exception as e: return f"Error: {str(e)}" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Log File Analyzer") with gr.Row(): with gr.Column(): log_input = gr.File(label="Upload Log File") filter_input = gr.Textbox(label="Filter (Regex)", placeholder="Error|Warning") analyze_btn = gr.Button("Analyze Logs") with gr.Column(): output_logs = gr.Textbox(label="Filtered Logs", lines=20) analyze_btn.click( fn=analyze_logs, inputs=[log_input, filter_input], outputs=output_logs ) if __name__ == "__main__": 
demo.launch() """, description="Analyze and filter log files using regex", components=["File", "Textbox", "Button"], metadata={"category": "log_analysis"} ), "file_encryption_tool": Template( code=""" import gradio as gr from cryptography.fernet import Fernet def encrypt_file(file, password): try: key = password.ljust(32, '0').encode()[:32] # Basic password -> key mapping cipher = Fernet(Fernet.generate_key()) file_data = file.read() encrypted_data = cipher.encrypt(file_data) return encrypted_data.decode("utf-8") except Exception as e: return f"Error: {str(e)}" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# File Encryption Tool") with gr.Row(): with gr.Column(): file_input = gr.File(label="Upload File") password_input = gr.Textbox(label="Password", type="password") encrypt_btn = gr.Button("Encrypt File") with gr.Column(): encrypted_output = gr.Textbox(label="Encrypted Data", lines=20) encrypt_btn.click( fn=encrypt_file, inputs=[file_input, password_input], outputs=encrypted_output ) if __name__ == "__main__": demo.launch() """, description="Encrypt a file using a password-based key", components=["File", "Textbox", "Button"], metadata={"category": "security"} ), "task_scheduler": Template( code=""" import gradio as gr from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime scheduler = BackgroundScheduler() scheduler.start() def schedule_task(task_name, interval): scheduler.add_job(lambda: print(f"Running task: {task_name} at {datetime.now()}"), 'interval', seconds=interval) return f"Task '{task_name}' scheduled to run every {interval} seconds." 
with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Task Scheduler") with gr.Row(): with gr.Column(): task_input = gr.Textbox(label="Task Name", placeholder="Example Task") interval_input = gr.Slider(minimum=1, maximum=60, label="Interval (Seconds)", value=10) schedule_btn = gr.Button("Schedule Task") with gr.Column(): result_output = gr.Textbox(label="Result") schedule_btn.click( fn=schedule_task, inputs=[task_input, interval_input], outputs=result_output ) if __name__ == "__main__": demo.launch() """, description="Schedule tasks to run at regular intervals", components=["Textbox", "Slider", "Button"], metadata={"category": "task_automation"} ), "code_comparator": Template( code=""" import gradio as gr import difflib def compare_code(code1, code2): diff = difflib.unified_diff(code1.splitlines(), code2.splitlines(), lineterm='', fromfile='code1', tofile='code2') return '\n'.join(diff) with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Code Comparator") with gr.Row(): with gr.Column(): code1_input = gr.Textbox(label="Code 1", lines=15, placeholder="Paste the first code snippet here...") code2_input = gr.Textbox(label="Code 2", lines=15, placeholder="Paste the second code snippet here...") compare_btn = gr.Button("Compare Codes") with gr.Column(): diff_output = gr.Textbox(label="Difference", lines=20) compare_btn.click( fn=compare_code, inputs=[code1_input, code2_input], outputs=diff_output ) if __name__ == "__main__": demo.launch() """, description="Compare two code snippets and show the differences", components=["Textbox", "Button"], metadata={"category": "development"} ), "database_query_tool": Template( code=""" import gradio as gr import sqlite3 def query_database(db_file, query): try: conn = sqlite3.connect(db_file.name) cursor = conn.cursor() cursor.execute(query) results = cursor.fetchall() conn.close() return results except Exception as e: return f"Error: {str(e)}" with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Database 
Query Tool") with gr.Row(): with gr.Column(): db_input = gr.File(label="Upload SQLite DB File") query_input = gr.Textbox(label="SQL Query", placeholder="SELECT * FROM table_name;") query_btn = gr.Button("Run Query") with gr.Column(): query_output = gr.JSON(label="Query Results") query_btn.click( fn=query_database, inputs=[db_input, query_input], outputs=query_output ) if __name__ == "__main__": demo.launch() """, description="Run SQL queries on a SQLite database", components=["File", "Textbox", "Button", "JSON"], metadata={"category": "database"} ), "code_generator" : Template( code=""" pipeline("text-generation", model="Salesforce/codegen-2B-multi") def generate_code(prompt): response = code_generator(prompt, max_length=150, num_return_sequences=1) return response[0]['generated_text'] with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Code Generator") with gr.Row(): with gr.Column(): prompt_input = gr.Textbox(label="Enter Code Prompt", placeholder="Write a Python function to reverse a string") generate_btn = gr.Button("Generate Code") with gr.Column(): generated_code = gr.Textbox(label="Generated Code", lines=10) generate_btn.click( fn=generate_code, inputs=prompt_input, outputs=generated_code ) if __name__ == "__main__": demo.launch() """, description="Generate code snippets based on natural language prompts", components=["Textbox", "Button"], metadata={"category": "development"} ), "code_debugger": Template( code=""" import gradio as gr import subprocess def debug_code(code): try: with open("temp_code.py", "w") as f: f.write(code) result = subprocess.run(["python3", "temp_code.py"], capture_output=True, text=True) return {"stdout": result.stdout, "stderr": result.stderr} except Exception as e: return {"error": str(e)} with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Code Debugger") with gr.Row(): with gr.Column(): code_input = gr.Textbox(label="Code to Debug", lines=10, placeholder="Paste your Python code here...") debug_btn = 
gr.Button("Run Debug") with gr.Column(): debug_output = gr.JSON(label="Debug Output") debug_btn.click( fn=debug_code, inputs=code_input, outputs=debug_output ) if __name__ == "__main__": demo.launch() """, description="Run and debug Python code by capturing stdout and stderr", components=["Textbox", "Button", "JSON"], metadata={"category": "development"} ), "multi_agent_task_manager": Template( code=""" import gradio as gr from concurrent.futures import ThreadPoolExecutor import time agent_pool = ThreadPoolExecutor(max_workers=5) agent_status = {} def agent_task(agent_name, task): agent_status[agent_name] = "Running" time.sleep(5) # Simulate task duration agent_status[agent_name] = "Completed" return f"Agent {agent_name} has completed the task: {task}" def trigger_agents(agents, task): results = [] for agent in agents: future = agent_pool.submit(agent_task, agent, task) results.append(f"Agent {agent} triggered") return results def get_agent_status(): return agent_status with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Multi-Agent Task Manager") with gr.Row(): with gr.Column(): agents = gr.CheckboxGroup(choices=["Agent1", "Agent2", "Agent3", "Agent4", "Agent5"], label="Select Agents") task_input = gr.Textbox(label="Task to Perform", placeholder="Describe the task...") trigger_btn = gr.Button("Trigger Agents") with gr.Column(): task_output = gr.JSON(label="Agent Responses") with gr.Row(): status_btn = gr.Button("Get Agent Status") status_output = gr.JSON(label="Current Agent Status") trigger_btn.click( fn=trigger_agents, inputs=[agents, task_input], outputs=task_output ) status_btn.click( fn=get_agent_status, inputs=[], outputs=status_output ) if __name__ == "__main__": demo.launch() """, description="Manage and trigger tasks for multiple autonomous agents", components=["CheckboxGroup", "Textbox", "Button", "JSON"], metadata={"category": "task_automation"} ), "auto_code_refactor": Template( code=""" import gradio as gr from transformers import pipeline 
code_refactor = pipeline("text2text-generation", model="facebook/bart-large-cnn") def refactor_code(code): result = code_refactor(f"Refactor the following Python code: {code}", max_length=150) return result[0]['generated_text'] with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Auto Code Refactor") with gr.Row(): with gr.Column(): code_input = gr.Textbox(label="Code to Refactor", lines=10, placeholder="Paste your Python code here...") refactor_btn = gr.Button("Refactor Code") with gr.Column(): refactored_code = gr.Textbox(label="Refactored Code", lines=10) refactor_btn.click( fn=refactor_code, inputs=code_input, outputs=refactored_code ) if __name__ == "__main__": demo.launch() """, description="Automatically refactor Python code for better efficiency or readability", components=["Textbox", "Button"], metadata={"category": "development"} ), "agent_cluster_deployer": Template( code=""" import gradio as gr import random cluster_status = {} def deploy_agent_cluster(agent_count, task): cluster_id = random.randint(1000, 9999) cluster_status[cluster_id] = { "status": "Deploying", "agents": agent_count, "task": task } return f"Cluster {cluster_id} is deploying with {agent_count} agents for task: {task}" def get_cluster_status(): return cluster_status with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# Agent Cluster Deployer") with gr.Row(): with gr.Column(): agent_count = gr.Slider(label="Number of Agents", minimum=1, maximum=10, step=1, value=3) task_input = gr.Textbox(label="Cluster Task", placeholder="Describe the task for the cluster...") deploy_btn = gr.Button("Deploy Cluster") with gr.Column(): deploy_output = gr.Textbox(label="Cluster Deployment Status") with gr.Row(): status_btn = gr.Button("Get Cluster Status") status_output = gr.JSON(label="Current Cluster Status") deploy_btn.click( fn=deploy_agent_cluster, inputs=[agent_count, task_input], outputs=deploy_output ) status_btn.click( fn=get_cluster_status, inputs=[], outputs=status_output ) 
if __name__ == "__main__": demo.launch() """, description="Deploy an autonomous agent cluster for task execution", components=["Slider", "Textbox", "Button", "JSON"], metadata={"category": "task_automation"} ) } # Build indexes after loading templates self._build_component_index() self._build_category_index() def _build_component_index(self): """Build index of templates by component""" self.component_index = {} for name, template in self.templates.items(): for component in template.components: if component not in self.component_index: self.component_index[component] = [] self.component_index[component].append(name) def _build_category_index(self): """Build index of templates by category""" self.category_index = {} for name, template in self.templates.items(): category = template.metadata.get("category", "other") if category not in self.category_index: self.category_index[category] = [] self.category_index[category].append(name) def get_template(self, name: str) -> Optional[Template]: """Get template by name""" return self.templates.get(name) def search(self, query: str, limit: int = 5) -> List[Dict]: """Search templates by description or metadata""" results = [] for name, template in self.templates.items(): score = self._calculate_search_score(query, template) if score > 0: results.append({ "name": name, "template": template, "score": score }) results.sort(key=lambda x: x["score"], reverse=True) return results[:limit] def _calculate_search_score(self, query: str, template: Template) -> float: """Calculate search relevance score""" query = query.lower() score = 0.0 # Check description match if query in template.description.lower(): score += 1.0 # Check category match category = template.metadata.get("category", "").lower() if query in category: score += 0.5 # Check component match for component in template.components: if query in component.lower(): score += 0.2 return score def get_categories(self) -> List[str]: """Get list of available categories""" return 
def load_templates(self):
    """Load built-in templates plus every ``*.json`` file in ``self.template_dir``.

    Built-in templates come from ``self._get_builtin_templates()``.  Each JSON
    file is expected to hold the keyword arguments of ``Template``; the file
    stem becomes the template name.  A file that cannot be read or parsed is
    logged and skipped so that one broken template does not prevent the
    remaining ones from loading.

    Both indexes are rebuilt afterwards.

    Returns:
        The ``self.templates`` mapping (name -> template).
    """
    try:
        self.templates.update(self._get_builtin_templates())
    except Exception as e:
        logger.error(f"Error loading templates: {e}")

    # Sorted so that load order (and therefore index order) is reproducible.
    for template_file in sorted(self.template_dir.glob("*.json")):
        try:
            with open(template_file, "r", encoding="utf-8") as f:
                template_data = json.load(f)
            self.templates[template_file.stem] = Template(**template_data)
        except Exception as e:
            logger.error(f"Error loading template {template_file}: {e}")

    self.component_index = self._build_component_index()
    self.category_index = self._build_category_index()
    return self.templates
def search_by_components(self, components: List[str], limit: int = 5) -> List[Dict]:
    """Rank templates by the share of requested components they contain.

    A template's score is the number of entries of *components* found in the
    template's own component list, divided by ``len(components)``.  Templates
    without a single hit are left out.

    Args:
        components: Component names the caller is looking for.
        limit: Maximum number of results to return.

    Returns:
        Up to *limit* dicts with ``name``, ``template`` and ``score`` keys,
        best score first.  Any error is logged and yields an empty list.
    """
    try:
        ranked = []
        for template_name, candidate in self.templates.items():
            hits = [wanted for wanted in components if wanted in candidate.components]
            hit_count = len(hits)
            if hit_count <= 0:
                continue
            ranked.append({
                "name": template_name,
                "template": candidate,
                "score": hit_count / len(components)
            })

        ranked = sorted(ranked, key=lambda entry: entry["score"], reverse=True)
        return ranked[:limit]
    except Exception as e:
        logger.error(f"Error searching by components: {str(e)}")
        return []
"""Export templates to JSON file""" try: data = { name: { "description": template.description, "components": template.components, "metadata": template.metadata, "example": template.example } for name, template in self.templates.items() } with open(path, 'w') as f: json.dump(data, f, indent=2) logger.info(f"Templates exported to {path}") except Exception as e: logger.error(f"Error exporting templates: {str(e)}") raise def import_templates(self, path: str): """Import templates from a directory""" try: for template_file in os.listdir(path): if template_file.endswith(".json"): template_path = os.path.join(path, template_file) with open(template_path, "r", encoding="utf-8") as f: template_data = json.load(f) name = template_file.replace(".json", "") self.templates[name] = Template(**template_data) self.component_index = self._build_component_index() self.category_index = self._build_category_index() except Exception as e: logging.error(f"Error importing templates: {str(e)}") raise # Usage example: if __name__ == "__main__": # Initialize template manager manager = TemplateManager() # Search examples print("\nSearching for 'machine learning':") results = manager.search("machine learning") for result in results: print(f"{result['name']}: {result['score']:.2f}") print("\nSearching for components ['Image', 'Slider']:") results = manager.search_by_components(['Image', 'Slider']) for result in results: print(f"{result['name']}: {result['score']:.2f}") print("\nCategories available:") print(manager.get_categories()) print("\nComponents available:") print(manager.get_components()) self.component_index = self._build_component_index() self.category_index = self._build_category_index() def _build_component_index(self) -> Dict[str, List[str]]: """Build index of templates by component""" index = {} for name, template in self.templates.items(): for component in template.components: if component not in index: index[component] = [] index[component].append(name) return index def 
def search(self, query: str, limit: int = 5) -> List[Dict]:
    """Fuzzy-search templates by description, category and component names.

    For every template the score is::

        max(similarity(query, description), similarity(query, category))
        + 0.2 for each component whose name occurs in the query

    where ``similarity`` is ``difflib.SequenceMatcher(...).ratio()`` on the
    lower-cased strings.  Every template is scored and returned (there is no
    minimum score); only *limit* truncates the list.

    Args:
        query: Free-text search string.
        limit: Maximum number of results to return.

    Returns:
        Up to *limit* dicts with ``name``, ``template`` and ``score`` keys,
        best score first.  Any error is logged and yields an empty list.
    """
    # Imported locally because the module's import block does not provide
    # difflib; without it the NameError was swallowed below and every search
    # came back empty.
    import difflib

    try:
        needle = query.lower()
        results = []
        for name, template in self.templates.items():
            desc_score = difflib.SequenceMatcher(
                None, needle, template.description.lower()
            ).ratio()
            category_score = difflib.SequenceMatcher(
                None, needle, template.metadata.get("category", "").lower()
            ).ratio()
            comp_score = sum(
                0.2 for component in template.components
                if component.lower() in needle
            )
            results.append({
                "name": name,
                "template": template,
                "score": max(desc_score, category_score) + comp_score
            })

        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:limit]
    except Exception as e:
        logger.error(f"Error searching templates: {str(e)}")
        return []
def export_templates(self, path: str):
    """Write a JSON summary of all templates to the file *path*.

    The file maps each template name to its ``description``, ``components``,
    ``metadata`` and ``example``.  Template code is not part of the export.

    Args:
        path: Destination file; it is created or overwritten.

    Raises:
        Exception: Any serialization or I/O error is logged and re-raised.
    """
    try:
        data = {
            name: {
                "description": template.description,
                "components": template.components,
                "metadata": template.metadata,
                # The Template dataclass declares no ``example`` field, so a
                # plain attribute access raised AttributeError on every
                # export.  The key is kept so the file layout is unchanged.
                "example": getattr(template, "example", None)
            }
            for name, template in self.templates.items()
        }

        # Serialize before opening the file: if a value is not JSON
        # serializable, an existing export is left untouched instead of
        # being truncated.
        payload = json.dumps(data, indent=2)
        with open(path, 'w', encoding='utf-8') as f:
            f.write(payload)

        logger.info(f"Templates exported to {path}")
    except Exception as e:
        logger.error(f"Error exporting templates: {str(e)}")
        raise
"""Get template by name""" return self.templates.get(name) def list_templates(self, category: Optional[str] = None) -> List[Dict[str, Any]]: """List all available templates with optional category filter""" templates_list = [] for name, template in self.templates.items(): if category and template.metadata.get("category") != category: continue templates_list.append({ "name": name, "description": template.description, "components": template.components, "category": template.metadata.get("category", "general") }) return templates_list class InterfaceAnalyzer: """Analyzes Gradio interfaces""" @staticmethod def extract_components(code: str) -> List[ComponentConfig]: """Extract components from code""" components = [] try: tree = ast.parse(code) for node in ast.walk(tree): if isinstance(node, ast.Call): if isinstance(node.func, ast.Attribute): if hasattr(node.func.value, 'id') and node.func.value.id == 'gr': component_type = node.func.attr if hasattr(ComponentType, component_type.upper()): # Extract component properties properties = {} label = None events = [] # Get properties from keywords for keyword in node.keywords: if keyword.arg == 'label': try: label = ast.literal_eval(keyword.value) except: label = None else: try: properties[keyword.arg] = ast.literal_eval(keyword.value) except: properties[keyword.arg] = None # Look for event handlers parent = InterfaceAnalyzer._find_parent_assign(tree, node) if parent: events = InterfaceAnalyzer._find_component_events(tree, parent) components.append(ComponentConfig( type=ComponentType[component_type.upper()], label=label or component_type, properties=properties, events=events )) except Exception as e: logger.error(f"Error extracting components: {e}") return components @staticmethod def _find_parent_assign(tree: ast.AST, node: ast.Call) -> Optional[ast.AST]: """Find the assignment node for a component""" for potential_parent in ast.walk(tree): if isinstance(potential_parent, ast.Assign): for child in 
ast.walk(potential_parent.value): if child == node: return potential_parent return None @staticmethod def _find_component_events(tree: ast.AST, assign_node: ast.Assign) -> List[str]: """Find events attached to a component""" events = [] component_name = assign_node.targets[0].id for node in ast.walk(tree): if isinstance(node, ast.Call): if isinstance(node.func, ast.Attribute): if hasattr(node.func.value, 'id') and node.func.value.id == component_name: events.append(node.func.attr) return events @staticmethod def analyze_interface_structure(code: str) -> Dict[str, Any]: """Analyze interface structure""" try: # Extract components components = InterfaceAnalyzer.extract_components(code) # Analyze functions functions = [] tree = ast.parse(code) for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): functions.append({ "name": node.name, "args": [arg.arg for arg in node.args.args], "returns": InterfaceAnalyzer._get_return_type(node) }) # Analyze dependencies dependencies = set() for node in ast.walk(tree): if isinstance(node, ast.Import): for name in node.names: dependencies.add(name.name) elif isinstance(node, ast.ImportFrom): if node.module: dependencies.add(node.module) return { "components": [ { "type": comp.type.value, "label": comp.label, "properties": comp.properties, "events": comp.events } for comp in components ], "functions": functions, "dependencies": list(dependencies) } except Exception as e: logger.error(f"Error analyzing interface: {e}") return {} @staticmethod def _get_return_type(node: ast.FunctionDef) -> str: """Get function return type if specified""" if node.returns: return ast.unparse(node.returns) return "Any" class PreviewManager: """Manages interface previews""" def __init__(self): self.current_process: Optional[subprocess.Popen] = None self.preview_port = DEFAULT_PORT self._lock = threading.Lock() def start_preview(self, code: str) -> Tuple[bool, str]: """Start preview in a separate process""" with self._lock: try: self.stop_preview() 
def stop_preview(self):
    """Stop the current preview process, if one is running.

    The process is asked to terminate and given five seconds to exit; if it
    does not, it is killed and then waited for so that it is reaped.  Pipes
    attached to the process are closed and ``self.current_process`` is reset
    to ``None`` even when stopping raises.
    """
    process = self.current_process
    if not process:
        return

    try:
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            # Collect the exit status after the kill; otherwise the child
            # stays behind as a zombie.
            process.wait()
    finally:
        # The preview is started with stdout/stderr pipes; close whatever is
        # still open so the file descriptors are not leaked.
        for stream in (process.stdout, process.stderr):
            if stream is not None:
                stream.close()
        self.current_process = None
gr.Row(): generate_btn = gr.Button("🎨 Generate Interface", variant="primary") clear_btn = gr.Button("🗑ī¸ Clear") # Template Selection gr.Markdown("### 📚 Templates") template_dropdown = gr.Dropdown( choices=self._get_template_choices(), label="Base Template", interactive=True ) with gr.Column(scale=3): # Code Editor code_editor = gr.Code( label="Generated Code", language="python", interactive=True ) with gr.Row(): validate_btn = gr.Button("✅ Validate") format_btn = gr.Button("📋 Format") save_template_btn = gr.Button("💾 Save as Template") validation_output = gr.Markdown() # Preview Tab with gr.Tab("Preview"): with gr.Row(): preview_btn = gr.Button("â–ļī¸ Start Preview", variant="primary") stop_preview_btn = gr.Button("⏚ī¸ Stop Preview") preview_frame = gr.HTML( label="Preview", value="

Click 'Start Preview' to see your interface

" ) preview_status = gr.Markdown() # Analysis Tab with gr.Tab("Analysis"): analyze_btn = gr.Button("🔍 Analyze Interface") with gr.Row(): with gr.Column(): gr.Markdown("### 🧩 Components") components_json = gr.JSON(label="Detected Components") with gr.Column(): gr.Markdown("### 🔄 Functions") functions_json = gr.JSON(label="Interface Functions") with gr.Row(): with gr.Column(): gr.Markdown("### đŸ“Ļ Dependencies") dependencies_json = gr.JSON(label="Required Dependencies") with gr.Column(): gr.Markdown("### 📄 Requirements") requirements_text = gr.Textbox( label="requirements.txt", lines=10 ) # Event handlers generate_btn.click( fn=self._generate_interface, inputs=[description, screenshot, template_dropdown], outputs=[code_editor, validation_output] ) clear_btn.click( fn=self._clear_interface, outputs=[description, screenshot, code_editor, validation_output] ) validate_btn.click( fn=self._validate_code, inputs=[code_editor], outputs=[validation_output] ) format_btn.click( fn=self._format_code, inputs=[code_editor], outputs=[code_editor] ) save_template_btn.click( fn=self._save_as_template, inputs=[code_editor, description], outputs=[template_dropdown, validation_output] ) preview_btn.click( fn=self._start_preview, inputs=[code_editor], outputs=[preview_frame, preview_status] ) stop_preview_btn.click( fn=self._stop_preview, outputs=[preview_frame, preview_status] ) analyze_btn.click( fn=self._analyze_interface, inputs=[code_editor], outputs=[ components_json, functions_json, dependencies_json, requirements_text ] ) # Update template dropdown when templates change template_dropdown.change( fn=self._load_template, inputs=[template_dropdown], outputs=[code_editor] ) return interface def _get_template_choices(self) -> List[str]: """Get list of available templates""" templates = self.template_manager.list_templates() return [""] + [t["name"] for t in templates] def _generate_interface( self, description: str, screenshot: Optional[Image.Image], template_name: str ) -> 
def _save_as_template(self, code: str, description: str) -> Tuple[List[str], str]:
    """Store the editor contents as a new template in the ``custom`` category.

    The template is named ``custom_template``, or ``custom_template_<n>`` with
    the first free counter when that name is taken.

    Args:
        code: Source code currently shown in the editor.
        description: Description entered by the user.

    Returns:
        ``(template_choices, status_message)``: the current list of template
        choices and a message that starts with a check mark on success or a
        cross on failure.  Failures are logged, never raised.
    """
    try:
        # Find the first unused name.
        base_name = "custom_template"
        counter = 1
        name = base_name
        while self.template_manager.get_template(name):
            name = f"{base_name}_{counter}"
            counter += 1

        # Template.components holds component type names.  Passing the
        # ComponentConfig objects themselves made the template impossible to
        # serialize to JSON, so saving always failed.  dict.fromkeys removes
        # duplicates while keeping first-seen order.
        component_names = list(dict.fromkeys(
            config.type.value
            for config in InterfaceAnalyzer.extract_components(code)
        ))

        template = Template(
            code=code,
            description=description,
            components=component_names,
            metadata={"category": "custom"}
        )

        if not self.template_manager.save_template(name, template):
            raise RuntimeError("Failed to save template")

        # NOTE(review): this list is wired to the template dropdown as an
        # output; confirm the installed Gradio version treats it as new
        # choices rather than as the selected value.
        return self._get_template_choices(), f"✅ Template saved as {name}"

    except Exception as e:
        error_msg = f"❌ Error saving template: {str(e)}"
        logger.error(error_msg)
        return self._get_template_choices(), error_msg
started" else: return "", f"❌ Preview failed: {result}" def _stop_preview(self) -> Tuple[str, str]: """Stop interface preview""" self.preview_manager.stop_preview() return "

Preview stopped

", "✅ Preview stopped" def _load_template(self, template_name: str) -> str: """Load selected template""" if not template_name: return "" template = self.template_manager.get_template(template_name) if template: return template.code return "" def _analyze_interface(self, code: str) -> Tuple[Dict, Dict, Dict, str]: """Analyze interface structure""" try: analysis = InterfaceAnalyzer.analyze_interface_structure(code) # Generate requirements.txt dependencies = analysis.get("dependencies", []) requirements = CodeGenerator.generate_requirements(dependencies) return ( analysis.get("components", {}), analysis.get("functions", {}), {"dependencies": dependencies}, requirements ) except Exception as e: logger.error(f"Error analyzing interface: {e}") return {}, {}, {}, "" def launch(self, **kwargs): """Launch the interface""" try: self.interface.launch(**kwargs) finally: self.cleanup() def cleanup(self): """Cleanup resources""" try: self.preview_manager.cleanup() self.rag_system.cleanup() except Exception as e: logger.error(f"Error during cleanup: {e}") # app.py - Main Entry Point def main(): # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('gradio_builder.log') ] ) logger = logging.getLogger(__name__) logger.info("=== Application Startup ===") try: # Initialize and launch interface interface = GradioInterface() interface.launch( server_port=DEFAULT_PORT, share=False, debug=True ) except Exception as e: logger.error(f"Application error: {e}") raise finally: logger.info("=== Application Shutdown ===") # Simple main entry point without docstring if __name__ == "__main__": main()