first working version

This commit is contained in:
howard
2025-10-22 20:14:31 +08:00
parent c9767b830b
commit 8dc869634e
118 changed files with 22518 additions and 0 deletions

View File

@@ -0,0 +1,15 @@
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
EXPOSE 8000
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "2", "exam_server.wsgi:application"]

View File

@@ -0,0 +1,2 @@
# API app package

View File

@@ -0,0 +1,7 @@
from django.apps import AppConfig
class ApiConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'api'

View File

@@ -0,0 +1,60 @@
"""
Authentication views.
"""
from django.contrib.auth import authenticate, login, logout, get_user_model
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework import status
from .serializers import UserRegistrationSerializer, UserSerializer, LoginSerializer
User = get_user_model()
@api_view(['POST'])
def register(request):
"""Register a new user."""
serializer = UserRegistrationSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
return Response({
'message': 'User created successfully',
'user': UserSerializer(user).data
}, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@api_view(['POST'])
def user_login(request):
"""Login user."""
serializer = LoginSerializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
username = serializer.validated_data['username']
password = serializer.validated_data['password']
user = authenticate(request, username=username, password=password)
if user is not None:
login(request, user)
return Response({
'message': 'Login successful',
'user': UserSerializer(user).data
})
return Response({'error': 'Invalid credentials'}, status=status.HTTP_401_UNAUTHORIZED)
@api_view(['POST'])
def user_logout(request):
"""Logout user."""
logout(request)
return Response({'message': 'Logout successful'})
@api_view(['GET'])
def current_user(request):
"""Get current logged-in user."""
if request.user.is_authenticated:
return Response(UserSerializer(request.user).data)
return Response({'error': 'Not authenticated'}, status=status.HTTP_401_UNAUTHORIZED)

View File

@@ -0,0 +1,143 @@
"""
Exam history and retake views.
"""
from pathlib import Path
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework import status
from .storage import storage
from .views import get_user_id
@api_view(['GET'])
def exam_history(request):
"""Get user's exam history with hierarchical folder support."""
try:
user_id = get_user_id(request)
# Get all attempts for this user (recursively search hierarchical structure)
attempts_base = storage.attempts_dir / user_id
history = {} # Use dict to group by exam_id
if attempts_base.exists():
# Recursively find all attempt files
for attempt_file in attempts_base.rglob('*.json'):
try:
attempt_data = storage._read_json(attempt_file)
exam_id = attempt_data.get('examId')
if not exam_id:
continue
# Group by exam_id
if exam_id not in history:
exam = storage.get_exam(exam_id)
history[exam_id] = {
'examId': exam_id,
'examTitle': exam.get('title', 'Unknown') if exam else 'Unknown',
'examSubject': exam.get('subject', 'Unknown') if exam else 'Unknown',
'attempts': [],
'canRetake': True
}
# Add attempt info
history[exam_id]['attempts'].append({
'attemptId': attempt_data['attemptId'],
'status': attempt_data['status'],
'startedAt': attempt_data['startedAt'],
'submittedAt': attempt_data.get('submittedAt'),
'answersCount': len(attempt_data.get('answers', [])),
'score': attempt_data.get('score') # Include score if available
})
except Exception as e:
# Skip invalid attempt files
continue
# Convert to list and sort attempts
history_list = []
for exam_data in history.values():
# Sort attempts by most recent first
exam_data['attempts'].sort(key=lambda x: x['startedAt'], reverse=True)
exam_data['totalAttempts'] = len(exam_data['attempts'])
history_list.append(exam_data)
# Sort exams by most recent attempt
history_list.sort(key=lambda x: x['attempts'][0]['startedAt'] if x['attempts'] else '', reverse=True)
return Response({'history': history_list})
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST'])
@permission_classes([AllowAny])
def reset_exam(request, exam_id):
"""Reset exam to allow retake (creates new attempt)."""
try:
user_id = get_user_id(request)
# Check if exam exists
exam = storage.get_exam(exam_id)
if not exam:
return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND)
# Update manifest to remove from finished (if present)
manifest = storage.get_manifest()
users = manifest.get('users', {})
if user_id in users and exam_id in users[user_id].get('finished', []):
users[user_id]['finished'].remove(exam_id)
storage.save_manifest(manifest)
return Response({'message': f'Exam {exam_id} reset successfully. You can take it again.'})
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
def get_attempt_result(request, attempt_id):
"""Get results for a specific attempt with hierarchical folder support."""
try:
user_id = get_user_id(request)
# Recursively search for the output bundle in hierarchical structure
# Pattern: output/{username}/{subject}/{month}/{examId}_{attemptId}.json
# Get username from user_id
from django.contrib.auth import get_user_model
User = get_user_model()
try:
if user_id.isdigit():
user = User.objects.get(id=int(user_id))
username = user.username
else:
username = user_id # guest users
except:
username = user_id
# Search in user's output folder
user_output_dir = storage.output_dir / username
bundle = None
if user_output_dir.exists():
# Recursively find the output file
for output_file in user_output_dir.rglob(f"*_{attempt_id}.json"):
bundle = storage._read_json(output_file)
break
if not bundle:
return Response({'error': 'Results not found'}, status=status.HTTP_404_NOT_FOUND)
# Verify user owns this attempt
if bundle['attempt']['userId'] != user_id:
return Response({'error': 'Unauthorized'}, status=status.HTTP_403_FORBIDDEN)
return Response(bundle)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

View File

@@ -0,0 +1,44 @@
# Generated by Django 4.2.7 on 2025-10-20 10:18
import django.contrib.auth.models
import django.contrib.auth.validators
from django.db import migrations, models
import django.utils.timezone
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
operations = [
migrations.CreateModel(
name='User',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
('email', models.EmailField(max_length=254, unique=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
],
options={
'db_table': 'users',
},
managers=[
('objects', django.contrib.auth.models.UserManager()),
],
),
]

View File

@@ -0,0 +1,19 @@
"""
Models for user management.
"""
from django.contrib.auth.models import AbstractUser
from django.db import models
class User(AbstractUser):
"""Extended user model."""
email = models.EmailField(unique=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'users'
def __str__(self):
return self.username

View File

@@ -0,0 +1,142 @@
"""
Automatic scoring utilities.
"""
def can_auto_score(exam):
"""
Check if exam can be automatically scored.
Only exams with ONLY single_choice, multiple_choices, and true_false questions can be auto-scored.
"""
allowed_types = {'single_choice', 'multiple_choices', 'true_false'}
for section in exam.get('sections', []):
for question in section.get('questions', []):
if question.get('type') not in allowed_types:
return False
return True
def calculate_score(exam, attempt):
"""
Calculate score for auto-gradable exams.
Returns (total_score, max_score, by_question_scores) or None if can't auto-score.
Supports:
- single_choice: exact match
- multiple_choices: partial credit with penalty for wrong selections
- true_false: exact match
- "IDK" responses: always score 0
"""
if not can_auto_score(exam):
return None
total_score = 0
max_score = 0
by_question = []
# Create answer lookup
answers_dict = {ans['questionId']: ans['response'] for ans in attempt.get('answers', [])}
for section in exam.get('sections', []):
for question in section.get('questions', []):
question_id = question['id']
question_type = question['type']
points = question.get('points', 0)
correct_answer = question.get('answer')
max_score += points
# Get user's answer
user_answer = answers_dict.get(question_id)
# Score the question
earned = 0
is_correct = False
# Check for "I don't know" response
if user_answer == "IDK" or user_answer == ["IDK"]:
earned = 0
is_correct = False
elif question_type == 'multiple_choices':
# Multiple choices scoring
earned, is_correct = _score_multiple_choices(
user_answer, correct_answer, points, question
)
elif question_type in ['single_choice', 'true_false']:
# Single choice or true/false: exact match
if user_answer is not None and user_answer == correct_answer:
earned = points
is_correct = True
total_score += earned
by_question.append({
'questionId': question_id,
'earned': earned,
'max': points,
'correct': is_correct
})
percentage = (total_score / max_score * 100) if max_score > 0 else 0
return {
'totalScore': total_score,
'maxScore': max_score,
'percentage': round(percentage, 2),
'passed': percentage >= 70, # 70% passing threshold
'byQuestion': by_question
}
def _score_multiple_choices(user_answer, correct_answer, points, question):
"""
Score a multiple_choices question.
Returns (earned_points, is_fully_correct).
Scoring logic:
- If partialCredit is False: all or nothing (need all correct, no extras)
- If partialCredit is True (default):
- Score = (correct_selected - wrong_selected) / total_correct * points
- Minimum score is 0 (no negative scores)
"""
if not user_answer or user_answer == ["IDK"]:
return 0, False
# Ensure lists
if not isinstance(user_answer, list):
user_answer = [user_answer]
if not isinstance(correct_answer, list):
correct_answer = [correct_answer]
user_set = set(user_answer)
correct_set = set(correct_answer)
correct_selected = len(user_set & correct_set) # Intersection
wrong_selected = len(user_set - correct_set) # User selected but not correct
total_correct = len(correct_set)
# Check if fully correct
is_fully_correct = (user_set == correct_set)
# Check if partial credit is allowed
partial_credit = question.get('partialCredit', True)
if not partial_credit:
# All or nothing
return (points if is_fully_correct else 0), is_fully_correct
# Partial credit with penalty
if total_correct == 0:
return 0, False
# Calculate score with penalty for wrong selections
raw_score = (correct_selected - wrong_selected) / total_correct * points
earned = max(0, raw_score) # Don't go below 0
return round(earned, 2), is_fully_correct

View File

@@ -0,0 +1,48 @@
"""
Serializers for API.
"""
from django.contrib.auth import get_user_model
from rest_framework import serializers
User = get_user_model()
class UserRegistrationSerializer(serializers.ModelSerializer):
"""Serializer for user registration."""
password = serializers.CharField(write_only=True, min_length=6)
password_confirm = serializers.CharField(write_only=True, min_length=6)
class Meta:
model = User
fields = ['username', 'email', 'password', 'password_confirm', 'first_name', 'last_name']
def validate(self, data):
if data['password'] != data['password_confirm']:
raise serializers.ValidationError({"password": "Passwords must match"})
return data
def create(self, validated_data):
validated_data.pop('password_confirm')
user = User.objects.create_user(
username=validated_data['username'],
email=validated_data['email'],
password=validated_data['password'],
first_name=validated_data.get('first_name', ''),
last_name=validated_data.get('last_name', '')
)
return user
class UserSerializer(serializers.ModelSerializer):
"""Serializer for user details."""
class Meta:
model = User
fields = ['id', 'username', 'email', 'first_name', 'last_name', 'created_at']
read_only_fields = ['id', 'created_at']
class LoginSerializer(serializers.Serializer):
"""Serializer for login."""
username = serializers.CharField()
password = serializers.CharField(write_only=True)

View File

@@ -0,0 +1,235 @@
"""
File-based storage utilities for exam system.
"""
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from django.conf import settings
class StorageManager:
"""Manage file-based storage for exams, attempts, and progress."""
def __init__(self):
self.input_dir = settings.INPUT_DIR
self.attempts_dir = settings.ATTEMPTS_DIR
self.output_dir = settings.OUTPUT_DIR
self.progress_dir = settings.PROGRESS_DIR
self.manifest_file = settings.MANIFEST_FILE
def _read_json(self, path: Path) -> Dict:
"""Read JSON file safely."""
if not path.exists():
return {}
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
def _write_json(self, path: Path, data: Dict):
"""Write JSON file atomically."""
path.parent.mkdir(parents=True, exist_ok=True)
temp_path = path.with_suffix('.tmp')
with open(temp_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
temp_path.replace(path)
# Manifest operations
def get_manifest(self) -> Dict:
"""Load manifest.json."""
if not self.manifest_file.exists():
manifest = {"version": "1.0.0", "exams": [], "users": {}}
self._write_json(self.manifest_file, manifest)
return manifest
return self._read_json(self.manifest_file)
def save_manifest(self, manifest: Dict):
"""Save manifest.json."""
self._write_json(self.manifest_file, manifest)
# Exam operations
def list_exams(self, published_only: bool = True) -> List[Dict]:
"""List all exams from manifest."""
manifest = self.get_manifest()
exams = manifest.get('exams', [])
if published_only:
exams = [e for e in exams if e.get('published', False)]
return exams
def get_exam(self, exam_id: str) -> Optional[Dict]:
"""Load exam JSON by ID."""
manifest = self.get_manifest()
exam_info = next((e for e in manifest.get('exams', []) if e['examId'] == exam_id), None)
if not exam_info:
return None
# Support both old flat structure and new hierarchical structure
exam_path = self.input_dir / exam_info.get('path', f"{exam_id}.json")
if not exam_path.exists():
# Fallback to flat structure
exam_path = self.input_dir / f"{exam_id}.json"
if not exam_path.exists():
return None
return self._read_json(exam_path)
def is_exam_finished(self, user_id: str, exam_id: str, allow_retake: bool = False) -> bool:
"""Check if user has finished exam."""
if allow_retake:
return False
manifest = self.get_manifest()
users = manifest.get('users', {})
user_data = users.get(user_id, {})
return exam_id in user_data.get('finished', [])
# Attempt operations
def get_attempt_path(self, user_id: str, exam_id: str, attempt_id: str, subject: str = None, month: str = None) -> Path:
"""Get path for attempt file with hierarchical structure."""
if subject and month:
# New hierarchical structure
return self.attempts_dir / user_id / subject / month / exam_id / f"{attempt_id}.json"
else:
# Old flat structure (backward compatibility)
return self.attempts_dir / user_id / exam_id / f"{attempt_id}.json"
def get_active_attempt(self, user_id: str, exam_id: str) -> Optional[Dict]:
"""Get active attempt for user/exam."""
attempt_dir = self.attempts_dir / user_id / exam_id
if not attempt_dir.exists():
return None
# Find most recent in_progress attempt
attempts = []
for attempt_file in attempt_dir.glob('*.json'):
attempt_data = self._read_json(attempt_file)
if attempt_data.get('status') == 'in_progress':
attempts.append(attempt_data)
if not attempts:
return None
# Return most recent
attempts.sort(key=lambda x: x.get('updatedAt', ''), reverse=True)
return attempts[0]
def create_attempt(self, user_id: str, exam_id: str, exam: Dict) -> Dict:
"""Create new attempt with hierarchical structure."""
timestamp = datetime.utcnow()
timestamp_str = timestamp.isoformat() + 'Z'
month_str = timestamp.strftime('%Y-%m')
attempt_id = f"attempt-{timestamp.strftime('%Y%m%d-%H%M%S')}"
# Get subject and month from exam
subject = exam.get('subject', 'unknown')
attempt = {
"attemptId": attempt_id,
"userId": user_id,
"examId": exam_id,
"subject": subject,
"month": month_str,
"examVersion": exam.get('metadata', {}).get('version', '1.0.0'),
"status": "in_progress",
"startedAt": timestamp_str,
"updatedAt": timestamp_str,
"answers": []
}
attempt_path = self.get_attempt_path(user_id, exam_id, attempt_id, subject, month_str)
self._write_json(attempt_path, attempt)
# Update manifest
manifest = self.get_manifest()
users = manifest.setdefault('users', {})
user_data = users.setdefault(user_id, {'active': [], 'finished': []})
if exam_id not in user_data['active']:
user_data['active'].append(exam_id)
self.save_manifest(manifest)
return attempt
def save_attempt(self, attempt: Dict):
"""Save attempt (autosave) with hierarchical structure."""
user_id = attempt['userId']
exam_id = attempt['examId']
attempt_id = attempt['attemptId']
subject = attempt.get('subject')
month = attempt.get('month')
attempt['updatedAt'] = datetime.utcnow().isoformat() + 'Z'
attempt_path = self.get_attempt_path(user_id, exam_id, attempt_id, subject, month)
self._write_json(attempt_path, attempt)
def submit_attempt(self, attempt: Dict, exam: Dict) -> str:
"""Submit attempt and create output bundle with hierarchical structure."""
attempt['status'] = 'submitted'
attempt['submittedAt'] = datetime.utcnow().isoformat() + 'Z'
self.save_attempt(attempt)
# Create output bundle
bundle = {
"exam": exam,
"attempt": attempt
}
# Get username from user_id
from django.contrib.auth import get_user_model
User = get_user_model()
try:
if attempt['userId'].isdigit():
user = User.objects.get(id=int(attempt['userId']))
username = user.username
else:
username = attempt['userId'] # guest users
except:
username = attempt['userId']
# Organize by username/subject/month
subject = attempt.get('subject', exam.get('subject', 'unknown'))
month = attempt.get('month', datetime.utcnow().strftime('%Y-%m'))
output_path = self.output_dir / username / subject / month / f"{exam['examId']}_{attempt['attemptId']}.json"
self._write_json(output_path, bundle)
# Mark as finished
attempt['status'] = 'finished'
self.save_attempt(attempt)
# Update manifest
manifest = self.get_manifest()
users = manifest.setdefault('users', {})
user_data = users.setdefault(attempt['userId'], {'active': [], 'finished': []})
if exam['examId'] in user_data['active']:
user_data['active'].remove(exam['examId'])
if exam['examId'] not in user_data['finished']:
user_data['finished'].append(exam['examId'])
self.save_manifest(manifest)
return str(output_path)
# Progress operations
def get_progress(self, user_id: str) -> Dict:
"""Get user progress."""
progress_path = self.progress_dir / f"{user_id}.json"
if not progress_path.exists():
return {"userId": user_id, "exams": {}}
return self._read_json(progress_path)
def update_progress(self, user_id: str, exam_id: str, percent: float):
"""Update progress for exam."""
progress = self.get_progress(user_id)
progress['exams'][exam_id] = {
"percentComplete": percent,
"lastSavedAt": datetime.utcnow().isoformat() + 'Z'
}
progress_path = self.progress_dir / f"{user_id}.json"
self._write_json(progress_path, progress)
# Global storage instance
storage = StorageManager()

View File

@@ -0,0 +1,33 @@
"""
API URL configuration.
"""
from django.urls import path
from . import views, auth_views, exam_history_views
urlpatterns = [
# Health
path('health/', views.health_check, name='health_check'),
# Authentication
path('auth/register/', auth_views.register, name='register'),
path('auth/login/', auth_views.user_login, name='login'),
path('auth/logout/', auth_views.user_logout, name='logout'),
path('auth/me/', auth_views.current_user, name='current_user'),
# Exams
path('exams/', views.list_exams, name='list_exams'),
path('exams/<str:exam_id>/', views.get_exam, name='get_exam'),
path('exams/<str:exam_id>/attempt/', views.start_or_resume_attempt, name='start_or_resume_attempt'),
path('exams/<str:exam_id>/reset/', exam_history_views.reset_exam, name='reset_exam'),
# Attempts
path('attempts/<str:attempt_id>/', views.get_attempt, name='get_attempt'),
path('attempts/<str:attempt_id>/autosave/', views.autosave_attempt, name='autosave_attempt'),
path('attempts/<str:attempt_id>/submit/', views.submit_attempt, name='submit_attempt'),
path('attempts/<str:attempt_id>/result/', exam_history_views.get_attempt_result, name='get_attempt_result'),
# Progress & History
path('progress/me/', views.get_progress, name='get_progress'),
path('history/me/', exam_history_views.exam_history, name='exam_history'),
]

View File

@@ -0,0 +1,379 @@
"""
API views for exam system.
"""
from pathlib import Path
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework import status
from .storage import storage
from .scoring import can_auto_score, calculate_score
def get_user_id(request):
"""Get user ID from session or authenticated user."""
if request.user.is_authenticated:
return str(request.user.id)
# Fallback for unauthenticated sessions (backward compatibility)
if 'user_id' not in request.session:
request.session['user_id'] = f"guest_{request.session.session_key or 'default'}"
return request.session['user_id']
def find_attempt_file(user_id, attempt_id):
"""Recursively search for attempt file in user's folder."""
user_attempts_base = storage.attempts_dir / user_id
if not user_attempts_base.exists():
return None, None
def search_recursive(base_path):
for item in base_path.iterdir():
if item.is_file() and item.name == f"{attempt_id}.json":
# Get exam_id from parent folder
exam_id = item.parent.name
return item, exam_id
elif item.is_dir():
found_path, found_exam = search_recursive(item)
if found_path:
return found_path, found_exam
return None, None
return search_recursive(user_attempts_base)
@api_view(['GET'])
def list_exams(request):
"""List all published exams with full details."""
try:
exams = storage.list_exams(published_only=True)
user_id = get_user_id(request)
# Add status and full details to each exam
exams_with_details = []
for exam_info in exams:
# Load full exam JSON
full_exam = storage.get_exam(exam_info['examId'])
if not full_exam:
continue
# Merge manifest info with full exam
exam_data = {
**full_exam, # Full exam JSON (includes subject, difficulty, etc.)
'published': exam_info.get('published', True),
'version': exam_info.get('version', '1.0.0')
}
# Add status - check actual attempt files for more reliable status
# For now, check if there are any finished attempts for this exam (any user)
# This is a temporary solution until proper authentication is set up
has_finished_attempt = False
has_active_attempt = False
# Check all user directories for attempts
attempts_base = storage.attempts_dir
if attempts_base.exists():
for user_dir in attempts_base.iterdir():
if user_dir.is_dir():
for attempt_file in user_dir.rglob('*.json'):
try:
attempt = storage._read_json(attempt_file)
if attempt.get('examId') == exam_info['examId']:
if attempt.get('status') == 'finished':
has_finished_attempt = True
elif attempt.get('status') in ['in_progress', 'draft']:
has_active_attempt = True
except Exception:
continue
# Determine status
if has_finished_attempt:
exam_data['status'] = 'finished'
elif has_active_attempt:
exam_data['status'] = 'in_progress'
else:
exam_data['status'] = 'available'
# Add score if finished
if exam_data['status'] == 'finished':
# Try to find the latest finished attempt and its score from any user
latest_score = None
latest_time = None
latest_attempt_id = None
latest_username = None
# Search all user directories for the latest finished attempt
attempts_base = storage.attempts_dir
if attempts_base.exists():
for user_dir in attempts_base.iterdir():
if user_dir.is_dir():
for attempt_file in user_dir.rglob('*.json'):
try:
attempt = storage._read_json(attempt_file)
if (attempt.get('examId') == exam_info['examId'] and
attempt.get('status') == 'finished'):
attempt_time = attempt.get('submittedAt', '')
if latest_time is None or attempt_time > latest_time:
latest_time = attempt_time
latest_attempt_id = attempt.get('attemptId')
latest_username = user_dir.name
except Exception:
continue
# Now look for the score in the output bundle
if latest_attempt_id and latest_username:
# Try to get username from user ID
username = latest_username
try:
if latest_username.isdigit():
from django.contrib.auth import get_user_model
User = get_user_model()
user = User.objects.get(id=int(latest_username))
username = user.username
except:
pass
# Search in user's output folder for the bundle
user_output_dir = storage.output_dir / username
if user_output_dir.exists():
for output_file in user_output_dir.rglob(f"*_{latest_attempt_id}.json"):
try:
bundle = storage._read_json(output_file)
if bundle.get('attempt', {}).get('score'):
latest_score = bundle['attempt']['score']
break
except Exception:
continue
if latest_score:
exam_data['lastScore'] = latest_score
exams_with_details.append(exam_data)
# Organize exams by subject and status
organized_exams = {
'bySubject': {},
'byStatus': {
'available': [],
'finished': [],
'passed': [],
'notPassed': []
},
'all': exams_with_details
}
# Group by subject
for exam in exams_with_details:
subject = exam.get('subject', 'Other')
# Normalize subject names
subject_map = {
'python': 'Python',
'cpp': 'C++',
'linear_algebra': 'Linear Algebra',
'javascript': 'JavaScript',
'java': 'Java',
'sql': 'SQL'
}
normalized_subject = subject_map.get(subject.lower(), subject)
if normalized_subject not in organized_exams['bySubject']:
organized_exams['bySubject'][normalized_subject] = []
organized_exams['bySubject'][normalized_subject].append(exam)
# Group by status
for exam in exams_with_details:
status = exam.get('status', 'available')
if status in organized_exams['byStatus']:
organized_exams['byStatus'][status].append(exam)
# Also categorize finished exams by pass/fail
if status == 'finished' and exam.get('lastScore'):
if exam['lastScore'].get('passed', False):
organized_exams['byStatus']['passed'].append(exam)
else:
organized_exams['byStatus']['notPassed'].append(exam)
return Response({
'exams': exams_with_details,
'organized': organized_exams,
'subjects': list(organized_exams['bySubject'].keys()),
'summary': {
'total': len(exams_with_details),
'available': len(organized_exams['byStatus']['available']),
'finished': len(organized_exams['byStatus']['finished']),
'passed': len(organized_exams['byStatus']['passed']),
'notPassed': len(organized_exams['byStatus']['notPassed'])
}
})
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
def get_exam(request, exam_id):
"""Get exam details by ID."""
try:
exam = storage.get_exam(exam_id)
if not exam:
return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND)
user_id = get_user_id(request)
if storage.is_exam_finished(user_id, exam_id):
return Response({'error': 'Exam already finished'}, status=status.HTTP_423_LOCKED)
return Response(exam)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST'])
def start_or_resume_attempt(request, exam_id):
"""Start new attempt or resume existing one."""
try:
user_id = get_user_id(request)
# Check if exam exists
exam = storage.get_exam(exam_id)
if not exam:
return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND)
# Check if already finished
if storage.is_exam_finished(user_id, exam_id):
return Response({'error': 'Exam already finished'}, status=status.HTTP_423_LOCKED)
# Check for active attempt
attempt = storage.get_active_attempt(user_id, exam_id)
if attempt:
return Response(attempt)
# Create new attempt
attempt = storage.create_attempt(user_id, exam_id, exam)
return Response(attempt, status=status.HTTP_201_CREATED)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
def get_attempt(request, attempt_id):
"""Get attempt by ID."""
try:
user_id = get_user_id(request)
# Find attempt file using helper
attempt_path, exam_id = find_attempt_file(user_id, attempt_id)
if not attempt_path:
return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND)
attempt = storage._read_json(attempt_path)
return Response(attempt)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['PUT'])
def autosave_attempt(request, attempt_id):
"""Autosave attempt answers."""
try:
user_id = get_user_id(request)
data = request.data
# Find attempt file using helper
attempt_path, exam_id = find_attempt_file(user_id, attempt_id)
if not attempt_path:
return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND)
# Load attempt
attempt = storage._read_json(attempt_path)
# Update answers
if 'answers' in data:
attempt['answers'] = data['answers']
# Save
storage.save_attempt(attempt)
# Update progress
exam = storage.get_exam(exam_id)
if exam:
total_questions = sum(len(section['questions']) for section in exam.get('sections', []))
answered = len(attempt['answers'])
percent = (answered / total_questions * 100) if total_questions > 0 else 0
storage.update_progress(user_id, exam_id, percent)
return Response({'updatedAt': attempt['updatedAt']})
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST'])
def submit_attempt(request, attempt_id):
"""Submit attempt and create output bundle."""
try:
user_id = get_user_id(request)
# Find attempt file using helper
attempt_path, exam_id = find_attempt_file(user_id, attempt_id)
if not attempt_path:
return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND)
attempt = storage._read_json(attempt_path)
if attempt['status'] != 'in_progress':
return Response({'error': 'Attempt already submitted'}, status=status.HTTP_400_BAD_REQUEST)
# Load exam
exam = storage.get_exam(exam_id)
if not exam:
return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND)
# Calculate score if auto-scorable
score_result = None
if can_auto_score(exam):
score_result = calculate_score(exam, attempt)
# Add score to attempt
attempt['score'] = score_result
# Submit and create bundle
output_path = storage.submit_attempt(attempt, exam)
response_data = {
'message': 'Exam submitted successfully',
'outputPath': output_path,
'attemptId': attempt_id
}
# Include score in response if available
if score_result:
response_data['score'] = score_result
response_data['autoScored'] = True
else:
response_data['autoScored'] = False
response_data['message'] = 'Exam submitted successfully. Manual grading required for some questions.'
return Response(response_data)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
def get_progress(request):
"""Get current user's progress."""
try:
user_id = get_user_id(request)
progress = storage.get_progress(user_id)
return Response(progress)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
def health_check(request):
"""Health check endpoint."""
return Response({'status': 'ok', 'service': 'exam_server'})

View File

@@ -0,0 +1,12 @@
DEBUG=False
SECRET_KEY=change-this-in-production
ALLOWED_HOSTS=localhost,127.0.0.1,exam_server
CORS_ALLOWED_ORIGINS=http://localhost,http://localhost:4200
# Data folder paths (inside container)
INPUT_DIR=/data/input
ATTEMPTS_DIR=/data/attempts
OUTPUT_DIR=/data/output
PROGRESS_DIR=/data/progress
MANIFEST_FILE=/data/manifest.json

View File

@@ -0,0 +1,2 @@
# Django project package

View File

@@ -0,0 +1,130 @@
"""
Django settings for exam_server project.
"""
import os
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Build paths inside the project
BASE_DIR = Path(__file__).resolve().parent.parent
# Security settings
SECRET_KEY = os.getenv('SECRET_KEY', 'django-insecure-dev-key-change-in-production')
DEBUG = os.getenv('DEBUG', 'True') == 'True'
ALLOWED_HOSTS = os.getenv('ALLOWED_HOSTS', 'localhost,127.0.0.1').split(',')
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'corsheaders',
'api',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'corsheaders.middleware.CorsMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'exam_server.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'exam_server.wsgi.application'
# Database (using SQLite for simplicity - sessions and users)
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': Path('/app/db/db.sqlite3'), # Persistent volume
}
}
# Password validation
AUTH_PASSWORD_VALIDATORS = [
{'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator'},
{'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator'},
{'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator'},
{'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator'},
]
# Internationalization
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_TZ = True
# Static files
STATIC_URL = 'static/'
STATIC_ROOT = BASE_DIR / 'staticfiles'
# Default primary key field type
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# Custom user model
AUTH_USER_MODEL = 'api.User'
# CORS settings
CORS_ALLOWED_ORIGINS = os.getenv('CORS_ALLOWED_ORIGINS', 'http://localhost:4200').split(',')
CORS_ALLOW_CREDENTIALS = True
# REST Framework
REST_FRAMEWORK = {
'DEFAULT_RENDERER_CLASSES': [
'rest_framework.renderers.JSONRenderer',
],
'DEFAULT_PARSER_CLASSES': [
'rest_framework.parsers.JSONParser',
],
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
],
}
# CSRF settings
CSRF_TRUSTED_ORIGINS = ['http://localhost', 'http://localhost:4200', 'http://localhost:80']
CSRF_COOKIE_HTTPONLY = False # Allow JavaScript to read CSRF token
# File storage paths
INPUT_DIR = Path(os.getenv('INPUT_DIR', '/data/input'))
ATTEMPTS_DIR = Path(os.getenv('ATTEMPTS_DIR', '/data/attempts'))
OUTPUT_DIR = Path(os.getenv('OUTPUT_DIR', '/data/output'))
PROGRESS_DIR = Path(os.getenv('PROGRESS_DIR', '/data/progress'))
MANIFEST_FILE = Path(os.getenv('MANIFEST_FILE', '/data/manifest.json'))
# Ensure directories exist
for dir_path in [INPUT_DIR, ATTEMPTS_DIR, OUTPUT_DIR, PROGRESS_DIR]:
dir_path.mkdir(parents=True, exist_ok=True)
# Session settings
SESSION_COOKIE_AGE = 86400 # 24 hours
SESSION_SAVE_EVERY_REQUEST = True

View File

@@ -0,0 +1,11 @@
"""
URL configuration for exam_server project.
"""
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('api/', include('api.urls')),
]

View File

@@ -0,0 +1,9 @@
"""
WSGI config for exam_server project.
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'exam_server.settings')
application = get_wsgi_application()

View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'exam_server.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,6 @@
Django==4.2.7
djangorestframework==3.14.0
django-cors-headers==4.3.0
gunicorn==21.2.0
python-dotenv==1.0.0