380 lines
15 KiB
Python
380 lines
15 KiB
Python
"""
|
|
API views for exam system.
|
|
"""
|
|
from pathlib import Path
|
|
from rest_framework.decorators import api_view
|
|
from rest_framework.response import Response
|
|
from rest_framework import status
|
|
from .storage import storage
|
|
from .scoring import can_auto_score, calculate_score
|
|
|
|
|
|
def get_user_id(request):
|
|
"""Get user ID from session or authenticated user."""
|
|
if request.user.is_authenticated:
|
|
return str(request.user.id)
|
|
|
|
# Fallback for unauthenticated sessions (backward compatibility)
|
|
if 'user_id' not in request.session:
|
|
request.session['user_id'] = f"guest_{request.session.session_key or 'default'}"
|
|
return request.session['user_id']
|
|
|
|
|
|
def find_attempt_file(user_id, attempt_id):
|
|
"""Recursively search for attempt file in user's folder."""
|
|
user_attempts_base = storage.attempts_dir / user_id
|
|
if not user_attempts_base.exists():
|
|
return None, None
|
|
|
|
def search_recursive(base_path):
|
|
for item in base_path.iterdir():
|
|
if item.is_file() and item.name == f"{attempt_id}.json":
|
|
# Get exam_id from parent folder
|
|
exam_id = item.parent.name
|
|
return item, exam_id
|
|
elif item.is_dir():
|
|
found_path, found_exam = search_recursive(item)
|
|
if found_path:
|
|
return found_path, found_exam
|
|
return None, None
|
|
|
|
return search_recursive(user_attempts_base)
|
|
|
|
|
|
@api_view(['GET'])
|
|
def list_exams(request):
|
|
"""List all published exams with full details."""
|
|
try:
|
|
exams = storage.list_exams(published_only=True)
|
|
user_id = get_user_id(request)
|
|
|
|
# Add status and full details to each exam
|
|
exams_with_details = []
|
|
for exam_info in exams:
|
|
# Load full exam JSON
|
|
full_exam = storage.get_exam(exam_info['examId'])
|
|
if not full_exam:
|
|
continue
|
|
|
|
# Merge manifest info with full exam
|
|
exam_data = {
|
|
**full_exam, # Full exam JSON (includes subject, difficulty, etc.)
|
|
'published': exam_info.get('published', True),
|
|
'version': exam_info.get('version', '1.0.0')
|
|
}
|
|
|
|
# Add status - check actual attempt files for more reliable status
|
|
# For now, check if there are any finished attempts for this exam (any user)
|
|
# This is a temporary solution until proper authentication is set up
|
|
has_finished_attempt = False
|
|
has_active_attempt = False
|
|
|
|
# Check all user directories for attempts
|
|
attempts_base = storage.attempts_dir
|
|
if attempts_base.exists():
|
|
for user_dir in attempts_base.iterdir():
|
|
if user_dir.is_dir():
|
|
for attempt_file in user_dir.rglob('*.json'):
|
|
try:
|
|
attempt = storage._read_json(attempt_file)
|
|
if attempt.get('examId') == exam_info['examId']:
|
|
if attempt.get('status') == 'finished':
|
|
has_finished_attempt = True
|
|
elif attempt.get('status') in ['in_progress', 'draft']:
|
|
has_active_attempt = True
|
|
except Exception:
|
|
continue
|
|
|
|
# Determine status
|
|
if has_finished_attempt:
|
|
exam_data['status'] = 'finished'
|
|
elif has_active_attempt:
|
|
exam_data['status'] = 'in_progress'
|
|
else:
|
|
exam_data['status'] = 'available'
|
|
|
|
# Add score if finished
|
|
if exam_data['status'] == 'finished':
|
|
# Try to find the latest finished attempt and its score from any user
|
|
latest_score = None
|
|
latest_time = None
|
|
latest_attempt_id = None
|
|
latest_username = None
|
|
|
|
# Search all user directories for the latest finished attempt
|
|
attempts_base = storage.attempts_dir
|
|
if attempts_base.exists():
|
|
for user_dir in attempts_base.iterdir():
|
|
if user_dir.is_dir():
|
|
for attempt_file in user_dir.rglob('*.json'):
|
|
try:
|
|
attempt = storage._read_json(attempt_file)
|
|
if (attempt.get('examId') == exam_info['examId'] and
|
|
attempt.get('status') == 'finished'):
|
|
|
|
attempt_time = attempt.get('submittedAt', '')
|
|
if latest_time is None or attempt_time > latest_time:
|
|
latest_time = attempt_time
|
|
latest_attempt_id = attempt.get('attemptId')
|
|
latest_username = user_dir.name
|
|
except Exception:
|
|
continue
|
|
|
|
# Now look for the score in the output bundle
|
|
if latest_attempt_id and latest_username:
|
|
# Try to get username from user ID
|
|
username = latest_username
|
|
try:
|
|
if latest_username.isdigit():
|
|
from django.contrib.auth import get_user_model
|
|
User = get_user_model()
|
|
user = User.objects.get(id=int(latest_username))
|
|
username = user.username
|
|
except:
|
|
pass
|
|
|
|
# Search in user's output folder for the bundle
|
|
user_output_dir = storage.output_dir / username
|
|
if user_output_dir.exists():
|
|
for output_file in user_output_dir.rglob(f"*_{latest_attempt_id}.json"):
|
|
try:
|
|
bundle = storage._read_json(output_file)
|
|
if bundle.get('attempt', {}).get('score'):
|
|
latest_score = bundle['attempt']['score']
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
if latest_score:
|
|
exam_data['lastScore'] = latest_score
|
|
|
|
exams_with_details.append(exam_data)
|
|
|
|
# Organize exams by subject and status
|
|
organized_exams = {
|
|
'bySubject': {},
|
|
'byStatus': {
|
|
'available': [],
|
|
'finished': [],
|
|
'passed': [],
|
|
'notPassed': []
|
|
},
|
|
'all': exams_with_details
|
|
}
|
|
|
|
# Group by subject
|
|
for exam in exams_with_details:
|
|
subject = exam.get('subject', 'Other')
|
|
# Normalize subject names
|
|
subject_map = {
|
|
'python': 'Python',
|
|
'cpp': 'C++',
|
|
'linear_algebra': 'Linear Algebra',
|
|
'javascript': 'JavaScript',
|
|
'java': 'Java',
|
|
'sql': 'SQL'
|
|
}
|
|
normalized_subject = subject_map.get(subject.lower(), subject)
|
|
if normalized_subject not in organized_exams['bySubject']:
|
|
organized_exams['bySubject'][normalized_subject] = []
|
|
organized_exams['bySubject'][normalized_subject].append(exam)
|
|
|
|
# Group by status
|
|
for exam in exams_with_details:
|
|
status = exam.get('status', 'available')
|
|
if status in organized_exams['byStatus']:
|
|
organized_exams['byStatus'][status].append(exam)
|
|
|
|
# Also categorize finished exams by pass/fail
|
|
if status == 'finished' and exam.get('lastScore'):
|
|
if exam['lastScore'].get('passed', False):
|
|
organized_exams['byStatus']['passed'].append(exam)
|
|
else:
|
|
organized_exams['byStatus']['notPassed'].append(exam)
|
|
|
|
return Response({
|
|
'exams': exams_with_details,
|
|
'organized': organized_exams,
|
|
'subjects': list(organized_exams['bySubject'].keys()),
|
|
'summary': {
|
|
'total': len(exams_with_details),
|
|
'available': len(organized_exams['byStatus']['available']),
|
|
'finished': len(organized_exams['byStatus']['finished']),
|
|
'passed': len(organized_exams['byStatus']['passed']),
|
|
'notPassed': len(organized_exams['byStatus']['notPassed'])
|
|
}
|
|
})
|
|
except Exception as e:
|
|
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@api_view(['GET'])
|
|
def get_exam(request, exam_id):
|
|
"""Get exam details by ID."""
|
|
try:
|
|
exam = storage.get_exam(exam_id)
|
|
if not exam:
|
|
return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
user_id = get_user_id(request)
|
|
if storage.is_exam_finished(user_id, exam_id):
|
|
return Response({'error': 'Exam already finished'}, status=status.HTTP_423_LOCKED)
|
|
|
|
return Response(exam)
|
|
except Exception as e:
|
|
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@api_view(['POST'])
|
|
def start_or_resume_attempt(request, exam_id):
|
|
"""Start new attempt or resume existing one."""
|
|
try:
|
|
user_id = get_user_id(request)
|
|
|
|
# Check if exam exists
|
|
exam = storage.get_exam(exam_id)
|
|
if not exam:
|
|
return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
# Check if already finished
|
|
if storage.is_exam_finished(user_id, exam_id):
|
|
return Response({'error': 'Exam already finished'}, status=status.HTTP_423_LOCKED)
|
|
|
|
# Check for active attempt
|
|
attempt = storage.get_active_attempt(user_id, exam_id)
|
|
if attempt:
|
|
return Response(attempt)
|
|
|
|
# Create new attempt
|
|
attempt = storage.create_attempt(user_id, exam_id, exam)
|
|
return Response(attempt, status=status.HTTP_201_CREATED)
|
|
|
|
except Exception as e:
|
|
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@api_view(['GET'])
|
|
def get_attempt(request, attempt_id):
|
|
"""Get attempt by ID."""
|
|
try:
|
|
user_id = get_user_id(request)
|
|
|
|
# Find attempt file using helper
|
|
attempt_path, exam_id = find_attempt_file(user_id, attempt_id)
|
|
|
|
if not attempt_path:
|
|
return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
attempt = storage._read_json(attempt_path)
|
|
return Response(attempt)
|
|
|
|
except Exception as e:
|
|
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@api_view(['PUT'])
|
|
def autosave_attempt(request, attempt_id):
|
|
"""Autosave attempt answers."""
|
|
try:
|
|
user_id = get_user_id(request)
|
|
data = request.data
|
|
|
|
# Find attempt file using helper
|
|
attempt_path, exam_id = find_attempt_file(user_id, attempt_id)
|
|
|
|
if not attempt_path:
|
|
return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
# Load attempt
|
|
attempt = storage._read_json(attempt_path)
|
|
|
|
# Update answers
|
|
if 'answers' in data:
|
|
attempt['answers'] = data['answers']
|
|
|
|
# Save
|
|
storage.save_attempt(attempt)
|
|
|
|
# Update progress
|
|
exam = storage.get_exam(exam_id)
|
|
if exam:
|
|
total_questions = sum(len(section['questions']) for section in exam.get('sections', []))
|
|
answered = len(attempt['answers'])
|
|
percent = (answered / total_questions * 100) if total_questions > 0 else 0
|
|
storage.update_progress(user_id, exam_id, percent)
|
|
|
|
return Response({'updatedAt': attempt['updatedAt']})
|
|
|
|
except Exception as e:
|
|
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@api_view(['POST'])
|
|
def submit_attempt(request, attempt_id):
|
|
"""Submit attempt and create output bundle."""
|
|
try:
|
|
user_id = get_user_id(request)
|
|
|
|
# Find attempt file using helper
|
|
attempt_path, exam_id = find_attempt_file(user_id, attempt_id)
|
|
|
|
if not attempt_path:
|
|
return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
attempt = storage._read_json(attempt_path)
|
|
|
|
if attempt['status'] != 'in_progress':
|
|
return Response({'error': 'Attempt already submitted'}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
# Load exam
|
|
exam = storage.get_exam(exam_id)
|
|
if not exam:
|
|
return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
# Calculate score if auto-scorable
|
|
score_result = None
|
|
if can_auto_score(exam):
|
|
score_result = calculate_score(exam, attempt)
|
|
# Add score to attempt
|
|
attempt['score'] = score_result
|
|
|
|
# Submit and create bundle
|
|
output_path = storage.submit_attempt(attempt, exam)
|
|
|
|
response_data = {
|
|
'message': 'Exam submitted successfully',
|
|
'outputPath': output_path,
|
|
'attemptId': attempt_id
|
|
}
|
|
|
|
# Include score in response if available
|
|
if score_result:
|
|
response_data['score'] = score_result
|
|
response_data['autoScored'] = True
|
|
else:
|
|
response_data['autoScored'] = False
|
|
response_data['message'] = 'Exam submitted successfully. Manual grading required for some questions.'
|
|
|
|
return Response(response_data)
|
|
|
|
except Exception as e:
|
|
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@api_view(['GET'])
|
|
def get_progress(request):
|
|
"""Get current user's progress."""
|
|
try:
|
|
user_id = get_user_id(request)
|
|
progress = storage.get_progress(user_id)
|
|
return Response(progress)
|
|
except Exception as e:
|
|
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@api_view(['GET'])
|
|
def health_check(request):
|
|
"""Health check endpoint."""
|
|
return Response({'status': 'ok', 'service': 'exam_server'})
|
|
|