import random
import time
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from email.utils import formatdate
from typing import List, Optional

import aiosqlite
import orjson
from aiocache import cached, SimpleMemoryCache
from aiocache.serializers import JsonSerializer
from fastapi import FastAPI, HTTPException, Query, Request, Depends, Path
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import ORJSONResponse
# Application object. Swagger UI is served at the root path and ReDoc at
# /redocs.
#
# Responses are rendered with orjson through `default_response_class`.
# FastAPI has no `json_encoder` / `json_decoder` parameters: unknown keyword
# arguments are merely stored on the app and never used, so passing
# `orjson.dumps` / `orjson.loads` that way had no effect on serialization.
app = FastAPI(
    title="Quiz API",
    description="A REST API for managing and generating quizzes",
    version="1.0.0",
    docs_url="/",
    redoc_url="/redocs",
    default_response_class=ORJSONResponse,
)
# Compress responses; `minimum_size=1000` leaves small bodies uncompressed.
app.add_middleware(GZipMiddleware, minimum_size=1000)

# Wildcard CORS policy: any origin, any method, any header.
# NOTE(review): credentials are allowed together with a wildcard origin;
# confirm that this is intended before the API starts relying on cookies.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Freshness lifetime advertised to HTTP caches: one week, in seconds.
CACHE_TIME_SECONDS = 7 * 24 * 60 * 60
@app.middleware("http")
async def add_cache_headers(request: Request, call_next):
    """Attach long-lived public caching headers to successful read responses.

    Headers are added only when all of the following hold:

    * the request method is GET or HEAD (responses to other methods, such as
      the randomised POST ``/generate-test``, must not be advertised as
      cacheable for a week),
    * the response status is 200,
    * the path is neither ``/ping`` nor ``/``.

    Args:
        request: The incoming request.
        call_next: Callable that runs the rest of the stack and returns the
            response.

    Returns:
        The downstream response, with ``Cache-Control`` and ``Expires`` set
        when the conditions above are met.
    """
    response = await call_next(request)

    cacheable = (
        request.method in ("GET", "HEAD")
        and response.status_code == 200
        and request.url.path not in ("/ping", "/")
    )
    if cacheable:
        response.headers["Cache-Control"] = f"public, max-age={CACHE_TIME_SECONDS}"
        # formatdate(usegmt=True) always emits an English HTTP-date ending in
        # "GMT"; strftime("%a ... %b") would follow the process locale.
        response.headers["Expires"] = formatdate(
            time.time() + CACHE_TIME_SECONDS, usegmt=True
        )
    return response
@asynccontextmanager
async def get_db_connection():
    """Open ``questions.db`` and yield the connection, closing it on exit.

    The connection's ``row_factory`` is set to ``aiosqlite.Row`` before it is
    handed to the caller.
    """
    # `async with` opens the connection on entry and closes it on exit,
    # including when the caller's block raises.
    async with aiosqlite.connect("questions.db") as db:
        db.row_factory = aiosqlite.Row
        yield db
async def get_db():
    """Yield a database connection obtained from ``get_db_connection``.

    Used as a ``Depends(...)`` target by the route handlers in this module;
    the connection is released when the generator is finalised.
    """
    async with get_db_connection() as db:
        yield db
@app.get(
    "/ping",
    summary="Ping the server",
    tags=["Health Check"],
)
async def ping():
    """Health check: return a fixed status string to show the server is up."""
    status = "OKK"
    return status
@app.get(
    "/{subject}/{chapter}/{topic}/question-ids",
    summary="Get question IDs for a topic",
    tags=["Questions ID"],
    response_description="List of question IDs for a given topic.",
)
@cached(
    ttl=600,
    cache=SimpleMemoryCache,
    serializer=JsonSerializer(),
    # Key on the request parameters only. The default aiocache key embeds
    # every keyword argument, including the per-request `db` connection, so
    # two identical requests would never share a cache entry.
    key_builder=lambda func, *args, **kwargs: repr(
        (func.__name__, args, sorted((k, repr(v)) for k, v in kwargs.items() if k != "db"))
    ),
)
async def get_question_ids(
    subject: str,
    chapter: str,
    topic: str,
    offset: int = Query(0, ge=0, description="Offset for pagination"),
    limit: int = Query(1000, ge=1, le=1000, description="Limit for pagination"),
    db: aiosqlite.Connection = Depends(get_db),
):
    """Return one page of question IDs for a subject/chapter/topic.

    Args:
        subject: Subject to match exactly.
        chapter: Chapter to match exactly.
        topic: Topic to match exactly.
        offset: Number of matching rows to skip.
        limit: Maximum number of rows to return (1-1000).
        db: Database connection injected by FastAPI.

    Returns:
        A dict echoing ``subject``, ``chapter`` and ``topic`` plus
        ``question_ids``, the list of IDs on this page.

    Raises:
        HTTPException: 404 when the page contains no rows.
    """
    query = (
        "SELECT question_id FROM questions "
        "WHERE subject = ? AND chapter = ? AND topic = ? LIMIT ? OFFSET ?"
    )
    cursor = await db.execute(query, [subject, chapter, topic, limit, offset])
    rows = await cursor.fetchall()
    if not rows:
        raise HTTPException(
            status_code=404,
            detail="Topic not found or no questions available for this page.",
        )
    return {
        "subject": subject,
        "chapter": chapter,
        "topic": topic,
        "question_ids": [row["question_id"] for row in rows],
    }
@app.get(
    "/questions",
    summary="Filter questions",
    tags=["Questions"],
    response_description="List of questions matching given filters.",
)
@cached(
    ttl=300,
    cache=SimpleMemoryCache,
    serializer=JsonSerializer(),
    # Key on the request parameters only. The default aiocache key embeds
    # every keyword argument, including the per-request `db` connection, so
    # two identical requests would never share a cache entry.
    key_builder=lambda func, *args, **kwargs: repr(
        (func.__name__, args, sorted((k, repr(v)) for k, v in kwargs.items() if k != "db"))
    ),
)
async def filter_questions(
    subject: Optional[str] = None,
    chapter: Optional[str] = None,
    topic: Optional[str] = None,
    question_type: Optional[str] = None,
    paper_id: Optional[str] = None,
    must_do: Optional[bool] = None,
    offset: int = Query(0, ge=0, description="Offset for pagination"),
    limit: int = Query(1000, ge=1, le=1000, description="Limit for pagination"),
    db: aiosqlite.Connection = Depends(get_db),
):
    """Return one page of questions matching the supplied optional filters.

    Every filter that is provided is combined with AND; omitted (or empty)
    text filters are ignored.

    Args:
        subject, chapter, topic, question_type, paper_id: Exact-match filters
            on the columns of the same name.
        must_do: When not ``None``, restrict to rows whose ``must_do`` column
            equals 1 (``True``) or 0 (``False``).
        offset: Number of matching rows to skip.
        limit: Maximum number of rows to return (1-1000).
        db: Database connection injected by FastAPI.

    Returns:
        A list of question rows, each converted to a dict.

    Raises:
        HTTPException: 404 when the page contains no rows.
    """
    query = "SELECT * FROM questions WHERE 1=1"
    params = []

    # Column names below are fixed literals; only the values come from the
    # request, and those are always bound as SQL parameters.
    text_filters = (
        ("subject", subject),
        ("chapter", chapter),
        ("topic", topic),
        ("question_type", question_type),
        ("paper_id", paper_id),
    )
    for column, value in text_filters:
        if value:
            query += f" AND {column} = ?"
            params.append(value)

    # `must_do` is tri-state: test against None so that False still filters.
    if must_do is not None:
        query += " AND must_do = ?"
        params.append(1 if must_do else 0)

    query += " LIMIT ? OFFSET ?"
    params.extend([limit, offset])

    cursor = await db.execute(query, params)
    rows = await cursor.fetchall()
    if not rows:
        raise HTTPException(
            status_code=404,
            detail="No questions found with the given filters for this page.",
        )
    return [dict(row) for row in rows]
@app.get(
    "/questions/{question_id}",
    summary="Get a question by ID",
    tags=["Questions"],
    response_description="Details of the requested question.",
)
@cached(
    ttl=300,
    cache=SimpleMemoryCache,
    serializer=JsonSerializer(),
    # Key on the request parameters only. The default aiocache key embeds
    # every keyword argument, including the per-request `db` connection, so
    # two identical requests would never share a cache entry.
    key_builder=lambda func, *args, **kwargs: repr(
        (func.__name__, args, sorted((k, repr(v)) for k, v in kwargs.items() if k != "db"))
    ),
)
async def get_question_by_id(question_id: str, db: aiosqlite.Connection = Depends(get_db)):
    """Return the question whose ``question_id`` matches the path parameter.

    Args:
        question_id: Identifier to look up.
        db: Database connection injected by FastAPI.

    Returns:
        The first matching row as a dict.

    Raises:
        HTTPException: 404 when no row matches.
    """
    cursor = await db.execute(
        "SELECT * FROM questions WHERE question_id = ?", (question_id,)
    )
    row = await cursor.fetchone()
    if row is None:
        raise HTTPException(status_code=404, detail="Question not found.")
    return dict(row)
@app.get(
    "/{subject}",
    summary="List chapters for a subject",
    tags=["Subjects & Chapters"],
    response_description="List of chapters for a given subject.",
)
@cached(
    ttl=300,
    cache=SimpleMemoryCache,
    serializer=JsonSerializer(),
    # Key on the request parameters only. The default aiocache key embeds
    # every keyword argument, including the per-request `db` connection, so
    # two identical requests would never share a cache entry.
    key_builder=lambda func, *args, **kwargs: repr(
        (func.__name__, args, sorted((k, repr(v)) for k, v in kwargs.items() if k != "db"))
    ),
)
async def list_chapters(subject: str, db: aiosqlite.Connection = Depends(get_db)):
    """Return the chapters of a subject with the question count of each.

    Args:
        subject: Subject to match exactly.
        db: Database connection injected by FastAPI.

    Returns:
        A dict with ``subject`` and ``chapters``; each chapter entry holds
        ``chapter`` and ``question_count``. No subject-wide total is included.

    Raises:
        HTTPException: 404 when the subject has no rows.
    """
    cursor = await db.execute(
        "SELECT chapter, COUNT(*) as question_count FROM questions WHERE subject = ? GROUP BY chapter",
        (subject,),
    )
    rows = await cursor.fetchall()
    if not rows:
        raise HTTPException(
            status_code=404, detail="Subject not found or no chapters available."
        )
    return {
        "subject": subject,
        "chapters": [dict(row) for row in rows],
    }
@app.get(
    "/{subject}/{chapter}",
    summary="List topics for a chapter",
    tags=["Subjects & Chapters"],
    response_description="List of topics for a given chapter.",
)
@cached(
    ttl=300,
    cache=SimpleMemoryCache,
    serializer=JsonSerializer(),
    # Key on the request parameters only. The default aiocache key embeds
    # every keyword argument, including the per-request `db` connection, so
    # two identical requests would never share a cache entry.
    key_builder=lambda func, *args, **kwargs: repr(
        (func.__name__, args, sorted((k, repr(v)) for k, v in kwargs.items() if k != "db"))
    ),
)
async def list_topics(subject: str, chapter: str, db: aiosqlite.Connection = Depends(get_db)):
    """Return the topics of a chapter with the question count of each.

    Args:
        subject: Subject to match exactly.
        chapter: Chapter to match exactly.
        db: Database connection injected by FastAPI.

    Returns:
        A dict with ``subject``, ``chapter`` and ``topics``; each topic entry
        holds ``topic`` and ``question_count``. No chapter-wide total is
        included.

    Raises:
        HTTPException: 404 when the chapter has no rows.
    """
    cursor = await db.execute(
        "SELECT topic, COUNT(*) as question_count FROM questions WHERE subject = ? AND chapter = ? GROUP BY topic",
        (subject, chapter),
    )
    rows = await cursor.fetchall()
    if not rows:
        raise HTTPException(
            status_code=404, detail="Chapter not found or no topics available."
        )
    return {
        "subject": subject,
        "chapter": chapter,
        "topics": [dict(row) for row in rows],
    }
@app.get(
    "/{subject}/{chapter}/{topic}",
    summary="List questions for a topic",
    tags=["Questions"],
    response_description="List of questions for a given topic.",
)
@cached(
    ttl=300,
    cache=SimpleMemoryCache,
    serializer=JsonSerializer(),
    # Key on the request parameters only. The default aiocache key embeds
    # every keyword argument, including the per-request `db` connection, so
    # two identical requests would never share a cache entry.
    key_builder=lambda func, *args, **kwargs: repr(
        (func.__name__, args, sorted((k, repr(v)) for k, v in kwargs.items() if k != "db"))
    ),
)
async def list_questions(
    subject: str,
    chapter: str,
    topic: str,
    offset: int = Query(0, ge=0, description="Offset for pagination"),
    limit: int = Query(1000, ge=1, le=1000, description="Limit for pagination"),
    db: aiosqlite.Connection = Depends(get_db),
):
    """Return one page of full question rows for a subject/chapter/topic.

    Args:
        subject: Subject to match exactly.
        chapter: Chapter to match exactly.
        topic: Topic to match exactly.
        offset: Number of matching rows to skip.
        limit: Maximum number of rows to return (1-1000).
        db: Database connection injected by FastAPI.

    Returns:
        A dict echoing ``subject``, ``chapter`` and ``topic`` plus
        ``questions``, the rows on this page converted to dicts.

    Raises:
        HTTPException: 404 when the page contains no rows.
    """
    query = (
        "SELECT * FROM questions "
        "WHERE subject = ? AND chapter = ? AND topic = ? LIMIT ? OFFSET ?"
    )
    cursor = await db.execute(query, [subject, chapter, topic, limit, offset])
    rows = await cursor.fetchall()
    if not rows:
        raise HTTPException(
            status_code=404,
            detail="Topic not found or no questions available for this page.",
        )
    return {
        "subject": subject,
        "chapter": chapter,
        "topic": topic,
        "questions": [dict(row) for row in rows],
    }
# Deliberately not wrapped in `@cached`: the result is a random sample, so
# replaying a stored response would defeat the purpose of the endpoint.
@app.post(
    "/generate-test",
    summary="Generate a test",
    tags=["Test Generation"],
    response_description="Generated test with the specified parameters.",
)
async def generate_test(
    subjects: Optional[List[str]] = None,
    chapters: Optional[List[str]] = None,
    topics: Optional[List[str]] = None,
    num_questions: int = Query(10, ge=0, description="Number of questions to draw"),
    total_time: int = 60,
    offset: int = Query(0, ge=0, description="Offset for pagination (results may vary)"),
    limit: int = Query(1000, ge=1, le=1000, description="Limit the number of questions to select (results may vary)"),
    db: aiosqlite.Connection = Depends(get_db),
):
    """Build a test by randomly sampling questions that match the filters.

    All rows matching the optional subject/chapter/topic lists are fetched,
    the ``[offset, offset + limit)`` window of that result is taken, and up to
    ``num_questions`` rows are drawn from the window without replacement.

    Args:
        subjects: Subjects to include; ``None`` or empty means no restriction.
        chapters: Chapters to include; ``None`` or empty means no restriction.
        topics: Topics to include; ``None`` or empty means no restriction.
        num_questions: Upper bound on the number of questions drawn. Negative
            values are rejected by validation, since ``random.sample`` would
            otherwise raise ``ValueError`` and surface as a server error.
        total_time: Echoed back unchanged in the response.
        offset: Start of the candidate window within the matching rows.
        limit: Size of the candidate window (1-1000).
        db: Database connection injected by FastAPI.

    Returns:
        A dict with ``total_questions`` (number actually drawn),
        ``total_time`` and ``questions`` (the drawn rows as dicts). When the
        window is empty, ``total_questions`` is 0 and ``questions`` is ``[]``.

    Raises:
        HTTPException: 404 when no row matches the filters at all.
    """
    query = "SELECT * FROM questions WHERE 1=1"
    params = []

    # Column names are fixed literals; every requested value is bound through
    # its own "?" placeholder.
    list_filters = (
        ("subject", subjects),
        ("chapter", chapters),
        ("topic", topics),
    )
    for column, values in list_filters:
        if values:
            placeholders = ",".join(["?"] * len(values))
            query += f" AND {column} IN ({placeholders})"
            params.extend(values)

    cursor = await db.execute(query, params)
    matching = await cursor.fetchall()
    if not matching:
        raise HTTPException(
            status_code=404, detail="No questions found matching the criteria."
        )

    candidates = matching[offset:offset + limit]
    if not candidates:
        # Filters matched rows, but the requested window lies past the end.
        return {
            "total_questions": 0,
            "total_time": total_time,
            "questions": [],
        }

    sample_size = min(num_questions, len(candidates))
    chosen = random.sample(candidates, sample_size)
    return {
        "total_questions": sample_size,
        "total_time": total_time,
        "questions": [dict(row) for row in chosen],
    }
if __name__ == "__main__":
    import pathlib

    import uvicorn

    # With workers > 1 uvicorn needs an import string ("module:attr") so that
    # each worker process can import the app itself; given an app object it
    # logs a warning and exits. The module name is derived from this file's
    # name, which therefore has to be a valid, importable module name.
    # The `debug` keyword was dropped: current uvicorn no longer accepts it
    # and False was its default when it did.
    uvicorn.run(
        f"{pathlib.Path(__file__).stem}:app",
        host="",
        port=5000,
        workers=6,
        timeout_keep_alive=60,
    )