from fastapi import FastAPI, HTTPException, Request, Query from fastapi.middleware.cors import CORSMiddleware # from fastapi_cache import FastAPICache # from fastapi_cache.backends.inmemory import InMemoryBackend # from fastapi_cache.decorator import cache from queue import Empty import os import shutil app = FastAPI() root_dir = os.getcwd() # @app.on_event("startup") # async def startup(): # FastAPICache.init(InMemoryBackend(), prefix="fastapi-cache") from pydantic import BaseModel from typing import List, Dict, Any TOGETHER_API_KEY = os.getenv('TOGETHER_API_KEY') BRAVE_API_KEY = os.getenv('BRAVE_API_KEY') GROQ_API_KEY = os.getenv("GROQ_API_KEY") HELICON_API_KEY = os.getenv("HELICON_API_KEY") SUPABASE_USER = os.environ['SUPABASE_USER'] SUPABASE_PASSWORD = os.environ['SUPABASE_PASSWORD'] app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"],) import asyncio import uvicorn from fastapi import FastAPI, HTTPException from pydantic import BaseModel from jupyter_client import KernelManager from typing import Dict from datetime import datetime, timedelta import psutil # Model for input data class CodeExecutionRequest(BaseModel): session_token: str code: str # Store kernel managers and last access times kernel_managers: Dict[str, KernelManager] = {} last_access_times: Dict[str, datetime] = {} # Timeout duration in seconds TIMEOUT_DURATION = 600 # 10 minutes # Function to create a new kernel async def create_kernel(session_token: str): km = KernelManager() km.start_kernel() kernel_managers[session_token] = km last_access_times[session_token] = datetime.now() # Function to kill a kernel async def kill_kernel(session_token: str): km = kernel_managers.pop(session_token, None) if km: km.shutdown_kernel(now=True) last_access_times.pop(session_token, None) session_dir = os.path.join(root_dir, "output", session_token) if os.path.exists(session_dir): shutil.rmtree(session_dir) # Function to execute code in a kernel # Function to execute code in a kernel async def execute_code(session_token: str, code: str): setup_code = "%matplotlib inline" session_dir = os.path.join(root_dir,"output", session_token) if not os.path.exists(session_dir): os.makedirs(session_dir) if session_token not in kernel_managers: await create_kernel(session_token) km = kernel_managers[session_token] kc = km.client() try: os.chdir(session_dir) print("current working directory",os.getcwd()) # Execute setup code kc.execute_interactive(setup_code, store_history=False) # Execute the provided code kc.execute(code, store_history=False) output = [] timeout = datetime.now() + timedelta(seconds=TIMEOUT_DURATION) output = [] while True: if datetime.now() > timeout: raise TimeoutError("Code execution timed out.") msg = kc.get_iopub_msg() if msg['msg_type'] == 'status' and msg['content']['execution_state'] == 'idle': break elif msg['msg_type'] == 'error': error_output = { "ename": msg['content']['ename'], "evalue": msg['content']['evalue'], "traceback": msg['content']['traceback'] } output.append({"error": error_output}) if 'data' in msg['content']: output.append({"data": msg['content']['data']}) elif 'text' in msg['content']: output.append({"text": msg['content']['text']}) last_access_times[session_token] = datetime.now() print("Execution SUCCESS") print("#################") print("CODE:",code) print("OUTPUT:",output) return {'status': 'success', 'value': output} except Exception as e: last_access_times[session_token] = datetime.now() return {'status': 'error', 'value': str(e)} # Background task to check for idle kernels async def check_idle_kernels(): while True: now = datetime.now() for session_token, last_access in list(last_access_times.items()): if now - last_access > timedelta(seconds=TIMEOUT_DURATION): await kill_kernel(session_token) await asyncio.sleep(60) # Check every minute @app.on_event("startup") async def startup_event(): asyncio.create_task(check_idle_kernels()) @app.post("/execute") async def execute(request: CodeExecutionRequest): result = await execute_code(request.session_token, request.code) return result @app.get("/info") async def get_info(): # Get the number of active kernels active_kernels = len(kernel_managers) # Get system resource usage cpu_usage = psutil.cpu_percent(interval=1) ram_usage = psutil.virtual_memory().percent # Return the information return { "active_kernels": active_kernels, "cpu_usage_percent": cpu_usage, "ram_usage_percent": ram_usage } if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)