web-server / main.py
pvanand's picture
Update main.py
ff80fbe verified
raw
history blame
7.61 kB
import os
import shutil
import zipfile
import logging
import tempfile
import magic
from pathlib import Path
from typing import Set, Optional
from fastapi import FastAPI, File, UploadFile, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
# Logging setup: emit INFO and above, stamping every record with the
# timestamp, logger name and severity.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(title="Static Site Server")

# Add security middlewares
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure as needed
    # Credentials stay disabled while origins is the wildcard: a wildcard
    # origin combined with credentials is not a valid CORS configuration.
    # To allow cookies/Authorization headers, list the trusted origins
    # explicitly above and only then switch this back to True.
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(
    TrustedHostMiddleware,
    # "*" accepts every Host header; replace with the real hostnames
    # to make this middleware effective.
    allowed_hosts=["*"],  # Configure as needed
)
# Constants: upload size ceiling and the file suffixes a site archive may contain
MAX_UPLOAD_SIZE = 100 * 1024 ** 2  # 100MB
ALLOWED_EXTENSIONS = {
    # markup, styles and scripts
    '.html', '.css', '.js',
    # images
    '.jpg', '.jpeg', '.png', '.gif', '.svg', '.ico',
    # fonts
    '.woff', '.woff2', '.ttf', '.eot',
}
class SiteManager:
    """Store, deploy and remove static sites on disk.

    Every site lives in its own directory under ``sites_dir`` and its name is
    tracked in ``active_sites``.  Uploads are unpacked in scratch directories
    created under ``temp_dir``.
    """

    def __init__(self):
        """Create the storage directories and register sites already on disk."""
        self.sites_dir = Path("/app/sites")
        self.temp_dir = Path("/app/temp")
        self.active_sites: Set[str] = set()

        # Ensure directories exist
        self.sites_dir.mkdir(parents=True, exist_ok=True)
        self.temp_dir.mkdir(parents=True, exist_ok=True)

        # Load existing sites
        self._load_existing_sites()

    def _load_existing_sites(self):
        """Register every directory in ``sites_dir`` that contains index.html."""
        logger.info("Loading existing sites...")
        for site_dir in self.sites_dir.iterdir():
            if site_dir.is_dir() and (site_dir / 'index.html').exists():
                self.active_sites.add(site_dir.name)
                logger.info(f"Loaded site: {site_dir.name}")

    @staticmethod
    def _is_safe_site_id(unique_id: str) -> bool:
        """Return True when *unique_id* is usable as a single directory name.

        Rejects the empty string, ``.`` and ``..`` and anything containing a
        path separator or NUL, so ``sites_dir / unique_id`` can never point
        outside (or at) ``sites_dir`` itself.
        """
        if unique_id in ("", ".", ".."):
            return False
        return not any(ch in unique_id for ch in ("/", "\\", "\0"))

    @staticmethod
    def _find_site_root(extraction_path: Path) -> Optional[Path]:
        """Locate the directory of an extracted archive that holds index.html.

        The extraction root wins when it contains index.html; otherwise the
        first (alphabetically) direct subdirectory containing index.html is
        used, which covers archives that wrap the site in one top-level
        folder.  Returns None when neither exists.
        """
        if not extraction_path.is_dir():
            return None
        if (extraction_path / 'index.html').is_file():
            return extraction_path
        for child in sorted(extraction_path.iterdir()):
            if child.is_dir() and (child / 'index.html').is_file():
                return child
        return None

    def _validate_file_types(self, zip_path: Path) -> bool:
        """Check every file in the archive against the allowed types.

        Returns False as soon as a member has a suffix outside
        ``ALLOWED_EXTENSIONS`` or a detected MIME type that starts with
        ``application/x-``; returns True when all members pass.
        """
        mime = magic.Magic(mime=True)
        with zipfile.ZipFile(zip_path) as zip_ref:
            for file_info in zip_ref.infolist():
                if file_info.filename.endswith('/'):  # Skip directories
                    continue

                suffix = Path(file_info.filename).suffix.lower()
                if suffix not in ALLOWED_EXTENSIONS:
                    return False

                # Copy the member to a throwaway file so libmagic can sniff
                # its content; the file is deleted when the block exits.
                with tempfile.NamedTemporaryFile() as tmp:
                    with zip_ref.open(file_info) as source:
                        shutil.copyfileobj(source, tmp)
                    tmp.flush()
                    mime_type = mime.from_file(tmp.name)

                # NOTE(review): some libmagic versions may report allowed
                # assets (e.g. fonts) under application/x-*; confirm against
                # the deployed libmagic database.
                if mime_type.startswith('application/x-'):
                    return False
        return True

    async def deploy_site(self, unique_id: str, zip_file: "UploadFile") -> dict:
        """Deploy a new site from a ZIP file.

        The archive is validated and unpacked in a private scratch directory;
        only once that has succeeded is any existing site with the same id
        replaced.  This method updates the files on disk and
        ``active_sites`` only; it does not register any route.

        Args:
            unique_id: Directory name of the site under ``sites_dir``.
            zip_file: Uploaded ZIP archive.

        Returns:
            A dict with ``status``, ``message`` and ``url`` keys.

        Raises:
            HTTPException: 400 for an unsafe id, an empty or oversized
                upload, an invalid archive, disallowed file types or a
                missing index.html; 500 for any unexpected failure.
        """
        if not self._is_safe_site_id(unique_id):
            raise HTTPException(status_code=400, detail="Invalid site id")

        # Read one byte past the limit: enough to detect an oversized upload
        # without pulling an arbitrarily large body into memory.
        await zip_file.seek(0)
        content = await zip_file.read(MAX_UPLOAD_SIZE + 1)
        if not content:
            raise HTTPException(status_code=400, detail="Empty file")
        if len(content) > MAX_UPLOAD_SIZE:
            raise HTTPException(status_code=400, detail="File too large")

        work_dir: Optional[Path] = None
        try:
            # A unique scratch directory per call keeps concurrent deploys
            # (even of the same id) from trampling each other's files.
            work_dir = Path(tempfile.mkdtemp(prefix="deploy-", dir=self.temp_dir))
            archive_path = work_dir / "upload.zip"
            archive_path.write_bytes(content)

            # Validate ZIP file
            if not zipfile.is_zipfile(archive_path):
                raise HTTPException(status_code=400, detail="Invalid ZIP file")

            # Validate file types
            if not self._validate_file_types(archive_path):
                raise HTTPException(status_code=400, detail="Invalid file types in ZIP")

            # Extract files
            extraction_path = work_dir / "extracted"
            extraction_path.mkdir()
            with zipfile.ZipFile(archive_path) as zip_ref:
                zip_ref.extractall(extraction_path)

            # Verify index.html exists
            root_dir = self._find_site_root(extraction_path)
            if root_dir is None:
                raise HTTPException(
                    status_code=400,
                    detail="ZIP file must contain index.html in root directory"
                )

            # Clear the existing site only now that the replacement is ready,
            # so a rejected upload never destroys a working deployment.
            site_path = self.sites_dir / unique_id
            if site_path.exists():
                shutil.rmtree(site_path)

            # Move to final location
            shutil.move(str(root_dir), str(site_path))
            self.active_sites.add(unique_id)

            return {
                "status": "success",
                "message": f"Site deployed at /{unique_id}",
                "url": f"/{unique_id}"
            }
        except HTTPException:
            # Client errors raised above must keep their own status code.
            raise
        except Exception as e:
            logger.error(f"Error deploying site {unique_id}: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e)) from e
        finally:
            # Cleanup
            if work_dir is not None:
                shutil.rmtree(work_dir, ignore_errors=True)

    def remove_site(self, unique_id: str) -> bool:
        """Remove a deployed site.

        Returns True when *unique_id* was an active site (its directory, if
        present, is deleted and the id is forgotten), False otherwise.
        """
        if unique_id in self.active_sites:
            site_path = self.sites_dir / unique_id
            if site_path.exists():
                shutil.rmtree(site_path)
            self.active_sites.remove(unique_id)
            return True
        return False
# Initialize site manager.
# NOTE: this runs at import time; the constructor creates the storage
# directories and scans the sites directory for already-deployed sites.
site_manager = SiteManager()
@app.post("/deploy/{unique_id}")
async def deploy_site(unique_id: str, file: UploadFile = File(...)):
    """Deploy a new site from a ZIP file.

    Rejects uploads whose filename is missing or does not end in ``.zip``
    (case-insensitive) with a 400, hands the archive to the site manager,
    and makes sure the deployed directory is served under ``/{unique_id}``.
    """
    # The filename can be absent on a multipart part; treat that as "not a zip"
    # instead of failing with an AttributeError.
    filename = file.filename or ""
    if not filename.lower().endswith('.zip'):
        raise HTTPException(status_code=400, detail="File must be a ZIP archive")

    result = await site_manager.deploy_site(unique_id, file)

    # Static mounts are otherwise only created at startup, so a site deployed
    # while the server is running would not be reachable at the returned URL.
    # Skip when a route already owns this path (e.g. a redeploy).
    mount_path = f"/{unique_id}"
    if not any(getattr(route, "path", None) == mount_path for route in app.routes):
        app.mount(
            mount_path,
            StaticFiles(directory=str(site_manager.sites_dir / unique_id), html=True),
            name=unique_id,
        )

    return JSONResponse(content=result)
@app.delete("/site/{unique_id}")
async def remove_site(unique_id: str):
    """Delete the site named *unique_id*; respond 404 when it is not deployed."""
    removed = site_manager.remove_site(unique_id)
    if not removed:
        raise HTTPException(status_code=404, detail="Site not found")
    return {"status": "success", "message": f"Site {unique_id} removed"}
@app.get("/sites")
async def list_sites():
    """Return the ids of all currently tracked sites."""
    site_ids = [*site_manager.active_sites]
    return {"sites": site_ids}
@app.get("/health")
async def health_check():
    """Report liveness together with the number of tracked sites."""
    tracked = len(site_manager.active_sites)
    return {"status": "healthy", "sites_count": tracked}
# Serve every site known at startup from its own URL prefix
@app.on_event("startup")
async def startup_event():
    """Mount a static file handler at ``/<site id>`` for each tracked site."""
    logger.info("Starting up server...")
    sites_root = site_manager.sites_dir
    for name in site_manager.active_sites:
        # html=True lets the handler serve index.html for directory requests.
        static_app = StaticFiles(directory=str(sites_root / name), html=True)
        app.mount(f"/{name}", static_app, name=name)
if __name__ == "__main__":
    # Running the file directly starts the server on all interfaces, port 8000.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)