"""HTTP service that executes user-submitted code in per-session Jupyter kernels.

Each session token maps to one kernel and one working directory under
``<module dir>/output/<session_token>``. Sessions that stay idle for longer
than ``TIMEOUT_DURATION`` seconds are shut down and their directory removed.
"""
import asyncio
import logging
import os
import queue
import shutil
import time
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

import psutil
import uvicorn
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from jupyter_client.manager import KernelManager
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# Set environment variables for Jupyter
os.environ['JUPYTER_RUNTIME_DIR'] = '/app/kernels'
os.environ['IPYTHONDIR'] = '/app/ipython'
os.environ['MPLCONFIGDIR'] = '/app/matplotlib'

mapbox_api_key = os.environ.get("MAPBOX_API_KEY", "")

app = FastAPI()

# Define root directory for all session folders
root_dir = os.path.abspath(os.path.dirname(__file__))

# Ensure base output directory exists
os.makedirs(os.path.join(root_dir, "output"), mode=0o777, exist_ok=True)

# Middleware for CORS
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; kept as-is so existing clients keep working -- confirm intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Model for input data
class CodeExecutionRequest(BaseModel):
    """Request body of ``POST /execute``."""

    session_token: str
    code: str


# Store kernel managers and last access times
kernel_managers: Dict[str, KernelManager] = {}
last_access_times: Dict[str, datetime] = {}

# Timeout duration in seconds
TIMEOUT_DURATION = 600  # 10 minutes

# How long a single blocking wait on a kernel channel may last before the
# deadline and the kernel's liveness are re-checked.
_POLL_INTERVAL = 1.0

# Strong reference to the background reaper task; asyncio only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected.
_idle_reaper_task: Optional["asyncio.Task"] = None


def _session_dir(session_token: str) -> str:
    """Return the working directory for *session_token*.

    Raises:
        HTTPException: 400 if the token is empty or could resolve outside the
            output directory (path separators, ``.``, ``..``, NUL byte).
    """
    invalid = (
        not session_token
        or session_token in (".", "..")
        or "/" in session_token
        or "\\" in session_token
        or "\x00" in session_token
        or os.path.basename(session_token) != session_token
    )
    if invalid:
        raise HTTPException(status_code=400, detail="Invalid session token.")
    return os.path.join(root_dir, "output", session_token)


def _safe_filename(filename: Optional[str]) -> str:
    """Reduce a client-supplied upload name to a bare file name.

    Raises:
        HTTPException: 400 if nothing usable remains after stripping
            directory components.
    """
    # Treat backslashes as separators too so Windows-style paths are stripped.
    name = os.path.basename((filename or "").replace("\\", "/"))
    if name in ("", ".", "..") or "\x00" in name:
        raise HTTPException(status_code=400, detail="Invalid file name.")
    return name


# Function to create a new kernel and session directory
async def create_kernel(session_token: str):
    """Start a kernel for *session_token* and register it.

    The kernel process is started with the session directory as its working
    directory and with ``MAPBOX_API_KEY`` in its environment.
    """
    session_dir = _session_dir(session_token)
    os.makedirs(session_dir, mode=0o777, exist_ok=True)

    # Working directory and environment are handed to the kernel process
    # directly rather than by generating Python source with interpolated
    # values, which breaks (or injects code) on quotes in those values.
    kernel_env = dict(os.environ, MAPBOX_API_KEY=mapbox_api_key)
    km = KernelManager()
    km.start_kernel(cwd=session_dir, env=kernel_env)

    kernel_managers[session_token] = km
    last_access_times[session_token] = datetime.now()


# Function to kill a kernel
async def kill_kernel(session_token: str):
    """Shut down the session's kernel (if any) and delete its directory."""
    session_dir = _session_dir(session_token)
    km = kernel_managers.pop(session_token, None)
    if km:
        km.shutdown_kernel(now=True)
    last_access_times.pop(session_token, None)
    try:
        shutil.rmtree(session_dir)
    except FileNotFoundError:
        pass  # nothing to clean up


# Add file upload endpoint
@app.post("/upload/{session_token}")
async def upload_files(session_token: str, files: List[UploadFile] = File(...)):
    """Store the uploaded files in the session directory.

    Returns:
        ``{"filenames": [...], "status": "success"}`` listing the names the
        files were saved under.

    Raises:
        HTTPException: 400 for an invalid session token or file name.
    """
    session_dir = _session_dir(session_token)
    os.makedirs(session_dir, mode=0o777, exist_ok=True)

    uploaded_files = []
    for file in files:
        safe_name = _safe_filename(file.filename)
        file_path = os.path.join(session_dir, safe_name)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        os.chmod(file_path, 0o777)  # Give full permissions to uploaded file
        uploaded_files.append(safe_name)

    # An upload counts as activity: without this a session close to its idle
    # limit could be reaped (and the new files deleted) right after the upload.
    if session_token in last_access_times:
        last_access_times[session_token] = datetime.now()

    print("uploaded_files", uploaded_files)
    return {"filenames": uploaded_files, "status": "success"}


def _next_message(fetch: Callable[..., Dict[str, Any]], km: KernelManager,
                  deadline: float) -> Dict[str, Any]:
    """Return the next message from *fetch*, honouring *deadline*.

    *fetch* is a blocking client getter such as ``kc.get_iopub_msg``; it is
    polled in short slices so the deadline and kernel liveness are checked
    even when the kernel sends nothing.

    Raises:
        TimeoutError: if ``time.monotonic()`` passes *deadline*.
        RuntimeError: if the kernel process is no longer alive.
    """
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("Code execution timed out.")
        try:
            return fetch(timeout=min(_POLL_INTERVAL, remaining))
        except queue.Empty:
            if not km.is_alive():
                raise RuntimeError("Kernel died during execution.") from None


def _parent_id(msg: Dict[str, Any]) -> Optional[str]:
    """Return the msg_id of the request that *msg* answers, if any."""
    return (msg.get('parent_header') or {}).get('msg_id')


def _run_in_kernel(km: KernelManager, session_dir: str, code: str) -> List[Dict[str, Any]]:
    """Blocking helper: run *code* in the kernel and collect its output.

    Intended to run in a worker thread. Returns a list of ``{"error": ...}``,
    ``{"data": ...}`` and ``{"text": ...}`` entries in arrival order.
    """
    kc = km.client()
    kc.start_channels()
    try:
        deadline = time.monotonic() + TIMEOUT_DURATION

        # Make sure the kernel works inside the session directory; repr()
        # yields a correctly quoted literal whatever the path contains.
        setup_id = kc.execute(f"import os\nos.chdir({session_dir!r})\n")
        while _parent_id(_next_message(kc.get_shell_msg, km, deadline)) != setup_id:
            pass  # skip replies that belong to other requests

        # Execute the provided code
        request_id = kc.execute(code)
        output: List[Dict[str, Any]] = []
        while True:
            msg = _next_message(kc.get_iopub_msg, km, deadline)
            # IOPub is a broadcast channel: ignore everything not caused by
            # this request (e.g. the setup request's own busy/idle pair).
            if _parent_id(msg) != request_id:
                continue
            msg_type = msg['msg_type']
            content = msg['content']
            if msg_type == 'status' and content.get('execution_state') == 'idle':
                break
            if msg_type == 'error':
                output.append({"error": {
                    "ename": content['ename'],
                    "evalue": content['evalue'],
                    "traceback": content['traceback'],
                }})
            if 'data' in content:
                output.append({"data": content['data']})
            elif 'text' in content:
                output.append({"text": content['text']})
        return output
    finally:
        kc.stop_channels()  # release the client's sockets


# Function to execute code in a kernel
async def execute_code(session_token: str, code: str):
    """Run *code* in the session's kernel, creating the kernel on first use.

    Returns:
        ``{'status': 'success', 'value': [...]}`` with the collected output,
        or ``{'status': 'error', 'value': <message>}`` if the execution could
        not be completed (timeout, dead kernel, transport failure).

    Raises:
        HTTPException: 400 for an invalid session token.
    """
    session_dir = _session_dir(session_token)
    os.makedirs(session_dir, mode=0o777, exist_ok=True)

    if session_token not in kernel_managers:
        await create_kernel(session_token)
    km = kernel_managers[session_token]

    # Count the start of a run as activity so the idle reaper does not shut
    # the kernel down while the run is in progress.
    last_access_times[session_token] = datetime.now()

    try:
        # The kernel client API is blocking; run it in a worker thread so
        # other requests keep being served meanwhile.
        loop = asyncio.get_running_loop()
        output = await loop.run_in_executor(None, _run_in_kernel, km, session_dir, code)
    except Exception as e:
        logger.exception("Execution failed for session %s", session_token)
        last_access_times[session_token] = datetime.now()
        return {'status': 'error', 'value': str(e)}

    last_access_times[session_token] = datetime.now()
    print("Execution SUCCESS")
    print("#################")
    print("CODE:", code)
    print("OUTPUT:", output)
    return {'status': 'success', 'value': output}


# Background task to check for idle kernels
async def check_idle_kernels():
    """Once a minute, kill every session idle for more than TIMEOUT_DURATION."""
    idle_limit = timedelta(seconds=TIMEOUT_DURATION)
    while True:
        now = datetime.now()
        expired = [
            token
            for token, last_access in last_access_times.items()
            if now - last_access > idle_limit
        ]
        for token in expired:
            try:
                await kill_kernel(token)
            except Exception:
                # One failing cleanup must not stop the reaper for good.
                logger.exception("Failed to clean up idle session %s", token)
        await asyncio.sleep(60)  # Check every minute


@app.on_event("startup")
async def startup_event():
    """Create the runtime directories and start the idle-session reaper."""
    global _idle_reaper_task
    # Ensure all required directories exist with proper permissions
    os.makedirs('/app/kernels', mode=0o777, exist_ok=True)
    os.makedirs('/app/ipython', mode=0o777, exist_ok=True)
    os.makedirs('/app/matplotlib', mode=0o777, exist_ok=True)
    os.makedirs(os.path.join(root_dir, "output"), mode=0o777, exist_ok=True)
    _idle_reaper_task = asyncio.create_task(check_idle_kernels())


@app.post("/execute")
async def execute(request: CodeExecutionRequest):
    """Execute the submitted code in the kernel of the given session."""
    result = await execute_code(request.session_token, request.code)
    return result


@app.get("/info")
async def get_info():
    """Report the number of active kernels and current CPU / RAM usage."""
    # Get the number of active kernels
    active_kernels = len(kernel_managers)

    # Get system resource usage. cpu_percent(interval=1) sleeps for a second,
    # so it is sampled in a worker thread instead of on the event loop.
    loop = asyncio.get_running_loop()
    cpu_usage = await loop.run_in_executor(None, lambda: psutil.cpu_percent(interval=1))
    ram_usage = psutil.virtual_memory().percent

    return {
        "active_kernels": active_kernels,
        "cpu_usage_percent": cpu_usage,
        "ram_usage_percent": ram_usage
    }


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)