# alexfremont's picture
# new api
# eeed331
import os
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from transformers import pipeline
from torchvision import transforms
from PIL import Image
import requests
from io import BytesIO
from steps.preprocess import process_image
from huggingface_hub import hf_hub_download
from architecture.resnet import ResNet
import torch
import logging
from typing import List
import httpx
import asyncio
# FastAPI application object that the middleware and routes are registered on.
app = FastAPI()

# Value passed as `size` to process_image for every incoming image.
image_size = 256

# Hugging Face access token, read from the "api_read" environment variable
# (None when unset); passed to hf_hub_download when fetching weights.
hf_token = os.getenv("api_read")

# Raw value of the "api_key" environment variable (None when unset); the
# API-key middleware checks the x-api-key header against it.
VALID_API_KEYS = os.getenv("api_key")
@app.middleware("http")
async def verify_api_key(request, call_next):
    """Reject any request that does not carry a valid ``x-api-key`` header.

    ``VALID_API_KEYS`` is read as a single key or a comma-separated list of
    keys; the header value must match one of them exactly. When no key is
    configured every request is rejected.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request down the stack and
            returns its response.

    Returns:
        The downstream response when the key is accepted, otherwise a 403
        JSON response with body ``{"detail": "Unauthorized"}``.
    """
    api_key = request.headers.get("x-api-key")

    # Compare against whole keys. A plain `in` test on the raw string is a
    # substring match (it accepts "" and any fragment of the real key) and
    # raises TypeError when the environment variable is unset (None).
    accepted_keys = {
        key.strip() for key in (VALID_API_KEYS or "").split(",") if key.strip()
    }

    if not api_key or api_key not in accepted_keys:
        # Return the response instead of raising HTTPException: exceptions
        # raised inside an HTTP middleware are not routed through FastAPI's
        # exception handlers, so a raise here reaches the client as a 500.
        return JSONResponse(status_code=403, content={"detail": "Unauthorized"})

    return await call_next(request)
# Weight files to download from the Hugging Face Hub. Every entry provides the
# repo_id / subfolder / filename arguments for hf_hub_download; the filename
# is also the key under which the loaded model is stored in model_pipelines.
models_locations = [
    {
        "repo_id": "TamisAI/category-lamp",
        "subfolder": subfolder,
        "filename": filename,
    }
    for subfolder, filename in (
        # Disabled entry, kept for reference:
        # ("maison-jansen/palmtree-152-0005-32-256", "palmtree-jansen.pth"),
        ("maison-charles/corail-152-0001-32-256-L1", "maison-charles-corail-L1.pth"),
        ("michel-armand/flamme-152-0001A-32-256-L1", "flamme-L1.pth"),
    )
]

# Models are placed on, and weights are mapped to, the CPU.
device = torch.device("cpu")
# Data model for /predict request bodies.
class PredictRequest(BaseModel):
    # URL the image is downloaded from.
    imageUrl: str
    # Key into model_pipelines selecting the model; keys are the weight
    # filenames registered by load_models.
    modelName: str
# Fix the number of threads torch uses for CPU operations at 8.
# NOTE(review): hard-coded; presumably sized for the deployment host — confirm.
torch.set_num_threads(8)
# Dictionary holding the loaded models, keyed by weight filename.
# Filled by load_models at startup and read by the prediction endpoints.
model_pipelines = {}
# Create a single instance of the ResNet model
# NOTE(review): within this file the instance is only used for its class
# (base_model.__class__ in load_models); its own weights are never loaded.
base_model = ResNet("resnet152", num_output_neurons=2).to(device)
@app.on_event("startup")
async def load_models():
    """Download each configured weight file and register a model for it.

    Runs on application startup. For every entry of ``models_locations`` the
    weights are fetched with ``hf_hub_download``, loaded into a new network of
    the same class as ``base_model``, switched to eval mode and stored in
    ``model_pipelines`` under the entry's filename. A failure on one entry is
    printed and skipped so the remaining entries are still attempted.
    """
    print(f"Loading models...{len(models_locations)}")

    for model_location in models_locations:
        try:
            print(f"Loading model: {model_location['filename']}")

            weights_path = hf_hub_download(
                repo_id=model_location["repo_id"],
                subfolder=model_location["subfolder"],
                filename=model_location["filename"],
                token=hf_token,
            )

            # Build a fresh network per weight file so models do not share
            # parameters with each other or with base_model.
            network = type(base_model)("resnet152", num_output_neurons=2)
            network = network.to(device)

            state_dict = torch.load(
                weights_path, weights_only=True, map_location=device
            )
            network.load_state_dict(state_dict)
            network.eval()

            model_pipelines[model_location["filename"]] = network
        except Exception as e:
            # Best effort: report the failure and continue with the next entry.
            print(f"Error loading model {model_location['filename']}: {e}")

    print(f"Models loaded. {len(model_pipelines)}")
@app.post("/predict")
async def predict(request: PredictRequest):
    """Score a single image with one of the loaded models.

    Args:
        request: Body carrying ``imageUrl`` (image to download) and
            ``modelName`` (key into ``model_pipelines``).

    Returns:
        JSONResponse ``{"confidence": <float>}`` where the value is the
        softmax probability at output index 1, rounded to two decimals.

    Raises:
        HTTPException: 404 when ``modelName`` is not a loaded model; 400 when
            the image cannot be downloaded or decoded.
    """
    image_url = request.imageUrl
    model_name = request.modelName

    # Check that the model is loaded before doing any network work (same
    # order as batch_predict), so an unknown model never triggers a download.
    model = model_pipelines.get(model_name)
    if model is None:
        raise HTTPException(status_code=404, detail="Model not found")

    # Download the image without blocking the event loop. Redirects are
    # followed, as the previous requests-based download did; httpx also
    # applies its default timeouts, so a stalled server cannot hang the request.
    try:
        async with httpx.AsyncClient(follow_redirects=True) as client:
            response = await client.get(image_url)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
        # Image.open is lazy; force decoding here so a corrupt or truncated
        # file is reported as a bad image instead of failing later.
        image.load()
    except Exception as exc:
        logging.warning("Could not fetch image %s: %s", image_url, exc)
        raise HTTPException(status_code=400, detail="Invalid image URL") from exc

    # Preprocess the image and convert it to a batch of one.
    processed_image = process_image(image, size=image_size)
    image_tensor = transforms.ToTensor()(processed_image).unsqueeze(0).to(device)

    # NOTE(review): inference runs synchronously inside the event loop, so
    # other requests wait while the model executes.
    with torch.no_grad():
        outputs = model(image_tensor)
        probabilities = torch.nn.functional.softmax(outputs, dim=1)

    confidence = round(probabilities[0, 1].item(), 2)
    logging.info("confidence: %s", confidence)

    return JSONResponse(content={"confidence": confidence})
# Data model for /batch_predict request bodies.
class BatchPredictRequest(BaseModel):
    # URLs of the images to download and score; results are returned in the
    # same order.
    imageUrls: List[str]
    # Key into model_pipelines selecting the model used for every image.
    modelName: str
# @app.post("/batch_predict")
# async def batch_predict(request: BatchPredictRequest):
# model_name = request.modelName
# results = []
# # Verify if the model is loaded
# if model_name not in model_pipelines:
# raise HTTPException(status_code=404, detail="Model not found")
# model = model_pipelines[model_name]
# # Asynchronously process each image
# async with httpx.AsyncClient() as client:
# for image_url in request.imageUrls:
# try:
# response = await client.get(image_url)
# image = Image.open(BytesIO(response.content))
# except Exception as e:
# results.append({"imageUrl": image_url, "error": "Invalid image URL"})
# continue
# # Preprocess the image
# processed_image = process_image(image, size=image_size)
# # Convert to tensor
# image_tensor = transforms.ToTensor()(processed_image).unsqueeze(0)
# # Perform inference
# with torch.no_grad():
# outputs = model(image_tensor)
# probabilities = torch.nn.functional.softmax(outputs, dim=1)
# predicted_probabilities = probabilities.numpy().tolist()
# confidence = round(predicted_probabilities[0][1], 2)
# results.append({"imageUrl": image_url, "confidence": confidence})
# # Return the results as JSON
# return JSONResponse(content={"results": results})
@app.post("/batch_predict")
async def batch_predict(request: BatchPredictRequest):
    """Score several images with one loaded model.

    Args:
        request: Body carrying ``imageUrls`` and ``modelName`` (key into
            ``model_pipelines``).

    Returns:
        JSONResponse ``{"results": [...]}`` with one entry per URL, in input
        order. Each entry is ``{"imageUrl", "confidence"}`` on success, where
        confidence is the softmax probability at output index 1 rounded to
        two decimals, or ``{"imageUrl", "error"}`` when that image could not
        be downloaded, decoded or scored.

    Raises:
        HTTPException: 404 when ``modelName`` is not a loaded model.
    """
    model_name = request.modelName

    # Verify that the model is loaded.
    model = model_pipelines.get(model_name)
    if model is None:
        raise HTTPException(status_code=404, detail="Model not found")

    # Limit to 8 images in flight at once to avoid overloading the machine.
    semaphore = asyncio.Semaphore(8)

    async def process_single_image(client, image_url):
        """Download and score one image; never raises, returns a result dict."""
        async with semaphore:
            try:
                response = await client.get(image_url)
                response.raise_for_status()
                image = Image.open(BytesIO(response.content))
                # Image.open is lazy; force decoding so corrupt files are
                # reported here as an invalid image.
                image.load()
            except Exception:
                return {"imageUrl": image_url, "error": "Invalid image URL"}

            # A failure on one image must not abort the whole batch:
            # asyncio.gather would otherwise propagate it and the request
            # would end in a 500, discarding every other result.
            try:
                processed_image = process_image(image, size=image_size)
                image_tensor = (
                    transforms.ToTensor()(processed_image).unsqueeze(0).to(device)
                )
                # NOTE(review): inference is synchronous, so images are
                # scored one at a time; only the downloads overlap.
                with torch.no_grad():
                    outputs = model(image_tensor)
                    probabilities = torch.nn.functional.softmax(outputs, dim=1)
                confidence = round(probabilities[0, 1].item(), 2)
            except Exception:
                logging.exception("Failed to score image %s", image_url)
                return {"imageUrl": image_url, "error": "Image processing failed"}

            return {"imageUrl": image_url, "confidence": confidence}

    # One HTTP client for the whole batch so connections are pooled instead of
    # being set up and torn down per image. Redirects are followed so a URL
    # accepted by /predict behaves the same here.
    async with httpx.AsyncClient(follow_redirects=True) as client:
        results = await asyncio.gather(
            *(process_single_image(client, url) for url in request.imageUrls)
        )

    # Return the results as JSON.
    return JSONResponse(content={"results": results})