|
import datetime
import secrets
from typing import Optional, List

import pandas as pd
from fastapi import FastAPI, HTTPException, Header, Depends, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel, ValidationError
|
|
|
# FastAPI application object; the routes and exception handlers in this
# module are registered on it through its decorators.
api = FastAPI()

# Question bank, read once at import time from the CSV file.
questions_data = pd.read_csv('questions.csv')

# Accepted username -> password pairs.
# NOTE(review): the passwords are kept in plain text in the source file;
# consider loading hashed secrets from configuration instead.
users_credentials = {
    "alice": "wonderland",
    "bob": "builder",
    "clementine": "mandarine",
    "admin": "4dm1N",
}
|
|
|
|
|
class Question(BaseModel):
    """Schema of a quiz question.

    Used as the response item of the question-listing route and as the
    request body of the question-creation route. Field order is kept as-is
    because it drives the order of the serialized keys.
    """

    # Text of the question (presumably shown to the quiz taker).
    question: str
    # Category of the question; the listing route filters on it.
    subject: str
    # Presumably the expected answer(s); may be absent.
    correct: Optional[str] = None
    # Type of test the question is meant for; the listing route filters on it.
    use: str
    # Answer choices: A and B are mandatory, C and D are optional.
    responseA: str
    responseB: str
    responseC: Optional[str] = None
    responseD: Optional[str] = None
|
|
|
|
|
|
|
class MyException(Exception):
    """Application error carrying everything needed to build a JSON error reply.

    Attributes:
        status_code: HTTP status code to send back to the client.
        name: Short label of the error.
        message: Human-readable explanation of the error.
        date: Creation time of the exception, as ``str(datetime.now())``.
    """

    def __init__(self,
                 status_code: int,
                 name: str,
                 message: str):
        # Hand the values to Exception so that ``args`` is populated even
        # when the exception is built with keyword arguments; otherwise
        # tracebacks and logs show an empty exception.
        super().__init__(status_code, name, message)
        self.status_code = status_code
        self.name = name
        self.message = message
        # Local, naive timestamp, stringified once at creation time.
        self.date = str(datetime.datetime.now())

    def __str__(self) -> str:
        """Return the human-readable message of the error."""
        return self.message
|
|
|
|
|
@api.exception_handler(MyException)
def MyExceptionHandler(request: Request, exception: MyException):
    """Convert a ``MyException`` into a JSON error response.

    The response uses the status code stored on the exception; its body
    holds the requested URL plus the exception's name, message and
    creation date.
    """
    payload = {
        'url': str(request.url),
        'name': exception.name,
        'message': exception.message,
        'date': exception.date,
    }
    return JSONResponse(status_code=exception.status_code, content=payload)
|
|
|
|
|
@api.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Answer request-validation failures with a 422 JSON body.

    The body holds the requested URL, a fixed error label, the list of
    validation errors reported by ``exc.errors()`` and the current time.
    """
    body = {
        'url': str(request.url),
        'name': "Erreur de validation de la requête (parametre requis)",
        'message': exc.errors(),
        'date': str(datetime.datetime.now()),
    }
    return JSONResponse(status_code=422, content=body)
|
|
|
|
|
@api.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
    """Answer pydantic ``ValidationError`` failures with a 422 JSON body.

    The body holds the requested URL, a fixed error label, the list of
    validation errors reported by ``exc.errors()`` and the current time.

    NOTE(review): this function reuses the name of the
    ``RequestValidationError`` handler defined earlier in the module. Each
    one is registered through its own decorator, but the module-level name
    ends up bound to this one only.
    """
    body = {
        'url': str(request.url),
        'name': "Erreur de validation Pydantic",
        'message': exc.errors(),
        'date': str(datetime.datetime.now()),
    }
    return JSONResponse(status_code=422, content=body)
|
|
|
|
|
def authenticate(authorization: str = Header(None)):
    """Validate the ``Authorization`` header and return the caller's credentials.

    The header must have the form ``Basic <username>:<password>`` with the
    credentials in plain text (no base64 decoding is performed here), and
    the pair must match an entry of ``users_credentials``.

    Args:
        authorization: Raw value of the ``Authorization`` request header.

    Returns:
        The ``(username, password)`` tuple of the authenticated user.

    Raises:
        HTTPException: 401 when the header is missing, malformed, uses a
            scheme other than ``Basic`` or carries unknown credentials.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="Utilisateur non authorisé1")

    # Every rejection past this point returns the same detail, so the
    # response never reveals which check failed nor echoes the credentials.
    # The header is deliberately not printed or logged: it holds the password.
    rejected = HTTPException(status_code=401, detail="Utilisateur non authorisé2")

    parts = authorization.split()
    if len(parts) != 2 or parts[0] != 'Basic':
        raise rejected

    # Split on the first colon only, so a password may itself contain ':'.
    username, separator, password = parts[1].partition(":")
    if not separator:
        raise rejected

    expected = users_credentials.get(username)
    # compare_digest runs in constant time; bytes are used because the
    # str form only accepts ASCII input.
    if expected is None or not secrets.compare_digest(
            password.encode("utf-8"), expected.encode("utf-8")):
        raise rejected

    return username, password
|
|
|
|
|
@api.get('/', name="Vérification que l'API fonctionne")
def check_api():
    """Health check: return a fixed message showing that the API is up."""
    status = {'message': "L'API fonctionne"}
    return status
|
|
|
|
|
@api.get('/questions', name="Récupération des questions pour un QCM")
def get_questions(use: str,
                  subject: str,
                  num_questions: int,
                  authorization: str = Header(None)):
    """Return a random set of questions for a multiple-choice quiz.

    Questions are selected by test type (``use``) and by one or several
    comma-separated categories (``subject``). Only quizzes of 5, 10 or 20
    questions can be produced, and the questions are returned in random
    order. Only users listed in ``users_credentials`` may use this route.

    Args:
        use: Test type to match; an empty value disables this filter.
        subject: Comma-separated list of categories to match.
        num_questions: Number of questions wanted (5, 10 or 20).
        authorization: Raw ``Authorization`` header, checked by
            ``authenticate``.

    Returns:
        A list of ``num_questions`` ``Question`` objects.

    Raises:
        HTTPException: 401 when authentication fails.
        MyException: 422 when ``num_questions`` is not 5, 10 or 20, 404 when
            no question matches, 400 when fewer questions match than
            requested.
    """
    authenticate(authorization)

    if num_questions not in (5, 10, 20):
        raise MyException(
            status_code=422,
            name="num_questions invalide",
            message="La requête peut contenir 5, 10 ou 20 questions, mais pas "
                    + str(num_questions),
        )

    # Boolean indexing builds new frames, so the shared question bank is
    # never modified by this request.
    matching = questions_data
    if use:
        matching = matching[matching['use'] == use]
    if subject is not None:
        matching = matching[matching['subject'].isin(subject.split(','))]

    available = len(matching)
    if available == 0:
        raise MyException(
            status_code=404,
            name="Aucune question",
            message="Aucune question ne correspond aux critères sélectionnés",
        )
    if available < num_questions:
        raise MyException(
            status_code=400,
            name="Nb insuffisant de questions ",
            message="Le nombre de questions correspondantes aux critères sélectionnés est < au nombre requis",
        )

    # The size check above guarantees that enough rows are available.
    selected = matching.sample(n=num_questions)

    # Empty CSV cells are read as NaN. Turn them into None on the plain
    # records (rather than assigning columns on a filtered frame) so that
    # optional fields validate, and so that a validation error never carries
    # a NaN, which cannot be rendered as JSON.
    return [
        Question(**{
            column: (None if pd.isna(value) else value)
            for column, value in record.items()
        })
        for record in selected.to_dict(orient='records')
    ]
|
|
|
|
|
|
|
@api.post('/questions/create', name="Création d'une nouvelle question")
def create_question(question: Question,
                    auth_info: tuple = Depends(authenticate)):
    """Create a new question and append it to questions.csv.

    Only the ``admin`` user is allowed to use this route.

    Args:
        question: The question to store.
        auth_info: ``(username, password)`` tuple provided by
            ``authenticate``.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 401 when the authenticated user is not ``admin``.
    """
    global questions_data

    username, _ = auth_info
    if username != 'admin':
        raise HTTPException(status_code=401, detail="Utilisateur non authorisé")

    # DataFrame.append was removed in pandas 2.0; concatenating a one-row
    # frame is the supported equivalent.
    new_row = pd.DataFrame([question.model_dump()])
    questions_data = pd.concat([questions_data, new_row], ignore_index=True)

    # Persist the whole bank so the new question survives a restart.
    questions_data.to_csv('questions.csv', index=False)

    return {'message': 'Question créée avec succès'}
|
|
|
|