""" |
|
Create a grading script based on midterm2a-submissions.json or from db which will be filename on the command line. The script should: |
|
1. input the json file for student submissions |
|
2. get student student_code and problem_id, if there are multiple entry for the same student_code and problem_id, only the last entry will be graded |
|
3. for each student_code , problem_id tuple, get the correct answer and problem_description for the problem_id with function get_correct_answer(problem_id) |
|
4. grade the student submission with function grade_submission(student_code, problem_id, student_answer, correct_answer) which returns a tuple (score, feedback) |
|
5. output the grading result to a markdown file: |
|
- for each student_code, problem_id tuple, output the problem_description, student answer, correct answer, score, feedback |
|
- for each student_code, output the total score and total feedback |
|
""" |
""" |
|
This is the sample content of the submission json whose file name to be supplied from command line: |
|
[ |
|
{ |
|
"problem_id": "problem_1_21_dutch_national_flag", |
|
"session": "MIDTERM2a", |
|
"email": "kimnguyen@stu.feitian.edu", |
|
"name": "Kim Nguyen", |
|
"hint_requested": false, |
|
"student_code": "def sort_colors(nums: list[int]) -> None:\n # Initialize pointers\n low = 0\n mid = 0\n high = len(nums) - 1\n \n # TODO: Implement the Dutch National Flag algorithm\n while mid <= high:\n # TODO: Depending on the value at nums[mid], adjust pointers and swap as necessary\n pass", |
|
"timestamp": { |
|
"$date": "2024-11-07T14:31:05.366Z" |
|
} |
|
} |
|
] |
|
""" |
|
|
|
import json
import os
import sys
from collections import defaultdict
from typing import Dict, List, Tuple

from grading_utils import grade_submission, Grading_rubric
from app_utils import load_problems, SESSION_ID, SESSION_TITLE, logger
from utils import get_full_traceback
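
# Assumed interfaces for the local helper modules (not defined in this file; inferred from how
# they are used below, so treat these as assumptions rather than documented signatures):
#
#   load_problems() -> list[dict]   -- each problem dict has 'id', 'Problem_markdown', 'Solution_Code'
#   grade_submission(student_name, problem_id, problem_description, student_code, correct_answer)
#       -> (score, feedback, rubric)
#   Grading_rubric                  -- dict-like cache mapping problem_id -> rubric text
#   get_full_traceback(exc) -> str  -- formatted traceback used for error logging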

problemsDB = {problem['id']: problem for problem in load_problems()}
Grading_rubric_path = os.path.join(SESSION_ID, "grading_rubric.json")
output_dir = os.path.join("grading_reports", SESSION_ID)

# Reuse a previously saved rubric so that regrading applies the same criteria.
if os.path.exists(Grading_rubric_path):
    with open(Grading_rubric_path, 'r') as f:
        Grading_rubric.update(json.load(f))
    print("Grading rubric loaded successfully.")


def get_correct_answer(problem_id: str) -> Tuple[str, str]:
    """Return the correct solution code and the problem description for problem_id."""
    return problemsDB[problem_id]['Solution_Code'], problemsDB[problem_id]['Problem_markdown']


def process_submissions(submissions: List[dict]) -> Dict[str, Dict[str, dict]]:
    """Keep only the latest submission for each (student, problem_id) pair."""
    student_submissions = defaultdict(dict)

    for submission in submissions:
        name = submission['name']
        problem_id = submission['problem_id']

        # Later timestamps win: an earlier submission for the same problem is overwritten.
        # The '$date' values are ISO-8601 strings, so lexicographic comparison matches
        # chronological order.
        if ((problem_id not in student_submissions[name]) or
                (submission['timestamp']['$date'] >
                 student_submissions[name][problem_id]['timestamp']['$date'])):
            student_submissions[name][problem_id] = submission

    return student_submissions
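
# A minimal illustration of the "latest wins" rule above, using hypothetical records (not taken
# from a real export). Kept as a comment so it does not run on import:
#
#     subs = [
#         {"name": "Kim Nguyen", "problem_id": "p1", "student_code": "v1",
#          "timestamp": {"$date": "2024-11-07T14:00:00.000Z"}},
#         {"name": "Kim Nguyen", "problem_id": "p1", "student_code": "v2",
#          "timestamp": {"$date": "2024-11-07T14:31:05.366Z"}},
#     ]
#     latest = process_submissions(subs)
#     assert latest["Kim Nguyen"]["p1"]["student_code"] == "v2"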


def generate_markdown_report(student_submissions: Dict[str, Dict[str, dict]], output_dir: str = "grading_reports"):
    """Generate an individual markdown (.qmd) report for each student."""
    try:
        os.makedirs(output_dir, exist_ok=True)

        for student_name, problems in student_submissions.items():
            filename = f"{student_name.replace(' ', '_')}.qmd"
            filepath = os.path.join(output_dir, filename)

            header = f"""---
title: "{SESSION_TITLE}"
subtitle: "{student_name}"
format: pdf
---
"""

            with open(filepath, 'w') as f:
                total_score = 0
                problem_count = 0
                f.write(f"{header}\n\n")

                for problem_id, submission in problems.items():
                    # Skip submissions that belong to a different exam session.
                    if submission['session'] != SESSION_ID:
                        continue

                    correct_answer, problem_description = get_correct_answer(problem_id)
                    logger.info(f"Grading {student_name} for problem: {problem_id}")
                    problem_count += 1

                    score, feedback, rubric = grade_submission(
                        student_name,
                        problem_id,
                        problem_description,
                        submission['student_code'],
                        correct_answer
                    )

                    # Cache the rubric the first time the grader produces one for this problem.
                    if rubric and str(rubric) != "None" and not Grading_rubric.get(problem_id):
                        Grading_rubric[problem_id] = rubric
                    total_score += score

                    f.write(f"## Problem: {problem_id}\n\n")
                    f.write(f"**Description:** {problem_description}\n\n")
                    f.write("**Your Answer:**\n```python\n{}\n```\n\n".format(submission['student_code']))
                    f.write("**Correct Solution:**\n```python\n{}\n```\n\n".format(correct_answer))
                    f.write(f"**Rubric:**\n{Grading_rubric.get(problem_id, '')}\n\n")
                    f.write(f"**Score:** {score:.2f}\n\n")
                    f.write(f"**Feedback:** {feedback}\n\n")

                f.write("## Summary\n")
                f.write(f"Problems graded: {problem_count}\n\n")
                f.write(f"Total Score: {total_score:.2f}\n")

            # Persist the rubric once so later runs (and the loading block above) can reuse it.
            if not os.path.exists(Grading_rubric_path):
                with open(Grading_rubric_path, 'w') as rubric_file:
                    json.dump(Grading_rubric, rubric_file, indent=4)
                print("Grading rubric saved successfully.")

            print(f"Report generated for {student_name} with total score: {total_score:.2f}")

        print(f"Reports generated successfully in directory: {output_dir}")

    except Exception as e:
        logger.error(f"Error generating reports: {e}\n{get_full_traceback(e)}")
        sys.exit(1)
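
# The per-student reports are Quarto .qmd files with "format: pdf" in the header, so they can be
# rendered to PDF with the Quarto CLI (assuming Quarto and a LaTeX distribution are installed).
# The path below is an example only, assuming SESSION_ID is "MIDTERM2a" as in the sample record:
#
#     quarto render grading_reports/MIDTERM2a/Kim_Nguyen.qmd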


def main():
    if len(sys.argv) != 2:
        print("Usage: python script.py <submissions_file.json>")
        sys.exit(1)

    try:
        with open(sys.argv[1], 'r') as f:
            submissions = json.load(f)
    except Exception as e:
        print(f"Error reading submissions file: {e}")
        sys.exit(1)

    student_submissions = process_submissions(submissions)
    generate_markdown_report(student_submissions, output_dir=output_dir)


if __name__ == "__main__":
    main()