Files
2025-10-22 20:14:31 +08:00

380 lines
15 KiB
Python

"""
API views for exam system.
"""
from pathlib import Path
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework import status
from .storage import storage
from .scoring import can_auto_score, calculate_score
def get_user_id(request):
"""Get user ID from session or authenticated user."""
if request.user.is_authenticated:
return str(request.user.id)
# Fallback for unauthenticated sessions (backward compatibility)
if 'user_id' not in request.session:
request.session['user_id'] = f"guest_{request.session.session_key or 'default'}"
return request.session['user_id']
def find_attempt_file(user_id, attempt_id):
"""Recursively search for attempt file in user's folder."""
user_attempts_base = storage.attempts_dir / user_id
if not user_attempts_base.exists():
return None, None
def search_recursive(base_path):
for item in base_path.iterdir():
if item.is_file() and item.name == f"{attempt_id}.json":
# Get exam_id from parent folder
exam_id = item.parent.name
return item, exam_id
elif item.is_dir():
found_path, found_exam = search_recursive(item)
if found_path:
return found_path, found_exam
return None, None
return search_recursive(user_attempts_base)
@api_view(['GET'])
def list_exams(request):
"""List all published exams with full details."""
try:
exams = storage.list_exams(published_only=True)
user_id = get_user_id(request)
# Add status and full details to each exam
exams_with_details = []
for exam_info in exams:
# Load full exam JSON
full_exam = storage.get_exam(exam_info['examId'])
if not full_exam:
continue
# Merge manifest info with full exam
exam_data = {
**full_exam, # Full exam JSON (includes subject, difficulty, etc.)
'published': exam_info.get('published', True),
'version': exam_info.get('version', '1.0.0')
}
# Add status - check actual attempt files for more reliable status
# For now, check if there are any finished attempts for this exam (any user)
# This is a temporary solution until proper authentication is set up
has_finished_attempt = False
has_active_attempt = False
# Check all user directories for attempts
attempts_base = storage.attempts_dir
if attempts_base.exists():
for user_dir in attempts_base.iterdir():
if user_dir.is_dir():
for attempt_file in user_dir.rglob('*.json'):
try:
attempt = storage._read_json(attempt_file)
if attempt.get('examId') == exam_info['examId']:
if attempt.get('status') == 'finished':
has_finished_attempt = True
elif attempt.get('status') in ['in_progress', 'draft']:
has_active_attempt = True
except Exception:
continue
# Determine status
if has_finished_attempt:
exam_data['status'] = 'finished'
elif has_active_attempt:
exam_data['status'] = 'in_progress'
else:
exam_data['status'] = 'available'
# Add score if finished
if exam_data['status'] == 'finished':
# Try to find the latest finished attempt and its score from any user
latest_score = None
latest_time = None
latest_attempt_id = None
latest_username = None
# Search all user directories for the latest finished attempt
attempts_base = storage.attempts_dir
if attempts_base.exists():
for user_dir in attempts_base.iterdir():
if user_dir.is_dir():
for attempt_file in user_dir.rglob('*.json'):
try:
attempt = storage._read_json(attempt_file)
if (attempt.get('examId') == exam_info['examId'] and
attempt.get('status') == 'finished'):
attempt_time = attempt.get('submittedAt', '')
if latest_time is None or attempt_time > latest_time:
latest_time = attempt_time
latest_attempt_id = attempt.get('attemptId')
latest_username = user_dir.name
except Exception:
continue
# Now look for the score in the output bundle
if latest_attempt_id and latest_username:
# Try to get username from user ID
username = latest_username
try:
if latest_username.isdigit():
from django.contrib.auth import get_user_model
User = get_user_model()
user = User.objects.get(id=int(latest_username))
username = user.username
except:
pass
# Search in user's output folder for the bundle
user_output_dir = storage.output_dir / username
if user_output_dir.exists():
for output_file in user_output_dir.rglob(f"*_{latest_attempt_id}.json"):
try:
bundle = storage._read_json(output_file)
if bundle.get('attempt', {}).get('score'):
latest_score = bundle['attempt']['score']
break
except Exception:
continue
if latest_score:
exam_data['lastScore'] = latest_score
exams_with_details.append(exam_data)
# Organize exams by subject and status
organized_exams = {
'bySubject': {},
'byStatus': {
'available': [],
'finished': [],
'passed': [],
'notPassed': []
},
'all': exams_with_details
}
# Group by subject
for exam in exams_with_details:
subject = exam.get('subject', 'Other')
# Normalize subject names
subject_map = {
'python': 'Python',
'cpp': 'C++',
'linear_algebra': 'Linear Algebra',
'javascript': 'JavaScript',
'java': 'Java',
'sql': 'SQL'
}
normalized_subject = subject_map.get(subject.lower(), subject)
if normalized_subject not in organized_exams['bySubject']:
organized_exams['bySubject'][normalized_subject] = []
organized_exams['bySubject'][normalized_subject].append(exam)
# Group by status
for exam in exams_with_details:
status = exam.get('status', 'available')
if status in organized_exams['byStatus']:
organized_exams['byStatus'][status].append(exam)
# Also categorize finished exams by pass/fail
if status == 'finished' and exam.get('lastScore'):
if exam['lastScore'].get('passed', False):
organized_exams['byStatus']['passed'].append(exam)
else:
organized_exams['byStatus']['notPassed'].append(exam)
return Response({
'exams': exams_with_details,
'organized': organized_exams,
'subjects': list(organized_exams['bySubject'].keys()),
'summary': {
'total': len(exams_with_details),
'available': len(organized_exams['byStatus']['available']),
'finished': len(organized_exams['byStatus']['finished']),
'passed': len(organized_exams['byStatus']['passed']),
'notPassed': len(organized_exams['byStatus']['notPassed'])
}
})
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
def get_exam(request, exam_id):
"""Get exam details by ID."""
try:
exam = storage.get_exam(exam_id)
if not exam:
return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND)
user_id = get_user_id(request)
if storage.is_exam_finished(user_id, exam_id):
return Response({'error': 'Exam already finished'}, status=status.HTTP_423_LOCKED)
return Response(exam)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST'])
def start_or_resume_attempt(request, exam_id):
"""Start new attempt or resume existing one."""
try:
user_id = get_user_id(request)
# Check if exam exists
exam = storage.get_exam(exam_id)
if not exam:
return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND)
# Check if already finished
if storage.is_exam_finished(user_id, exam_id):
return Response({'error': 'Exam already finished'}, status=status.HTTP_423_LOCKED)
# Check for active attempt
attempt = storage.get_active_attempt(user_id, exam_id)
if attempt:
return Response(attempt)
# Create new attempt
attempt = storage.create_attempt(user_id, exam_id, exam)
return Response(attempt, status=status.HTTP_201_CREATED)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
def get_attempt(request, attempt_id):
"""Get attempt by ID."""
try:
user_id = get_user_id(request)
# Find attempt file using helper
attempt_path, exam_id = find_attempt_file(user_id, attempt_id)
if not attempt_path:
return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND)
attempt = storage._read_json(attempt_path)
return Response(attempt)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['PUT'])
def autosave_attempt(request, attempt_id):
"""Autosave attempt answers."""
try:
user_id = get_user_id(request)
data = request.data
# Find attempt file using helper
attempt_path, exam_id = find_attempt_file(user_id, attempt_id)
if not attempt_path:
return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND)
# Load attempt
attempt = storage._read_json(attempt_path)
# Update answers
if 'answers' in data:
attempt['answers'] = data['answers']
# Save
storage.save_attempt(attempt)
# Update progress
exam = storage.get_exam(exam_id)
if exam:
total_questions = sum(len(section['questions']) for section in exam.get('sections', []))
answered = len(attempt['answers'])
percent = (answered / total_questions * 100) if total_questions > 0 else 0
storage.update_progress(user_id, exam_id, percent)
return Response({'updatedAt': attempt['updatedAt']})
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST'])
def submit_attempt(request, attempt_id):
"""Submit attempt and create output bundle."""
try:
user_id = get_user_id(request)
# Find attempt file using helper
attempt_path, exam_id = find_attempt_file(user_id, attempt_id)
if not attempt_path:
return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND)
attempt = storage._read_json(attempt_path)
if attempt['status'] != 'in_progress':
return Response({'error': 'Attempt already submitted'}, status=status.HTTP_400_BAD_REQUEST)
# Load exam
exam = storage.get_exam(exam_id)
if not exam:
return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND)
# Calculate score if auto-scorable
score_result = None
if can_auto_score(exam):
score_result = calculate_score(exam, attempt)
# Add score to attempt
attempt['score'] = score_result
# Submit and create bundle
output_path = storage.submit_attempt(attempt, exam)
response_data = {
'message': 'Exam submitted successfully',
'outputPath': output_path,
'attemptId': attempt_id
}
# Include score in response if available
if score_result:
response_data['score'] = score_result
response_data['autoScored'] = True
else:
response_data['autoScored'] = False
response_data['message'] = 'Exam submitted successfully. Manual grading required for some questions.'
return Response(response_data)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
def get_progress(request):
"""Get current user's progress."""
try:
user_id = get_user_id(request)
progress = storage.get_progress(user_id)
return Response(progress)
except Exception as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
def health_check(request):
"""Health check endpoint."""
return Response({'status': 'ok', 'service': 'exam_server'})