# NOTE: "Spaces: Sleeping" -- stray page text captured above the source; not part of the program.
import os | |
import shutil | |
import zipfile | |
import logging | |
import tempfile | |
import magic | |
from pathlib import Path | |
from typing import Set, Optional | |
from fastapi import FastAPI, File, UploadFile, HTTPException, Request | |
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.middleware.trustedhost import TrustedHostMiddleware | |
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(title="Static Site Server")

# Add security middlewares
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure as needed
    # Credentialed requests must not be combined with wildcard origins,
    # methods or headers; keep this False until explicit origins are listed.
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(
    TrustedHostMiddleware,
    # A wildcard accepts every Host header, so this middleware currently
    # filters nothing.  Configure as needed.
    allowed_hosts=["*"],
)

# Constants
MAX_UPLOAD_SIZE = 100 * 1024 * 1024  # 100MB
# File extensions (lower-case, with leading dot) accepted inside an uploaded ZIP.
ALLOWED_EXTENSIONS = {
    ".html",
    ".css",
    ".js",
    ".jpg",
    ".jpeg",
    ".png",
    ".gif",
    ".svg",
    ".ico",
    ".woff",
    ".woff2",
    ".ttf",
    ".eot",
}
class SiteManager:
    """Deploy, track and remove static sites stored under ``/app/sites``.

    Each site lives in its own directory named after its ID.  A directory
    counts as a site when it contains an ``index.html`` file.  The IDs of all
    known sites are kept in ``active_sites``.
    """

    def __init__(self):
        """Create the working directories and register sites already on disk."""
        self.sites_dir = Path("/app/sites")
        self.temp_dir = Path("/app/temp")
        self.active_sites: Set[str] = set()
        # Ensure directories exist
        self.sites_dir.mkdir(parents=True, exist_ok=True)
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        # Load existing sites
        self._load_existing_sites()

    @staticmethod
    def _is_safe_site_id(unique_id: str) -> bool:
        """Return True when ``unique_id`` is usable as a single path component.

        The ID is joined onto ``sites_dir`` and ``temp_dir`` and those paths
        are later deleted recursively, so values that are empty, refer to the
        current/parent directory, or contain a path separator or NUL byte are
        rejected.
        """
        if unique_id in {"", ".", ".."}:
            return False
        return not any(ch in unique_id for ch in ("/", "\\", "\x00"))

    def _load_existing_sites(self):
        """Load existing sites from disk.

        Every sub-directory of ``sites_dir`` that contains ``index.html`` is
        added to ``active_sites``.
        """
        logger.info("Loading existing sites...")
        for site_dir in self.sites_dir.iterdir():
            if site_dir.is_dir() and (site_dir / "index.html").exists():
                self.active_sites.add(site_dir.name)
                logger.info("Loaded site: %s", site_dir.name)

    def _validate_file_types(self, zip_path: Path) -> bool:
        """Validate file types in a ZIP archive.

        Args:
            zip_path: Path of the archive to inspect.

        Returns:
            False as soon as a member has an extension outside
            ``ALLOWED_EXTENSIONS`` or a detected MIME type starting with
            ``application/x-``; True when every member passes.
        """
        mime = magic.Magic(mime=True)
        with zipfile.ZipFile(zip_path) as zip_ref:
            for file_info in zip_ref.infolist():
                if file_info.filename.endswith("/"):  # Skip directories
                    continue
                suffix = Path(file_info.filename).suffix.lower()
                if suffix not in ALLOWED_EXTENSIONS:
                    return False
                # Extract the member to a temporary file to check its MIME type
                with tempfile.NamedTemporaryFile() as tmp:
                    with zip_ref.open(file_info) as source:
                        shutil.copyfileobj(source, tmp)
                    tmp.flush()
                    mime_type = mime.from_file(tmp.name)
                # NOTE(review): this prefix may also match some font MIME
                # types reported by libmagic -- confirm against real uploads.
                if mime_type.startswith("application/x-"):
                    return False
        return True

    async def deploy_site(self, unique_id: str, zip_file: UploadFile) -> dict:
        """Deploy a new site from a ZIP file.

        The archive must contain ``index.html`` either at its root or inside
        a single top-level folder.  An existing site with the same ID is
        replaced, but only after the new archive was extracted successfully.

        Args:
            unique_id: Site ID; used as the directory name and URL prefix.
            zip_file: The uploaded archive.

        Returns:
            A dict with ``status``, ``message`` and ``url`` keys.

        Raises:
            HTTPException: 400 for an invalid ID, an empty or oversized
                upload, a non-ZIP file, disallowed file types or a missing
                ``index.html``; 500 for any unexpected failure.
        """
        if not self._is_safe_site_id(unique_id):
            raise HTTPException(status_code=400, detail="Invalid site ID")

        # Read at most one byte beyond the limit: enough to detect an
        # oversized upload without loading an arbitrarily large body.
        content = await zip_file.read(MAX_UPLOAD_SIZE + 1)
        if not content:
            raise HTTPException(status_code=400, detail="Empty file")
        if len(content) > MAX_UPLOAD_SIZE:
            raise HTTPException(status_code=400, detail="File too large")

        temp_file = self.temp_dir / f"{unique_id}.zip"
        extraction_path = self.temp_dir / unique_id
        site_path = self.sites_dir / unique_id
        try:
            # Save uploaded file
            temp_file.write_bytes(content)
            # Validate ZIP file
            if not zipfile.is_zipfile(temp_file):
                raise HTTPException(status_code=400, detail="Invalid ZIP file")
            # Validate file types
            if not self._validate_file_types(temp_file):
                raise HTTPException(
                    status_code=400, detail="Invalid file types in ZIP"
                )
            with zipfile.ZipFile(temp_file) as zip_ref:
                # index.html must sit at the archive root or directly inside
                # one wrapping folder; deeper copies cannot be located by the
                # move step below.
                has_index = any(
                    name == "index.html"
                    or (name.endswith("/index.html") and name.count("/") == 1)
                    for name in zip_ref.namelist()
                )
                if not has_index:
                    raise HTTPException(
                        status_code=400,
                        detail="ZIP file must contain index.html in root directory",
                    )
                # Extract files
                zip_ref.extractall(extraction_path)

            # Prefer a top-level folder that holds index.html, otherwise use
            # the extraction directory itself as the site root.
            root_dir = next(
                (
                    p
                    for p in extraction_path.iterdir()
                    if p.is_dir() and (p / "index.html").exists()
                ),
                extraction_path,
            )
            # Clear the existing site only now that the new one is ready
            if site_path.exists():
                shutil.rmtree(site_path)
            # Move to final location
            shutil.move(str(root_dir), str(site_path))
            self.active_sites.add(unique_id)
            return {
                "status": "success",
                "message": f"Site deployed at /{unique_id}",
                "url": f"/{unique_id}",
            }
        except HTTPException:
            # Validation errors already carry the right status code.
            raise
        except Exception as e:
            logger.exception("Error deploying site %s: %s", unique_id, e)
            raise HTTPException(status_code=500, detail=str(e)) from e
        finally:
            # Cleanup
            if temp_file.exists():
                temp_file.unlink()
            if extraction_path.exists():
                shutil.rmtree(extraction_path)

    def remove_site(self, unique_id: str) -> bool:
        """Remove a deployed site.

        Args:
            unique_id: ID of the site to remove.

        Returns:
            True when the site was registered and has been removed, False
            when no site with that ID is registered.
        """
        if unique_id not in self.active_sites:
            return False
        site_path = self.sites_dir / unique_id
        if site_path.exists():
            shutil.rmtree(site_path)
        self.active_sites.remove(unique_id)
        return True
# Module-wide SiteManager instance used by the request handlers below.
site_manager: SiteManager = SiteManager()
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm it is registered with the app.
async def deploy_site(unique_id: str, file: UploadFile = File(...)):
    """Deploy a new site from a ZIP file.

    Args:
        unique_id: ID under which the site is deployed.
        file: The uploaded archive; its name must end in ``.zip``
            (case-insensitive).

    Returns:
        A JSON response carrying the result of ``SiteManager.deploy_site``.

    Raises:
        HTTPException: 400 when the upload has no file name or the name does
            not end in ``.zip``; anything raised by the site manager.
    """
    # The file name may be missing entirely, so guard before inspecting it.
    filename = file.filename or ""
    if not filename.lower().endswith(".zip"):
        raise HTTPException(status_code=400, detail="File must be a ZIP archive")
    result = await site_manager.deploy_site(unique_id, file)
    return JSONResponse(content=result)
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm it is registered with the app.
async def remove_site(unique_id: str):
    """Delete the site registered under ``unique_id``.

    Returns a success payload when the site manager removed the site and
    raises a 404 ``HTTPException`` when it reports the site as unknown.
    """
    removed = site_manager.remove_site(unique_id)
    if not removed:
        raise HTTPException(status_code=404, detail="Site not found")
    return {"status": "success", "message": f"Site {unique_id} removed"}
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm it is registered with the app.
async def list_sites():
    """Return the IDs of all currently registered sites under the ``sites`` key."""
    site_ids = [*site_manager.active_sites]
    return {"sites": site_ids}
# NOTE(review): no route decorator is visible on this handler in the reviewed
# source -- confirm it is registered with the app.
async def health_check():
    """Report a fixed ``healthy`` status together with the number of registered sites."""
    sites_count = len(site_manager.active_sites)
    return {"status": "healthy", "sites_count": sites_count}
# Mount static file handlers for each site
# NOTE(review): no event-hook decorator is visible on this function in the
# reviewed source -- confirm it is registered to run at startup.
async def startup_event():
    """Mount a ``StaticFiles`` app at ``/<site_id>`` for every registered site."""
    logger.info("Starting up server...")
    sites_root = site_manager.sites_dir
    for site_id in site_manager.active_sites:
        # html=True is passed through unchanged to StaticFiles.
        static_app = StaticFiles(directory=str(sites_root / site_id), html=True)
        app.mount(f"/{site_id}", static_app, name=site_id)
if __name__ == "__main__":
    # Start the server on all interfaces, port 8000, when run as a script.
    # uvicorn is imported here so it is only needed for direct execution.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)