api / app.py
datavorous's picture
Update app.py
22a97c1 verified
raw
history blame
11.1 kB
from fastapi import FastAPI, HTTPException, Query, Request, Depends, Path
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
import aiosqlite
from typing import List, Optional
import random
import orjson
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
from aiocache import cached, SimpleMemoryCache
from aiocache.serializers import JsonSerializer
app = FastAPI(
title="Quiz API",
description="A REST API for managing and generating quizzes",
version="1.0.0",
docs_url="/",
redoc_url="/redocs",
json_encoder=orjson.dumps,
json_decoder=orjson.loads
)
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Configure CORS to allow only specific domains
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Constants for Caching
CACHE_TIME_SECONDS = 7*24*3600 # 7 days in seconds
# Cache Middleware to add caching headers
@app.middleware("http")
async def add_cache_headers(request: Request, call_next):
response = await call_next(request)
if response.status_code == 200 and request.url.path != "/ping" and request.url.path != "/":
expires = datetime.utcnow() + timedelta(seconds=CACHE_TIME_SECONDS)
response.headers["Cache-Control"] = f"public, max-age={CACHE_TIME_SECONDS}"
response.headers["Expires"] = expires.strftime("%a, %d %b %Y %H:%M:%S GMT")
return response
# Database connection utility
@asynccontextmanager
async def get_db_connection():
connection = await aiosqlite.connect("questions.db")
connection.row_factory = aiosqlite.Row
try:
yield connection
finally:
await connection.close()
# Dependency to get a database connection
async def get_db():
async with get_db_connection() as connection:
yield connection
@app.get("/ping", summary="Ping the server", tags=["Health Check"])
async def ping():
"""Return a simple status to check if the server is running."""
return "OKK"
@app.get("/{subject}/{chapter}/{topic}/question-ids",
summary="Get question IDs for a topic",
tags=["Questions ID"],
response_description="List of question IDs for a given topic."
)
@cached(ttl=600, cache=SimpleMemoryCache, serializer=JsonSerializer())
async def get_question_ids(
subject: str,
chapter: str,
topic: str,
offset: int = Query(0, ge=0, description="Offset for pagination"),
limit: int = Query(1000, ge=1, le=1000, description="Limit for pagination"),
db: aiosqlite.Connection = Depends(get_db)
):
"""
Return list of question IDs for a given topic.
"""
query = "SELECT question_id FROM questions WHERE subject = ? AND chapter = ? AND topic = ? LIMIT ? OFFSET ?"
params = [subject, chapter, topic, limit, offset]
cursor = await db.execute(query, params)
question_ids = await cursor.fetchall()
if not question_ids:
raise HTTPException(
status_code=404, detail="Topic not found or no questions available for this page."
)
return {
"subject": subject,
"chapter": chapter,
"topic": topic,
"question_ids": [q["question_id"] for q in question_ids],
}
@app.get("/questions",
summary="Filter questions",
tags=["Questions"],
response_description="List of questions matching given filters."
)
@cached(ttl=300, cache=SimpleMemoryCache, serializer=JsonSerializer())
async def filter_questions(
subject: Optional[str] = None,
chapter: Optional[str] = None,
topic: Optional[str] = None,
question_type: Optional[str] = None,
paper_id: Optional[str] = None,
must_do: Optional[bool] = None,
offset: int = Query(0, ge=0, description="Offset for pagination"),
limit: int = Query(1000, ge=1, le=1000, description="Limit for pagination"),
db: aiosqlite.Connection = Depends(get_db)
):
"""
Filter questions based on multiple optional parameters, including the 'must_do' flag.
"""
query = "SELECT * FROM questions WHERE 1=1"
params = []
if subject:
query += " AND subject = ?"
params.append(subject)
if chapter:
query += " AND chapter = ?"
params.append(chapter)
if topic:
query += " AND topic = ?"
params.append(topic)
if question_type:
query += " AND question_type = ?"
params.append(question_type)
if paper_id:
query += " AND paper_id = ?"
params.append(paper_id)
if must_do is not None:
query += " AND must_do = ?"
params.append(1 if must_do else 0)
query += " LIMIT ? OFFSET ?"
params.extend([limit, offset])
cursor = await db.execute(query, params)
questions = await cursor.fetchall()
if not questions:
raise HTTPException(
status_code=404, detail="No questions found with the given filters for this page."
)
return [dict(question) for question in questions]
@app.get("/questions/{question_id}",
summary="Get a question by ID",
tags=["Questions"],
response_description="Details of the requested question."
)
@cached(ttl=300, cache=SimpleMemoryCache, serializer=JsonSerializer())
async def get_question_by_id(question_id: str, db: aiosqlite.Connection = Depends(get_db)):
"""Get question details based on question ID."""
cursor = await db.execute(
"SELECT * FROM questions WHERE question_id = ?", (question_id,)
)
question = await cursor.fetchone()
if not question:
raise HTTPException(status_code=404, detail="Question not found.")
return dict(question)
@app.get("/{subject}",
summary="List chapters for a subject",
tags=["Subjects & Chapters"],
response_description="List of chapters for a given subject."
)
@cached(ttl=300, cache=SimpleMemoryCache, serializer=JsonSerializer())
async def list_chapters(subject: str, db: aiosqlite.Connection = Depends(get_db)):
"""
Return list of chapters for a given subject along with the count of questions in each chapter.
Also includes the total number of questions in the subject.
"""
cursor = await db.execute(
"SELECT chapter, COUNT(*) as question_count FROM questions WHERE subject = ? GROUP BY chapter",
(subject,),
)
chapters = await cursor.fetchall()
if not chapters:
raise HTTPException(
status_code=404, detail="Subject not found or no chapters available."
)
return {
"subject": subject,
"chapters": [dict(chapter) for chapter in chapters]
}
@app.get("/{subject}/{chapter}",
summary="List topics for a chapter",
tags=["Subjects & Chapters"],
response_description="List of topics for a given chapter."
)
@cached(ttl=300, cache=SimpleMemoryCache, serializer=JsonSerializer())
async def list_topics(subject: str, chapter: str, db: aiosqlite.Connection = Depends(get_db)):
"""
Return list of topics for a given chapter along with the count of questions in each topic.
Also includes the total number of questions in the chapter.
"""
cursor = await db.execute(
"SELECT topic, COUNT(*) as question_count FROM questions WHERE subject = ? AND chapter = ? GROUP BY topic",
(subject, chapter),
)
topics = await cursor.fetchall()
if not topics:
raise HTTPException(
status_code=404, detail="Chapter not found or no topics available."
)
return {
"subject": subject,
"chapter": chapter,
"topics": [dict(topic) for topic in topics]
}
@app.get("/{subject}/{chapter}/{topic}",
summary="List questions for a topic",
tags=["Questions"],
response_description="List of questions for a given topic."
)
@cached(ttl=300, cache=SimpleMemoryCache, serializer=JsonSerializer())
async def list_questions(subject: str, chapter: str, topic: str,
offset: int = Query(0, ge=0, description="Offset for pagination"),
limit: int = Query(1000, ge=1, le=1000, description="Limit for pagination"),
db: aiosqlite.Connection = Depends(get_db)):
"""
Return list of questions for a given topic.
"""
query = "SELECT * FROM questions WHERE subject = ? AND chapter = ? AND topic = ? LIMIT ? OFFSET ?"
params = [subject, chapter, topic, limit, offset]
cursor = await db.execute(query, params)
questions = await cursor.fetchall()
if not questions:
raise HTTPException(
status_code=404, detail="Topic not found or no questions available for this page."
)
return {
"subject": subject,
"chapter": chapter,
"topic": topic,
"questions": [dict(question) for question in questions],
}
@app.post("/generate-test",
summary="Generate a test",
tags=["Test Generation"],
response_description="Generated test with the specified parameters."
)
@cached(ttl=300, cache=SimpleMemoryCache, serializer=JsonSerializer())
async def generate_test(
subjects: Optional[List[str]] = None,
chapters: Optional[List[str]] = None,
topics: Optional[List[str]] = None,
num_questions: int = 10,
total_time: int = 60,
offset: int = Query(0, ge=0, description="Offset for pagination (results may vary)"),
limit: int = Query(1000, ge=1, le=1000, description="Limit the number of questions to select (results may vary)"),
db: aiosqlite.Connection = Depends(get_db)
):
"""
Generate a test with the specified parameters.
"""
query = "SELECT * FROM questions WHERE 1=1"
params = []
if subjects:
query += f" AND subject IN ({','.join(['?'] * len(subjects))})"
params.extend(subjects)
if chapters:
query += f" AND chapter IN ({','.join(['?'] * len(chapters))})"
params.extend(chapters)
if topics:
query += f" AND topic IN ({','.join(['?'] * len(topics))})"
params.extend(topics)
cursor = await db.execute(query, params)
all_matching_questions = await cursor.fetchall()
if not all_matching_questions:
raise HTTPException(
status_code=404, detail="No questions found matching the criteria."
)
start_index = offset
end_index = offset + limit
paginated_questions = all_matching_questions[start_index:end_index]
if not paginated_questions:
return {
"total_questions": 0,
"total_time": total_time,
"questions": []
}
num_available_questions = len(paginated_questions)
num_to_select = min(num_questions, num_available_questions)
selected_questions = random.sample(paginated_questions, num_to_select)
test = {
"total_questions": num_to_select,
"total_time": total_time,
"questions": [dict(q) for q in selected_questions],
}
return test
# Main application runner
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=5000, debug=False, workers=6, timeout_keep_alive=60)