import json
import os
import re
import tempfile
from io import BytesIO

import requests
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from google.auth import exceptions
from google.cloud import storage
from huggingface_hub import login
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

load_dotenv()

# Environment variables
API_KEY = os.getenv("API_KEY")
GCS_BUCKET_NAME = os.getenv("GCS_BUCKET_NAME")
GOOGLE_APPLICATION_CREDENTIALS_JSON = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
HF_API_TOKEN = os.getenv("HF_API_TOKEN")  # Hugging Face token

# Initialize the GCS client
try:
    credentials_info = json.loads(GOOGLE_APPLICATION_CREDENTIALS_JSON)
    storage_client = storage.Client.from_service_account_info(credentials_info)
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
except (exceptions.DefaultCredentialsError, json.JSONDecodeError, TypeError, KeyError) as e:
    print(f"Error loading credentials or bucket: {e}")
    exit(1)

# Initialize FastAPI
app = FastAPI()

# Log in to Hugging Face (huggingface_hub.login is the supported way to
# authenticate with a token)
try:
    if not HF_API_TOKEN:
        raise ValueError("The Hugging Face token is not set in the environment variables.")
    login(token=HF_API_TOKEN)
    print("Logged in to Hugging Face successfully.")
except Exception as e:
    print(f"Error logging in to Hugging Face: {e}")
    exit(1)


class DownloadModelRequest(BaseModel):
    model_name: str
    pipeline_task: str
    input_text: str


class GCSStreamHandler:
    def __init__(self, bucket_name):
        self.bucket = storage_client.bucket(bucket_name)

    def file_exists(self, blob_name):
        return self.bucket.blob(blob_name).exists()

    def stream_file_from_gcs(self, blob_name):
        blob = self.bucket.blob(blob_name)
        if not blob.exists():
            raise HTTPException(status_code=404, detail=f"File '{blob_name}' not found in GCS.")
        return blob.download_as_bytes()

    def upload_file_to_gcs(self, blob_name, data_stream):
        blob = self.bucket.blob(blob_name)
        blob.upload_from_file(data_stream)
        print(f"File {blob_name} uploaded to GCS.")

    def ensure_bucket_structure(self, model_prefix):
        # Automatically create the expected bucket layout if it does not exist
        required_files = ["config.json", "tokenizer.json"]
        for filename in required_files:
            blob_name = f"{model_prefix}/{filename}"
            if not self.file_exists(blob_name):
                print(f"Creating placeholder file: {blob_name}")
                self.bucket.blob(blob_name).upload_from_string("{}", content_type="application/json")

    def stream_model_files(self, model_prefix, model_patterns):
        # List the blobs under the model prefix once and keep every file whose
        # name matches one of the expected checkpoint patterns
        model_files = {}
        blobs = list(self.bucket.list_blobs(prefix=f"{model_prefix}/"))
        for blob in blobs:
            filename = blob.name.split("/")[-1]
            if any(re.match(pattern, filename) for pattern in model_patterns):
                print(f"File found: {blob.name}")
                model_files[filename] = BytesIO(blob.download_as_bytes())
        return model_files


@app.post("/predict/")
async def predict(request: DownloadModelRequest):
    try:
        gcs_handler = GCSStreamHandler(GCS_BUCKET_NAME)

        # Make sure the bucket structure exists
        gcs_handler.ensure_bucket_structure(request.model_name)

        # Patterns for the checkpoint files: sharded and single-file weights,
        # in both PyTorch .bin and safetensors formats, plus the shard index
        model_patterns = [
            r"pytorch_model-\d+-of-\d+\.bin",
            r"model-\d+-of-\d+\.safetensors",
            r"pytorch_model\.bin",
            r"model\.safetensors",
            r".+\.index\.json",
        ]

        # Pull the checkpoint files out of the bucket
        model_files = gcs_handler.stream_model_files(request.model_name, model_patterns)

        # Load config and tokenizer
        config_stream = gcs_handler.stream_file_from_gcs(f"{request.model_name}/config.json")
        tokenizer_stream = gcs_handler.stream_file_from_gcs(f"{request.model_name}/tokenizer.json")
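        # from_pretrained() needs a local directory (or a Hub repo id) rather
        # than a raw byte stream, so write the streamed files to a temporary
        # directory and load from there; transformers then resolves single-file
        # and sharded checkpoints (via the *.index.json files) by itself.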
        with tempfile.TemporaryDirectory() as tmp_dir:
            with open(os.path.join(tmp_dir, "config.json"), "wb") as f:
                f.write(config_stream)
            with open(os.path.join(tmp_dir, "tokenizer.json"), "wb") as f:
                f.write(tokenizer_stream)
            for filename, stream in model_files.items():
                with open(os.path.join(tmp_dir, filename), "wb") as f:
                    f.write(stream.getvalue())

            model = AutoModelForCausalLM.from_pretrained(tmp_dir)
            tokenizer = AutoTokenizer.from_pretrained(tmp_dir)

        # Build the pipeline
        pipeline_task = request.pipeline_task
        supported_tasks = [
            "text-generation",
            "sentiment-analysis",
            "translation",
            "fill-mask",
            "question-answering",
        ]
        if pipeline_task not in supported_tasks:
            raise HTTPException(status_code=400, detail="Unsupported pipeline task")
        pipeline_ = pipeline(pipeline_task, model=model, tokenizer=tokenizer)

        result = pipeline_(request.input_text)
        return {"response": result}
    except HTTPException:
        # Let deliberate HTTP errors (400, 404) pass through unchanged
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error: {e}")


@app.post("/upload/")
async def upload_model_to_gcs(model_name: str):
    """Download a model's files from Hugging Face and upload them to GCS."""
    try:
        headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}

        def hf_url(filename):
            return f"https://huggingface.co/{model_name}/resolve/main/{filename}"

        # Common model files: single-file checkpoints, config, tokenizer,
        # and the shard index files that sharded checkpoints ship with
        filenames = [
            "config.json",
            "tokenizer.json",
            "pytorch_model.bin",
            "model.safetensors",
            "pytorch_model.bin.index.json",
            "model.safetensors.index.json",
        ]

        # A shard index file lists the exact shard names in its weight_map,
        # so read it instead of guessing shard counts and suffixes
        for index_name in ("pytorch_model.bin.index.json", "model.safetensors.index.json"):
            response = requests.get(hf_url(index_name), headers=headers)
            if response.status_code == 200:
                filenames.extend(sorted(set(response.json()["weight_map"].values())))

        uploaded = []
        for filename in filenames:
            try:
                response = requests.get(hf_url(filename), headers=headers)
                if response.status_code == 200:
                    blob_name = f"{model_name}/{filename}"
                    blob = bucket.blob(blob_name)
                    # Buffers each file in memory before uploading it to GCS
                    blob.upload_from_file(BytesIO(response.content))
                    uploaded.append(filename)
                    print(f"File {filename} uploaded to GCS.")
            except Exception as e:
                print(f"File {filename} could not be fetched: {e}")

        return {"model_name": model_name, "uploaded": uploaded}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error uploading model: {e}")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
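# Example usage (a sketch; the model name "gpt2" and the values below are
# placeholder assumptions, not part of this service):
#
#   # .env
#   GCS_BUCKET_NAME=my-model-bucket
#   GOOGLE_APPLICATION_CREDENTIALS_JSON={"type": "service_account", ...}
#   HF_API_TOKEN=hf_xxxxxxxxxxxx
#
#   # Mirror a model from Hugging Face into the bucket
#   curl -X POST "http://localhost:8000/upload/?model_name=gpt2"
#
#   # Run inference against the mirrored model
#   curl -X POST "http://localhost:8000/predict/" \
#        -H "Content-Type: application/json" \
#        -d '{"model_name": "gpt2", "pipeline_task": "text-generation", "input_text": "Hello"}'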