kaiobash-server / server.py
Silicon Valley - Admin
Refactor Dockerfile and server implementation for improved containerization and functionality
3dde046
raw
history blame
3.22 kB
# server.py
import asyncio
import uuid
from typing import AsyncGenerator, Dict, Tuple, Any, Optional
from dataclasses import dataclass
from quart import Quart, websocket, request, Response
from quart_schema import QuartSchema, validate_request, validate_response
from quart_cors import cors
import importlib.metadata
import secrets
import logging
import hypercorn.asyncio
from broker import SessionBroker, SessionDoesNotExist, ClientRequest, ClientResponse, ClientError
# Configuraci贸n
TIMEOUT: int = 60
LOG_LEVEL: int = logging.INFO
TRUSTED_HOSTS: list[str] = ["127.0.0.1"]
# Crear aplicaci贸n con CORS habilitado
app = Quart(__name__)
app = cors(app, allow_origin="*")
QuartSchema(app)
app.logger.setLevel(LOG_LEVEL)
broker = SessionBroker()
# Definici贸n de modelos de datos
@dataclass
class Status:
status: str
version: str
@dataclass
class Session:
session_id: str
@dataclass
class Command:
session_id: str
command: str
@dataclass
class CommandResponse:
return_code: int
stdout: str
stderr: str
@dataclass
class ErrorResponse:
error: str
# Rutas API
@app.get("/status")
@validate_response(Status)
async def status() -> Status:
return Status(status="OK", version=importlib.metadata.version('your-package-name'))
@app.websocket('/session')
async def session_handler():
session_id = secrets.token_hex()
app.logger.info(f"New session: {session_id}")
await websocket.send_as(Session(session_id=session_id), Session)
task = asyncio.ensure_future(_receive(session_id))
try:
async for request in broker.subscribe(session_id):
app.logger.info(f"Sending request {request.request_id} to client.")
await websocket.send_as(request, ClientRequest)
finally:
task.cancel()
async def _receive(session_id: str) -> None:
while True:
response = await websocket.receive_as(ClientResponse)
app.logger.info(f"Received response for session {session_id}: {response}")
await broker.receive_response(session_id, response)
@app.post('/command')
@validate_request(Command)
@validate_response(CommandResponse, 200)
@validate_response(ErrorResponse, 500)
async def command(data: Command) -> Tuple[CommandResponse | ErrorResponse, int]:
try:
response_data = await broker.send_request(
session_id=data.session_id,
data={'action': 'command', 'command': data.command},
timeout=TIMEOUT
)
response = CommandResponse(**response_data)
return response, 200
except SessionDoesNotExist:
app.logger.warning(f"Invalid session ID: {data.session_id}")
return ErrorResponse(error='Session does not exist.'), 500
except ClientError as e:
return ErrorResponse(error=e.message), 500
except asyncio.TimeoutError:
return ErrorResponse(error='Timeout waiting for client response.'), 500
# Ejecutar aplicaci贸n
def run():
config = hypercorn.Config()
config.bind = ["0.0.0.0:7860"]
asyncio.run(hypercorn.asyncio.serve(app, config))
# Agregar un endpoint de health check
@app.route("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
run()