File size: 7,613 Bytes
f3c7e36
 
 
 
 
 
 
 
5afe693
f3c7e36
5afe693
f3c7e36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e7b9432
f3c7e36
 
 
c9aba22
f3c7e36
 
c9aba22
f3c7e36
e7b9432
f3c7e36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5afe693
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
from fastapi import FastAPI, HTTPException, Header, Depends, Request
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.exceptions import RequestValidationError
from typing import Optional, List
from pydantic import BaseModel, ValidationError
import pandas as pd
import datetime

api = FastAPI()

# Charger les données à partir du fichier CSV
questions_data = pd.read_csv('questions.csv')

# Dictionnaire des identifiants des utilisateurs
users_credentials = {
    "alice": "wonderland",
    "bob": "builder",
    "clementine": "mandarine",
    "admin": "4dm1N"  # Ajout de l'utilisateur admin
}

# Modèle Pydantic pour représenter une question
class Question(BaseModel):
    question: str
    subject: str
    correct: Optional[str] = None # Champ optionnel
    use: str
    responseA: str
    responseB: str
    responseC: Optional[str] = None # Champ optionnel
    responseD: Optional[str] = None # Champ optionnel
    # remark: Optional[str]  = None # Champ optionnel

# Modèle pour représenter une exception personnalisée
class MyException(Exception):
    def __init__(self,
                 status_code: int,
                 name : str,
                 message : str):
        self.status_code = status_code
        self.name = name
        self.message = message
        self.date = str(datetime.datetime.now())

# Gestionnaire d'exception personnalisé
@api.exception_handler(MyException)
def MyExceptionHandler(
    request: Request,
    exception: MyException
    ):
    return JSONResponse(
        status_code=exception.status_code,
        content={
            'url': str(request.url),
            'name': exception.name,
            'message': exception.message,
            'date': exception.date
        }
    )

# Gestionnaire d'exception pour les erreurs de validation de la requête
@api.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    return JSONResponse(
        status_code=422,
        content={
            'url': str(request.url),
            'name': "Erreur de validation de la requête (parametre requis)",
            'message': exc.errors(),
            'date': str(datetime.datetime.now())
        },
    )

# Gestionnaire d'exception pour les erreurs Pydantic
@api.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
    return JSONResponse(
        status_code=422,
        content={
            'url': str(request.url),
            'name': "Erreur de validation Pydantic",
            'message': exc.errors(),
            'date': str(datetime.datetime.now())
        },
    )


# Fonction pour vérifier l'authentification de l'utilisateur
def authenticate(authorization: str = Header(None)):
    if not authorization:
        raise HTTPException(status_code=401, detail="Utilisateur non authorisé1")
    try:
        scheme, credentials = authorization.split()
        if scheme != 'Basic':
            raise HTTPException(status_code=401, detail="Utilisateur non authorisé "+scheme)
        username, password = credentials.split(":")
        if username not in users_credentials or users_credentials[username] != password:
            raise HTTPException(status_code=401, detail="Utilisateur non authorisé "+username+":"+password)
    except Exception as e:
        raise HTTPException(status_code=401, detail="Utilisateur non authorisé2")
    return username, password

# Endpoint pour vérifier que l'API est fonctionnelle
@api.get('/', name="Vérification que l'API fonctionne")
def check_api():
    return {'message': "L'API fonctionne"}

# Endpoint pour récupérer les questions en fonction du type de test (use) et des catégories (subject) spécifiés
@api.get('/questions', name="Récupération des questions pour un QCM")
def get_questions(use: str, 
                  subject: str, 
                  num_questions: int, 
                  auth_info: tuple = Depends(authenticate)): 
    """
    Récupère les questions en fonction du type de test (use) et des catégories (subject) spécifiés
    L'application peut produire des QCMs de 5, 10 ou 20 questions (seulement)
    Les questions sont retournées dans un ordre aléatoire
    Seuls les utilisateurs se trouvant dans users_credentials peuvent utiliser cette application
    """

    # Verifier si le nombre de questions demandé correspond au nombre de questions d'un QCM
    if num_questions not in [5,10,20]:
        raise MyException(status_code=422, name="num_questions invalide", \
            message="La requête peut contenir 51,10 ou 20 questions, mais pas "+str(num_questions))
        
    # Filtrer les questions en fonction des paramètres spécifiés
    filtered_questions = questions_data
    if use:
        filtered_questions = filtered_questions[filtered_questions['use'] == use]
    if subject is not None:
        s = subject.split(',')
        filtered_questions = filtered_questions[filtered_questions['subject'].isin(s)]
    print("len(filtered_questions)=",len(filtered_questions))   
    print("num_questions=",num_questions) 
    # Vérifier si des questions sont disponibles dans la catégorie spécifiée
    if len(filtered_questions) == 0:
        raise MyException(status_code=404, name="Aucune question", \
            message="Aucune question ne correspond aux critères sélectionnés")
    # Verifier si le nombre de questions diponibles >= au nombre requis
    elif (len(filtered_questions) < num_questions):
        raise MyException(status_code=400, name="Nb insuffisant de questions ", \
            message="Le nombre de questions correspondantes aux critères sélectionnés est < au nombre requis")

    # Supprimer les valeurs NaN dans les champs correct,responseC, responseD (ils ne sont pas toujours remplis)
    filtered_questions['correct'] = filtered_questions['correct'].apply(lambda x: None if pd.isna(x) else x)
    filtered_questions['responseC'] = filtered_questions['responseC'].apply(lambda x: None if pd.isna(x) else x)
    filtered_questions['responseD'] = filtered_questions['responseD'].apply(lambda x: None if pd.isna(x) else x)

    # Sélectionner un nombre aléatoire de questions
    selected_questions = filtered_questions.sample(n=min(num_questions, len(filtered_questions)))

    # Convertir les données en liste de dictionnaires
    questions_list = selected_questions.to_dict(orient='records')

    # Convertir les dictionnaires en objets Pydantic de type Question
    questions_objects = [Question(**question) for question in questions_list]
    return questions_objects


# Endpoint pour créer une nouvelle question (accessible uniquement par l'utilisateur admin)
@api.post('/questions/create', name="Création d'une nouvelle question")
def create_question(question: Question, 
                    auth_info: tuple = Depends(authenticate)):
    """
    Crée une nouvelle question et l'ajoute à questions.csv
    Seuls l' utilisateur admin a le droit d'utiliser cette fonction
    """
    global questions_data
    
    username, password = auth_info
    if username != 'admin':
        raise HTTPException(status_code=401, detail="Utilisateur non authorisé")

    # Ajouter la nouvelle question au DataFrame
    new_question = question.model_dump()
    questions_data = questions_data.append(new_question, ignore_index=True)

    # Sauvegarder les modifications dans le fichier CSV
    questions_data.to_csv('questions.csv', index=False)
    
    return {'message': 'Question créée avec succès'}