""" API views for exam system. """ from pathlib import Path from rest_framework.decorators import api_view from rest_framework.response import Response from rest_framework import status from .storage import storage from .scoring import can_auto_score, calculate_score def get_user_id(request): """Get user ID from session or authenticated user.""" if request.user.is_authenticated: return str(request.user.id) # Fallback for unauthenticated sessions (backward compatibility) if 'user_id' not in request.session: request.session['user_id'] = f"guest_{request.session.session_key or 'default'}" return request.session['user_id'] def find_attempt_file(user_id, attempt_id): """Recursively search for attempt file in user's folder.""" user_attempts_base = storage.attempts_dir / user_id if not user_attempts_base.exists(): return None, None def search_recursive(base_path): for item in base_path.iterdir(): if item.is_file() and item.name == f"{attempt_id}.json": # Get exam_id from parent folder exam_id = item.parent.name return item, exam_id elif item.is_dir(): found_path, found_exam = search_recursive(item) if found_path: return found_path, found_exam return None, None return search_recursive(user_attempts_base) @api_view(['GET']) def list_exams(request): """List all published exams with full details.""" try: exams = storage.list_exams(published_only=True) user_id = get_user_id(request) # Add status and full details to each exam exams_with_details = [] for exam_info in exams: # Load full exam JSON full_exam = storage.get_exam(exam_info['examId']) if not full_exam: continue # Merge manifest info with full exam exam_data = { **full_exam, # Full exam JSON (includes subject, difficulty, etc.) 'published': exam_info.get('published', True), 'version': exam_info.get('version', '1.0.0') } # Add status - check actual attempt files for more reliable status # For now, check if there are any finished attempts for this exam (any user) # This is a temporary solution until proper authentication is set up has_finished_attempt = False has_active_attempt = False # Check all user directories for attempts attempts_base = storage.attempts_dir if attempts_base.exists(): for user_dir in attempts_base.iterdir(): if user_dir.is_dir(): for attempt_file in user_dir.rglob('*.json'): try: attempt = storage._read_json(attempt_file) if attempt.get('examId') == exam_info['examId']: if attempt.get('status') == 'finished': has_finished_attempt = True elif attempt.get('status') in ['in_progress', 'draft']: has_active_attempt = True except Exception: continue # Determine status if has_finished_attempt: exam_data['status'] = 'finished' elif has_active_attempt: exam_data['status'] = 'in_progress' else: exam_data['status'] = 'available' # Add score if finished if exam_data['status'] == 'finished': # Try to find the latest finished attempt and its score from any user latest_score = None latest_time = None latest_attempt_id = None latest_username = None # Search all user directories for the latest finished attempt attempts_base = storage.attempts_dir if attempts_base.exists(): for user_dir in attempts_base.iterdir(): if user_dir.is_dir(): for attempt_file in user_dir.rglob('*.json'): try: attempt = storage._read_json(attempt_file) if (attempt.get('examId') == exam_info['examId'] and attempt.get('status') == 'finished'): attempt_time = attempt.get('submittedAt', '') if latest_time is None or attempt_time > latest_time: latest_time = attempt_time latest_attempt_id = attempt.get('attemptId') latest_username = user_dir.name except Exception: continue # Now look for the score in the output bundle if latest_attempt_id and latest_username: # Try to get username from user ID username = latest_username try: if latest_username.isdigit(): from django.contrib.auth import get_user_model User = get_user_model() user = User.objects.get(id=int(latest_username)) username = user.username except: pass # Search in user's output folder for the bundle user_output_dir = storage.output_dir / username if user_output_dir.exists(): for output_file in user_output_dir.rglob(f"*_{latest_attempt_id}.json"): try: bundle = storage._read_json(output_file) if bundle.get('attempt', {}).get('score'): latest_score = bundle['attempt']['score'] break except Exception: continue if latest_score: exam_data['lastScore'] = latest_score exams_with_details.append(exam_data) # Organize exams by subject and status organized_exams = { 'bySubject': {}, 'byStatus': { 'available': [], 'finished': [], 'passed': [], 'notPassed': [] }, 'all': exams_with_details } # Group by subject for exam in exams_with_details: subject = exam.get('subject', 'Other') # Normalize subject names subject_map = { 'python': 'Python', 'cpp': 'C++', 'linear_algebra': 'Linear Algebra', 'javascript': 'JavaScript', 'java': 'Java', 'sql': 'SQL' } normalized_subject = subject_map.get(subject.lower(), subject) if normalized_subject not in organized_exams['bySubject']: organized_exams['bySubject'][normalized_subject] = [] organized_exams['bySubject'][normalized_subject].append(exam) # Group by status for exam in exams_with_details: status = exam.get('status', 'available') if status in organized_exams['byStatus']: organized_exams['byStatus'][status].append(exam) # Also categorize finished exams by pass/fail if status == 'finished' and exam.get('lastScore'): if exam['lastScore'].get('passed', False): organized_exams['byStatus']['passed'].append(exam) else: organized_exams['byStatus']['notPassed'].append(exam) return Response({ 'exams': exams_with_details, 'organized': organized_exams, 'subjects': list(organized_exams['bySubject'].keys()), 'summary': { 'total': len(exams_with_details), 'available': len(organized_exams['byStatus']['available']), 'finished': len(organized_exams['byStatus']['finished']), 'passed': len(organized_exams['byStatus']['passed']), 'notPassed': len(organized_exams['byStatus']['notPassed']) } }) except Exception as e: return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['GET']) def get_exam(request, exam_id): """Get exam details by ID.""" try: exam = storage.get_exam(exam_id) if not exam: return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND) user_id = get_user_id(request) if storage.is_exam_finished(user_id, exam_id): return Response({'error': 'Exam already finished'}, status=status.HTTP_423_LOCKED) return Response(exam) except Exception as e: return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['POST']) def start_or_resume_attempt(request, exam_id): """Start new attempt or resume existing one.""" try: user_id = get_user_id(request) # Check if exam exists exam = storage.get_exam(exam_id) if not exam: return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND) # Check if already finished if storage.is_exam_finished(user_id, exam_id): return Response({'error': 'Exam already finished'}, status=status.HTTP_423_LOCKED) # Check for active attempt attempt = storage.get_active_attempt(user_id, exam_id) if attempt: return Response(attempt) # Create new attempt attempt = storage.create_attempt(user_id, exam_id, exam) return Response(attempt, status=status.HTTP_201_CREATED) except Exception as e: return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['GET']) def get_attempt(request, attempt_id): """Get attempt by ID.""" try: user_id = get_user_id(request) # Find attempt file using helper attempt_path, exam_id = find_attempt_file(user_id, attempt_id) if not attempt_path: return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND) attempt = storage._read_json(attempt_path) return Response(attempt) except Exception as e: return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['PUT']) def autosave_attempt(request, attempt_id): """Autosave attempt answers.""" try: user_id = get_user_id(request) data = request.data # Find attempt file using helper attempt_path, exam_id = find_attempt_file(user_id, attempt_id) if not attempt_path: return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND) # Load attempt attempt = storage._read_json(attempt_path) # Update answers if 'answers' in data: attempt['answers'] = data['answers'] # Save storage.save_attempt(attempt) # Update progress exam = storage.get_exam(exam_id) if exam: total_questions = sum(len(section['questions']) for section in exam.get('sections', [])) answered = len(attempt['answers']) percent = (answered / total_questions * 100) if total_questions > 0 else 0 storage.update_progress(user_id, exam_id, percent) return Response({'updatedAt': attempt['updatedAt']}) except Exception as e: return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['POST']) def submit_attempt(request, attempt_id): """Submit attempt and create output bundle.""" try: user_id = get_user_id(request) # Find attempt file using helper attempt_path, exam_id = find_attempt_file(user_id, attempt_id) if not attempt_path: return Response({'error': 'Attempt not found'}, status=status.HTTP_404_NOT_FOUND) attempt = storage._read_json(attempt_path) if attempt['status'] != 'in_progress': return Response({'error': 'Attempt already submitted'}, status=status.HTTP_400_BAD_REQUEST) # Load exam exam = storage.get_exam(exam_id) if not exam: return Response({'error': 'Exam not found'}, status=status.HTTP_404_NOT_FOUND) # Calculate score if auto-scorable score_result = None if can_auto_score(exam): score_result = calculate_score(exam, attempt) # Add score to attempt attempt['score'] = score_result # Submit and create bundle output_path = storage.submit_attempt(attempt, exam) response_data = { 'message': 'Exam submitted successfully', 'outputPath': output_path, 'attemptId': attempt_id } # Include score in response if available if score_result: response_data['score'] = score_result response_data['autoScored'] = True else: response_data['autoScored'] = False response_data['message'] = 'Exam submitted successfully. Manual grading required for some questions.' return Response(response_data) except Exception as e: return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['GET']) def get_progress(request): """Get current user's progress.""" try: user_id = get_user_id(request) progress = storage.get_progress(user_id) return Response(progress) except Exception as e: return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['GET']) def health_check(request): """Health check endpoint.""" return Response({'status': 'ok', 'service': 'exam_server'})