Spaces:
Sleeping
Sleeping
import asyncio | |
import uvicorn | |
import os | |
import shutil | |
from fastapi import FastAPI, HTTPException, UploadFile, File | |
from fastapi.middleware.cors import CORSMiddleware | |
from pydantic import BaseModel | |
from jupyter_client.manager import KernelManager | |
from typing import Dict, List | |
from datetime import datetime, timedelta | |
import psutil | |
# Redirect Jupyter, IPython and Matplotlib state into directories under /app.
os.environ.update(
    {
        'JUPYTER_RUNTIME_DIR': '/app/kernels',
        'IPYTHONDIR': '/app/ipython',
        'MPLCONFIGDIR': '/app/matplotlib',
    }
)
# Optional Mapbox key; falls back to an empty string when the variable is unset.
mapbox_api_key = os.environ.get("MAPBOX_API_KEY", "")
app = FastAPI()

# All session folders live under "<directory of this file>/output".
root_dir = os.path.dirname(os.path.abspath(__file__))

# Create the shared output root up front (requested mode 0o777).
os.makedirs(os.path.join(root_dir, "output"), mode=0o777, exist_ok=True)

# CORS: accept any origin, method and header, with credentials enabled.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request model for code execution
class CodeExecutionRequest(BaseModel):
    """Request body carrying a session token and the code to run."""

    # Key used to look up (or create) the session's kernel and output folder.
    session_token: str
    # Source text handed to the session's kernel for execution.
    code: str
# Per-session state; both mappings are keyed by session token.
kernel_managers: Dict[str, KernelManager] = {}  # running kernel for each session
last_access_times: Dict[str, datetime] = {}  # when each session was last used

# Timeout in seconds (10 minutes); used both as the execution deadline and as
# the idle threshold after which a session's kernel is shut down.
TIMEOUT_DURATION = 10 * 60
# Function to create a new kernel and session directory
async def create_kernel(session_token: str):
    """Start a Jupyter kernel for ``session_token`` and register it.

    Creates ``<root_dir>/output/<session_token>`` if it does not exist,
    launches a kernel whose working directory is that folder, stores the
    manager in ``kernel_managers`` and stamps ``last_access_times``.

    Args:
        session_token: Identifier of the session; also the folder name.
    """
    session_dir = os.path.join(root_dir, "output", session_token)
    os.makedirs(session_dir, mode=0o777, exist_ok=True)

    # Hand the working directory and environment to the launcher as data
    # (cwd=/env=) instead of splicing them into a `python -c` source string:
    # a token or key containing quotes can then never alter the launch
    # command, and the kernel is started from its regular kernelspec.
    kernel_env = dict(os.environ)
    kernel_env["MAPBOX_API_KEY"] = mapbox_api_key

    km = KernelManager()
    km.start_kernel(cwd=session_dir, env=kernel_env)

    kernel_managers[session_token] = km
    last_access_times[session_token] = datetime.now()
# Function to kill a kernel
async def kill_kernel(session_token: str):
    """Shut down the session's kernel (if any) and delete its output folder.

    Removes the session from ``kernel_managers`` and ``last_access_times``;
    both removals tolerate a token that is not present.

    Args:
        session_token: Identifier of the session to tear down.
    """
    manager = kernel_managers.pop(session_token, None)
    if manager:
        # now=True requests an immediate shutdown.
        manager.shutdown_kernel(now=True)
    last_access_times.pop(session_token, None)

    workspace = os.path.join(root_dir, "output", session_token)
    if not os.path.exists(workspace):
        return
    shutil.rmtree(workspace)
# Add file upload endpoint
# NOTE(review): no route decorator is visible on this function in this view --
# confirm how it is registered with `app`.
async def upload_files(session_token: str, files: List[UploadFile] = File(...)):
    """Save uploaded files into the session's output directory.

    Args:
        session_token: Identifier of the session; selects the target folder.
        files: Uploaded files to store.

    Returns:
        ``{"filenames": [...], "status": "success"}`` listing the names the
        files were stored under.

    Raises:
        HTTPException: 400 when an upload has no usable file name.
    """
    # NOTE(review): session_token is joined into the path unchecked -- confirm
    # that callers validate it.
    session_dir = os.path.join(root_dir, "output", session_token)
    os.makedirs(session_dir, mode=0o777, exist_ok=True)

    uploaded_files = []
    for file in files:
        # The file name comes from the client: keep only its last path
        # component (treating "\" as a separator too) so that names such as
        # "../../x" cannot write outside the session directory.
        raw_name = file.filename or ""
        safe_name = os.path.basename(raw_name.replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            raise HTTPException(
                status_code=400,
                detail=f"Invalid filename: {file.filename!r}",
            )

        file_path = os.path.join(session_dir, safe_name)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        uploaded_files.append(safe_name)
        os.chmod(file_path, 0o777)  # Give full permissions to uploaded file

    print("uploaded_files", uploaded_files)
    return {"filenames": uploaded_files, "status": "success"}
# Helper: wait for a kernel message that belongs to one specific request
def _next_reply(fetch, request_id, deadline):
    """Return the next message from ``fetch`` whose parent is ``request_id``.

    Args:
        fetch: A client ``get_*_msg`` callable accepting ``timeout=`` seconds.
        request_id: ``msg_id`` returned by the ``execute`` call being awaited.
        deadline: ``datetime`` after which waiting is abandoned.

    Raises:
        TimeoutError: If ``deadline`` passes before a matching message arrives.
    """
    # Local import: queue.Empty is what the client raises when a poll times out.
    from queue import Empty

    while True:
        remaining = (deadline - datetime.now()).total_seconds()
        if remaining <= 0:
            raise TimeoutError("Code execution timed out.")
        try:
            # Poll in short slices so the deadline is re-checked regularly.
            msg = fetch(timeout=min(remaining, 1.0))
        except Empty:
            continue
        # Skip messages produced by other requests (e.g. the setup cell).
        if msg.get('parent_header', {}).get('msg_id') == request_id:
            return msg


# Function to execute code in a kernel
async def execute_code(session_token: str, code: str):
    """Run ``code`` in the session's kernel and collect its output.

    Creates the session directory and kernel on first use, switches the
    kernel into the session directory, then executes ``code`` and gathers
    the messages published for that execution until the kernel reports idle.

    Args:
        session_token: Identifier of the session whose kernel runs the code.
        code: Source text to execute.

    Returns:
        ``{'status': 'success', 'value': [...]}`` where each item is a dict
        keyed ``"error"``, ``"data"`` or ``"text"``; or
        ``{'status': 'error', 'value': <message>}`` if talking to the kernel
        failed or ``TIMEOUT_DURATION`` seconds elapsed.
    """
    session_dir = os.path.join(root_dir, "output", session_token)
    os.makedirs(session_dir, mode=0o777, exist_ok=True)
    if session_token not in kernel_managers:
        await create_kernel(session_token)
    km = kernel_managers[session_token]
    kc = km.client()
    try:
        # Connect all channels up front and wait until the kernel answers, so
        # no output is published before the iopub subscription exists.
        kc.start_channels()
        kc.wait_for_ready(timeout=TIMEOUT_DURATION)

        deadline = datetime.now() + timedelta(seconds=TIMEOUT_DURATION)

        # Setup code to ensure correct working directory; the path is embedded
        # with repr() so quotes or backslashes in it cannot break the code.
        setup_code = (
            "import os\n"
            f"os.chdir({session_dir!r})\n"
            "print('Current working directory:', os.getcwd())\n"
        )
        setup_id = kc.execute(setup_code)
        _next_reply(kc.get_shell_msg, setup_id, deadline)  # Wait for setup completion

        # Execute the provided code
        exec_id = kc.execute(code)
        output = []
        while True:
            msg = _next_reply(kc.get_iopub_msg, exec_id, deadline)
            msg_type = msg['msg_type']
            content = msg['content']
            if msg_type == 'status' and content['execution_state'] == 'idle':
                break
            elif msg_type == 'error':
                error_output = {
                    "ename": content['ename'],
                    "evalue": content['evalue'],
                    "traceback": content['traceback']
                }
                output.append({"error": error_output})
            if 'data' in content:
                output.append({"data": content['data']})
            elif 'text' in content:
                output.append({"text": content['text']})

        last_access_times[session_token] = datetime.now()
        print("Execution SUCCESS")
        print("#################")
        print("CODE:", code)
        print("OUTPUT:", output)
        return {'status': 'success', 'value': output}
    except Exception as e:
        last_access_times[session_token] = datetime.now()
        return {'status': 'error', 'value': str(e)}
    finally:
        # Release this request's sockets; a new client is created per call.
        kc.stop_channels()
# Background task to check for idle kernels
async def check_idle_kernels():
    """Once a minute, shut down every session idle for over TIMEOUT_DURATION.

    Runs forever. A failure while tearing down one session is reported and
    skipped so that the loop keeps running for the remaining sessions.
    """
    idle_limit = timedelta(seconds=TIMEOUT_DURATION)
    while True:
        now = datetime.now()
        # Iterate over a snapshot: kill_kernel() removes entries from the dict.
        for session_token, last_access in list(last_access_times.items()):
            if now - last_access <= idle_limit:
                continue
            try:
                await kill_kernel(session_token)
            except Exception as exc:
                # Without this, one failed cleanup would end the task for good.
                print(f"Failed to shut down idle kernel {session_token!r}: {exc}")
        await asyncio.sleep(60)  # Check every minute
# NOTE(review): no startup-hook decorator is visible on this function in this
# view -- confirm how it is registered with `app`.
async def startup_event():
    """Create the runtime directories and start the idle-kernel reaper."""
    global _idle_kernel_task

    # Ensure all required directories exist with proper permissions
    required_dirs = (
        '/app/kernels',
        '/app/ipython',
        '/app/matplotlib',
        os.path.join(root_dir, "output"),
    )
    for directory in required_dirs:
        os.makedirs(directory, mode=0o777, exist_ok=True)

    # Keep a reference to the task: the event loop holds only a weak reference,
    # so an otherwise unreferenced task may be garbage-collected mid-run.
    _idle_kernel_task = asyncio.create_task(check_idle_kernels())
# NOTE(review): no route decorator is visible on this function in this view --
# confirm how it is registered with `app`.
async def execute(request: CodeExecutionRequest):
    """Run the request's code in its session kernel and return the result.

    Args:
        request: Carries the session token and the code to execute.

    Returns:
        Whatever ``execute_code`` returns for that token and code.
    """
    return await execute_code(request.session_token, request.code)
# NOTE(review): no route decorator is visible on this function in this view --
# confirm how it is registered with `app`.
async def get_info():
    """Report the number of active kernels and current CPU/RAM usage.

    Returns:
        Dict with ``active_kernels``, ``cpu_usage_percent`` and
        ``ram_usage_percent``.
    """
    # Get the number of active kernels
    active_kernels = len(kernel_managers)

    # Get system resource usage. cpu_percent(interval=1) blocks for one second
    # while sampling, so run it in the default executor instead of stalling
    # the event loop for every other request.
    loop = asyncio.get_running_loop()
    cpu_usage = await loop.run_in_executor(
        None, lambda: psutil.cpu_percent(interval=1)
    )
    ram_usage = psutil.virtual_memory().percent

    return {
        "active_kernels": active_kernels,
        "cpu_usage_percent": cpu_usage,
        "ram_usage_percent": ram_usage
    }
if __name__ == "__main__":
    # When run as a script, serve the app on all interfaces, port 7860.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )