Spaces:
Runtime error
Runtime error
Silicon Valley - Admin
Refactor server.py to replace ProxyFix with ProxyHeadersMiddleware for improved proxy handling
ed02a4f
# server.py | |
import asyncio | |
import importlib.metadata | |
import json | |
import logging | |
import secrets | |
import uuid | |
from dataclasses import dataclass, asdict | |
from typing import Any, AsyncGenerator, Dict, Tuple, Union | |
from quart import Quart, websocket, request, send_from_directory | |
from quart_schema import QuartSchema, validate_request, validate_response | |
from starlette.middleware.proxy_headers import ProxyHeadersMiddleware # Importar ProxyHeadersMiddleware de Starlette | |
# Configuraciones | |
TIMEOUT: int = 40 | |
LOG_LEVEL: int = logging.DEBUG | |
TRUSTED_HOSTS: list[str] = ["127.0.0.1", "172.18.0.3"] | |
# Inicializaci贸n de la aplicaci贸n Quart | |
app = Quart(__name__) | |
QuartSchema(app) | |
app.asgi_app = ProxyHeadersMiddleware( | |
app.asgi_app, | |
x_for=1, | |
x_proto=1, | |
x_host=1, | |
x_port=1, | |
x_prefix=1 | |
) | |
app.logger.setLevel(LOG_LEVEL) | |
# Excepciones personalizadas | |
class SessionDoesNotExist(Exception): | |
"""Error al solicitar un ID de sesi贸n que no existe.""" | |
pass | |
class SessionAlreadyExists(Exception): | |
"""Error al crear una sesi贸n con un ID que ya existe.""" | |
pass | |
class ClientError(Exception): | |
"""Error cuando el cliente devuelve un error.""" | |
def __init__(self, message: str): | |
super().__init__(message) | |
self.message = message | |
# Clases de datos para solicitudes y respuestas | |
class ClientRequest: | |
request_id: str | |
data: Any | |
class ClientResponse: | |
request_id: str | |
error: bool | |
data: Any | |
class Status: | |
status: str | |
version: str | |
class Session: | |
session_id: str | |
class Command: | |
session_id: str | |
command: str | |
class Read: | |
session_id: str | |
path: str | |
class Write: | |
session_id: str | |
path: str | |
content: str | |
class CommandResponse: | |
return_code: int | |
stdout: str | |
stderr: str | |
class ReadResponse: | |
content: str | |
class WriteResponse: | |
size: int | |
class ErrorResponse: | |
error: str | |
# Broker para manejar sesiones y comunicaciones | |
class SessionBroker: | |
def __init__(self): | |
"""Diccionario de session_id -> cola de mensajes pendientes por enviar al cliente""" | |
self.sessions: Dict[str, asyncio.Queue] = {} | |
"""Diccionario de (session_id, request_id) -> futuro de la respuesta del cliente""" | |
self.pending_responses: Dict[Tuple[str, str], asyncio.Future] = {} | |
async def send_request(self, session_id: str, data: Any, timeout: int = 60) -> Any: | |
if session_id not in self.sessions: | |
raise SessionDoesNotExist() | |
request_id = str(uuid.uuid4()) | |
loop = asyncio.get_event_loop() | |
future = loop.create_future() | |
self.pending_responses[(session_id, request_id)] = future | |
await self.sessions[session_id].put(ClientRequest(request_id=request_id, data=data)) | |
try: | |
return await asyncio.wait_for(future, timeout) | |
except asyncio.TimeoutError: | |
raise | |
finally: | |
self.pending_responses.pop((session_id, request_id), None) | |
async def receive_response(self, session_id: str, response: ClientResponse) -> None: | |
key = (session_id, response.request_id) | |
future = self.pending_responses.pop(key, None) | |
if future and not future.done(): | |
if response.error: | |
future.set_exception(ClientError(message=response.data)) | |
else: | |
future.set_result(response.data) | |
async def subscribe(self, session_id: str) -> AsyncGenerator[ClientRequest, None]: | |
if session_id in self.sessions: | |
raise SessionAlreadyExists() | |
queue = asyncio.Queue() | |
self.sessions[session_id] = queue | |
try: | |
while True: | |
yield await queue.get() | |
finally: | |
del self.sessions[session_id] | |
# Eliminar todas las respuestas pendientes de esta sesi贸n | |
keys_to_remove = [key for key in self.pending_responses if key[0] == session_id] | |
for key in keys_to_remove: | |
future = self.pending_responses.pop(key) | |
if not future.done(): | |
future.set_exception(SessionDoesNotExist()) | |
# Instanciaci贸n del broker | |
broker = SessionBroker() | |
# Funciones y rutas de la API | |
async def _receive(session_id: str) -> None: | |
while True: | |
try: | |
message = await websocket.receive() | |
response_data = json.loads(message) | |
client_response = ClientResponse(**response_data) | |
app.logger.info(f"{websocket.remote_addr} - RESPONSE - {session_id} - {json.dumps(asdict(client_response))}") | |
await broker.receive_response(session_id, client_response) | |
except Exception as e: | |
app.logger.error(f"Error al recibir respuesta: {e}") | |
break | |
async def status() -> Status: | |
try: | |
version = importlib.metadata.version('serverwitch-api') | |
except importlib.metadata.PackageNotFoundError: | |
version = "unknown" | |
return Status(status="OK", version=version) | |
async def session_handler(): | |
session_id = secrets.token_hex() | |
app.logger.info(f"{websocket.remote_addr} - NEW SESSION - {session_id}") | |
session_message = Session(session_id=session_id) | |
await websocket.send(json.dumps(asdict(session_message))) | |
task = asyncio.create_task(_receive(session_id)) | |
try: | |
async for request_data in broker.subscribe(session_id): | |
app.logger.info(f"{websocket.remote_addr} - REQUEST - {session_id} - {json.dumps(asdict(request_data))}") | |
await websocket.send(json.dumps(asdict(request_data))) | |
except SessionAlreadyExists: | |
error_response = ErrorResponse(error="Session already exists.") | |
await websocket.send(json.dumps(asdict(error_response))) | |
finally: | |
task.cancel() | |
try: | |
await task | |
except asyncio.CancelledError: | |
pass | |
async def command(data: Command) -> Tuple[Union[CommandResponse, ErrorResponse], int]: | |
try: | |
response_data = await broker.send_request( | |
data.session_id, | |
{'action': 'command', 'command': data.command}, | |
timeout=TIMEOUT | |
) | |
response = CommandResponse(**response_data) | |
return response, 200 | |
except SessionDoesNotExist: | |
app.logger.warning(f"{request.remote_addr} - INVALID SESSION ID - {repr(data.session_id)}") | |
return ErrorResponse(error='Session does not exist.'), 500 | |
except ClientError as e: | |
return ErrorResponse(error=e.message), 500 | |
except asyncio.TimeoutError: | |
return ErrorResponse(error='Timeout when waiting for client.'), 500 | |
async def read(data: Read) -> Tuple[Union[ReadResponse, ErrorResponse], int]: | |
try: | |
response_data = await broker.send_request( | |
data.session_id, | |
{'action': 'read', 'path': data.path}, | |
timeout=TIMEOUT | |
) | |
response = ReadResponse(**response_data) | |
return response, 200 | |
except SessionDoesNotExist: | |
app.logger.warning(f"{request.remote_addr} - INVALID SESSION ID - {repr(data.session_id)}") | |
return ErrorResponse(error='Session does not exist.'), 500 | |
except ClientError as e: | |
return ErrorResponse(error=e.message), 500 | |
except asyncio.TimeoutError: | |
return ErrorResponse(error='Timeout when waiting for client.'), 500 | |
async def write(data: Write) -> Tuple[Union[WriteResponse, ErrorResponse], int]: | |
try: | |
response_data = await broker.send_request( | |
data.session_id, | |
{'action': 'write', 'path': data.path, 'content': data.content}, | |
timeout=TIMEOUT | |
) | |
response = WriteResponse(**response_data) | |
return response, 200 | |
except SessionDoesNotExist: | |
app.logger.warning(f"{request.remote_addr} - INVALID SESSION ID - {repr(data.session_id)}") | |
return ErrorResponse(error='Session does not exist.'), 500 | |
except ClientError as e: | |
return ErrorResponse(error=e.message), 500 | |
except asyncio.TimeoutError: | |
return ErrorResponse(error='Timeout when waiting for client.'), 500 | |
# Rutas para servir archivos est谩ticos y OpenAPI | |
async def send_static(path): | |
return await send_from_directory('static', path) | |
async def openapi_spec(): | |
return await send_from_directory('.', 'openapi.yaml') | |
# No se necesita ejecutar nada aqu铆, ya que Hypercorn se encargar谩 de iniciar la aplicaci贸n |