"""FastAPI service that mirrors Hugging Face models into GCS and runs pipelines.

A model requested through ``POST /predict/`` is copied from the Hugging Face
Hub into a Google Cloud Storage bucket the first time it is used; afterwards
it is loaded from the bucket.
"""

import json
import logging
import os
import re
import sys
import tempfile
from io import BytesIO

import requests
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool
from google.auth import exceptions
from google.cloud import storage
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

load_dotenv()

logger = logging.getLogger(__name__)

API_KEY = os.getenv("API_KEY")
GCS_BUCKET_NAME = os.getenv("GCS_BUCKET_NAME")
GOOGLE_APPLICATION_CREDENTIALS_JSON = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
HF_API_TOKEN = os.getenv("HF_API_TOKEN")

# GCS naming limits: 3-63 characters, or up to 222 when the name contains
# dots, with every dot-separated component at most 63 characters long.
_MIN_BUCKET_NAME_LENGTH = 3
_MAX_BUCKET_NAME_LENGTH = 63
_MAX_DOTTED_BUCKET_NAME_LENGTH = 222
_BUCKET_NAME_RE = re.compile(r"[a-z0-9][a-z0-9\-.]*[a-z0-9]")

# Hugging Face repo ids look like "name" or "owner/name".
_MODEL_NAME_RE = re.compile(
    r"[A-Za-z0-9][A-Za-z0-9._-]*(?:/[A-Za-z0-9][A-Za-z0-9._-]*)?"
)

_SUPPORTED_TASKS = (
    "text-generation",
    "sentiment-analysis",
    "translation",
    "fill-mask",
    "question-answering",
)

# Small files mirrored on a best-effort basis; a missing one is not an error.
_HF_METADATA_FILES = (
    "config.json",
    "tokenizer.json",
    "tokenizer_config.json",
    "special_tokens_map.json",
    "generation_config.json",
    "added_tokens.json",
    "vocab.json",
    "vocab.txt",
    "merges.txt",
    "tokenizer.model",
)

# (single-file checkpoint, shard index) pairs, in order of preference.
_HF_WEIGHT_LAYOUTS = (
    ("model.safetensors", "model.safetensors.index.json"),
    ("pytorch_model.bin", "pytorch_model.bin.index.json"),
)

# Any of these blobs existing under a model prefix means the weights are cached.
_WEIGHT_MARKER_FILES = tuple(name for layout in _HF_WEIGHT_LAYOUTS for name in layout)

_HF_TIMEOUT = (10, 60)  # (connect, read) seconds
_HF_CHUNK_SIZE = 1024 * 1024
_SPOOL_MAX_BYTES = 64 * 1024 * 1024  # larger downloads spill to disk


def sanitize_bucket_name(bucket_name):
    """Coerce *bucket_name* towards a name accepted by Google Cloud Storage.

    The name is lower-cased, every character outside ``[a-z0-9.-]`` becomes
    ``-``, the result is cut to 63 characters and leading/trailing ``-`` and
    ``.`` are removed.  The result can still be invalid (for example empty or
    too short); pass it to :func:`validate_bucket_name` afterwards.
    """
    bucket_name = re.sub(r"[^a-z0-9.-]", "-", bucket_name.lower())
    # Truncate BEFORE stripping so the cut can never leave a trailing "-"/".".
    # After stripping, both ends are alphanumeric, so no padding is needed.
    return bucket_name[:_MAX_BUCKET_NAME_LENGTH].strip("-.")


def validate_bucket_name(bucket_name):
    """Return *bucket_name* unchanged if it satisfies the GCS naming rules.

    Raises:
        ValueError: if the name has an invalid length or invalid characters,
            or does not start and end with a letter or digit.
    """
    is_valid = isinstance(bucket_name, str) and bool(
        _BUCKET_NAME_RE.fullmatch(bucket_name)
    )
    if is_valid:
        max_length = (
            _MAX_DOTTED_BUCKET_NAME_LENGTH
            if "." in bucket_name
            else _MAX_BUCKET_NAME_LENGTH
        )
        is_valid = _MIN_BUCKET_NAME_LENGTH <= len(bucket_name) <= max_length and all(
            len(part) <= _MAX_BUCKET_NAME_LENGTH for part in bucket_name.split(".")
        )
    if not is_valid:
        raise ValueError(
            f"Nombre de bucket inválido: '{bucket_name}'. Debe cumplir con las reglas de GCS."
        )
    return bucket_name


try:
    # Fail with a readable message instead of an AttributeError/TypeError
    # traceback when the environment is not configured.
    if not GCS_BUCKET_NAME or not GOOGLE_APPLICATION_CREDENTIALS_JSON:
        raise ValueError(
            "Faltan las variables de entorno GCS_BUCKET_NAME o "
            "GOOGLE_APPLICATION_CREDENTIALS_JSON"
        )

    # Sanitize and validate the bucket name.
    GCS_BUCKET_NAME = sanitize_bucket_name(GCS_BUCKET_NAME)
    GCS_BUCKET_NAME = validate_bucket_name(GCS_BUCKET_NAME)

    # Load the Google Cloud Storage credentials.
    credentials_info = json.loads(GOOGLE_APPLICATION_CREDENTIALS_JSON)
    storage_client = storage.Client.from_service_account_info(credentials_info)
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
except (
    exceptions.DefaultCredentialsError,
    json.JSONDecodeError,
    KeyError,
    TypeError,
    ValueError,
) as e:
    print(f"Error al cargar credenciales o bucket: {e}")
    sys.exit(1)

app = FastAPI()


class DownloadModelRequest(BaseModel):
    """Request body of ``POST /predict/``."""

    model_name: str  # Hugging Face repo id, e.g. "gpt2" or "owner/name"
    pipeline_task: str  # one of _SUPPORTED_TASKS
    input_text: str  # passed to the pipeline as its only argument


class GCSStreamHandler:
    """Thin wrapper around one GCS bucket for reading and writing model files."""

    def __init__(self, bucket_name):
        self.bucket = storage_client.bucket(bucket_name)

    def file_exists(self, blob_name):
        """Return True if a blob called *blob_name* exists in the bucket."""
        return self.bucket.blob(blob_name).exists()

    def stream_file_from_gcs(self, blob_name):
        """Return the full content of *blob_name* as ``bytes``.

        Raises:
            HTTPException: 404 if the blob does not exist.
        """
        blob = self.bucket.blob(blob_name)
        if not blob.exists():
            raise HTTPException(
                status_code=404,
                detail=f"Archivo '{blob_name}' no encontrado en GCS.",
            )
        return blob.download_as_bytes()

    def upload_file_to_gcs(self, blob_name, data_stream):
        """Upload the binary file-like *data_stream* as *blob_name*."""
        blob = self.bucket.blob(blob_name)
        blob.upload_from_file(data_stream)

    def ensure_bucket_structure(self, model_prefix):
        """Create empty-JSON placeholders for missing config/tokenizer blobs."""
        required_files = ["config.json", "tokenizer.json"]
        for filename in required_files:
            blob_name = f"{model_prefix}/{filename}"
            if not self.file_exists(blob_name):
                self.bucket.blob(blob_name).upload_from_string(
                    "{}", content_type="application/json"
                )

    def stream_model_files(self, model_prefix, model_patterns):
        """Download every blob under *model_prefix* matching any pattern.

        Args:
            model_prefix: blob-name prefix (without the trailing slash).
            model_patterns: regular expressions applied with ``re.match`` to
                the last path component of each blob name.

        Returns:
            Dict mapping that last path component to a ``BytesIO`` holding
            the blob content.
        """
        compiled = [re.compile(pattern) for pattern in model_patterns]
        model_files = {}
        # List once and fetch each blob once, however many patterns it matches.
        for blob in self.bucket.list_blobs(prefix=f"{model_prefix}/"):
            filename = blob.name.rsplit("/", 1)[-1]
            if any(regex.match(filename) for regex in compiled):
                model_files[filename] = BytesIO(blob.download_as_bytes())
        return model_files

    def download_prefix_to_dir(self, model_prefix, target_dir):
        """Copy the blobs directly under *model_prefix* into *target_dir*.

        Blobs in nested "sub-folders" are skipped: they may belong to another
        model (prefix "owner" vs. "owner/name"), and skipping them keeps every
        write inside *target_dir*.

        Returns:
            List of the file names written.
        """
        prefix = f"{model_prefix}/"
        written = []
        for blob in self.bucket.list_blobs(prefix=prefix):
            relative = blob.name[len(prefix):]
            if not _is_flat_filename(relative):
                continue
            blob.download_to_filename(os.path.join(target_dir, relative))
            written.append(relative)
        return written


def _is_flat_filename(name):
    """Return True if *name* is a plain file name with no directory part."""
    return (
        isinstance(name, str)
        and bool(name)
        and "/" not in name
        and "\\" not in name
        and name not in (".", "..")
    )


def _hf_url(model_name, filename):
    """Return the Hub download URL of *filename* on the model's main branch."""
    return f"https://huggingface.co/{model_name}/resolve/main/{filename}"


def _mirror_hf_file(session, model_name, filename):
    """Copy one Hub file into the bucket; return True on success.

    The body is spooled to a temporary file in chunks so a multi-gigabyte
    checkpoint is never held in memory at once.  Failures are logged and
    reported through the return value; nothing is raised.
    """
    try:
        with session.get(
            _hf_url(model_name, filename), stream=True, timeout=_HF_TIMEOUT
        ) as response:
            if response.status_code != 200:
                logger.info(
                    "Hub returned %s for %s/%s",
                    response.status_code,
                    model_name,
                    filename,
                )
                return False
            with tempfile.SpooledTemporaryFile(max_size=_SPOOL_MAX_BYTES) as spool:
                for chunk in response.iter_content(chunk_size=_HF_CHUNK_SIZE):
                    spool.write(chunk)
                size = spool.tell()
                spool.seek(0)
                bucket.blob(f"{model_name}/{filename}").upload_from_file(
                    spool, size=size
                )
        return True
    except Exception:
        # Best effort by design: one failed file must not abort the others.
        logger.warning(
            "Could not mirror %s/%s to GCS", model_name, filename, exc_info=True
        )
        return False


def _mirror_hf_shards(session, model_name, index_filename):
    """Copy a sharded checkpoint described by *index_filename*; True on success.

    The shard file names are read from the index's ``weight_map``.  The index
    itself is uploaded last, so its presence in the bucket implies that every
    shard was uploaded successfully.
    """
    try:
        with session.get(
            _hf_url(model_name, index_filename), timeout=_HF_TIMEOUT
        ) as response:
            if response.status_code != 200:
                return False
            index_bytes = response.content
        weight_map = json.loads(index_bytes).get("weight_map", {})
        shard_names = sorted(
            {name for name in weight_map.values() if _is_flat_filename(name)}
        )
        if not shard_names:
            return False
        for shard_name in shard_names:
            if not _mirror_hf_file(session, model_name, shard_name):
                return False
        bucket.blob(f"{model_name}/{index_filename}").upload_from_string(
            index_bytes, content_type="application/json"
        )
        return True
    except Exception:
        logger.warning(
            "Could not mirror shards of %s listed in %s",
            model_name,
            index_filename,
            exc_info=True,
        )
        return False


def download_model_from_huggingface(model_name):
    """Mirror the files of Hub model *model_name* into the GCS bucket.

    Config/tokenizer files are copied on a best-effort basis.  For the
    weights, the first layout that downloads completely wins: single-file
    safetensors, sharded safetensors, single-file ``.bin``, sharded ``.bin``.
    Errors are logged, never raised; callers should check the bucket
    afterwards.
    """
    with requests.Session() as session:
        # Sending "Bearer None" would make the Hub reject public models too.
        if HF_API_TOKEN:
            session.headers["Authorization"] = f"Bearer {HF_API_TOKEN}"

        for filename in _HF_METADATA_FILES:
            _mirror_hf_file(session, model_name, filename)

        for single_file, index_file in _HF_WEIGHT_LAYOUTS:
            if _mirror_hf_file(session, model_name, single_file):
                return
            if _mirror_hf_shards(session, model_name, index_file):
                return
        logger.warning("No weight files could be mirrored for %s", model_name)


def _run_pipeline(model_name, pipeline_task, input_text):
    """Ensure the model is cached in GCS, load it and run the pipeline.

    Blocking (network, disk and model loading); run it in a worker thread.

    Raises:
        HTTPException: 404 if ``config.json`` is missing after the download
            attempt.
    """
    gcs_handler = GCSStreamHandler(GCS_BUCKET_NAME)

    is_cached = any(
        gcs_handler.file_exists(f"{model_name}/{marker}")
        for marker in _WEIGHT_MARKER_FILES
    )
    if not is_cached:
        download_model_from_huggingface(model_name)

    config_blob = f"{model_name}/config.json"
    if not gcs_handler.file_exists(config_blob):
        raise HTTPException(
            status_code=404,
            detail=f"Archivo '{config_blob}' no encontrado en GCS.",
        )

    # transformers loads models from a directory (or Hub id), not from
    # in-memory streams, so materialize the cached files on disk first.
    with tempfile.TemporaryDirectory() as model_dir:
        gcs_handler.download_prefix_to_dir(model_name, model_dir)
        # Passing the path lets pipeline() pick the model class that fits the
        # task and load the tokenizer from the same directory.
        pipeline_ = pipeline(pipeline_task, model=model_dir)
        return pipeline_(input_text)


@app.post("/predict/")
async def predict(request: DownloadModelRequest):
    """Run ``request.pipeline_task`` on ``request.input_text`` with the model.

    Returns:
        ``{"response": <pipeline output>}``.

    Raises:
        HTTPException: 400 for an unsupported task or malformed model name,
            404 if the model's ``config.json`` cannot be found, 500 for any
            other failure.
    """
    # Cheap input checks first, before any network or model work.
    if request.pipeline_task not in _SUPPORTED_TASKS:
        raise HTTPException(status_code=400, detail="Tarea no soportada")
    # model_name ends up in URLs, blob names and local paths: reject anything
    # that is not a plain repo id.
    if ".." in request.model_name or not _MODEL_NAME_RE.fullmatch(request.model_name):
        raise HTTPException(status_code=400, detail="Nombre de modelo inválido")

    try:
        # The work is blocking; keep it off the event loop.
        result = await run_in_threadpool(
            _run_pipeline,
            request.model_name,
            request.pipeline_task,
            request.input_text,
        )
    except HTTPException:
        # Preserve deliberate 4xx responses instead of rewriting them to 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error: {e}") from e
    return {"response": result}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)