Files
quiz/app/routes_student.py
ameer 3252ccb2ec fix(anti-hijack): validate cookie_id against DB on every authed read
Closes the post-recovery re-attack window. Previously cookies were
authenticated purely cryptographically — once a hijacker received a
signed cookie for student_id=X, that cookie remained valid forever
(until QUIZ_SECRET_KEY rotated), even after admin clear-student + legit
re-claim issued a fresh cookie_id for X.

Now /me, /event, and /ws/student all check that the cookie's cookie_id
matches participants.cookie_id for the (sid, student_id). Mismatch ->
401 + Set-Cookie clearing for HTTP, ws.close(4001) for WS. The legit
re-claim wins because admin clear_student deletes the row and the next
join inserts the new student's cookie_id; the hijacker's cookie now
fails the DB check on every subsequent request.

Test: tests/test_anti_cheat.py::test_post_recovery_old_cookie_is_dead
covers the full hijack -> clear -> re-claim -> hijacker-locked-out
sequence end to end.
2026-05-04 16:22:59 +08:00

200 lines
9.2 KiB
Python

"""Student routes (single-session deployment)."""
from __future__ import annotations
from pathlib import Path
from uuid import uuid4
from fastapi import APIRouter, HTTPException, Request, Response, WebSocket
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse
from app import auth
from app.config import Settings
from app.models import JoinRequest, StudentEventRequest
from app.rate_limit import client_ip
from app.room import DuplicateStudentId, RoomManager
def router(settings: Settings, rooms: RoomManager) -> APIRouter:
api = APIRouter()
def resolve_sid(sid: str | None) -> str:
return sid if sid else (rooms.canonical_sid or settings.default_session_id)
@api.get("/")
async def student_entry(sid: str | None = None):
target_sid = resolve_sid(sid)
if not await rooms.session_exists(target_sid):
return HTMLResponse(
"<!doctype html><meta charset='utf-8'>"
"<link rel='stylesheet' href='/static/style.css'>"
"<title>Quiz unavailable</title>"
"<main class='centered-shell'><div class='card narrow'>"
"<h1>Ask your instructor for the link</h1>"
"<p class='muted'>This quiz link is missing or no longer valid.</p>"
"</div></main>",
status_code=404,
)
if not sid:
# Canonicalise the URL so QR codes, share links, and bookmarks
# all converge on the same sid-bearing form.
return RedirectResponse(url=f"/?sid={target_sid}", status_code=302)
return FileResponse(Path("static/student.html"))
@api.get("/api/session/{sid}")
async def session_metadata(sid: str):
if not await rooms.session_exists(sid):
raise HTTPException(status_code=404, detail="Session not found")
session = await rooms.get_session(sid)
return {
"sid": sid,
"title": session["title"],
"state": session["state"],
"current_question_idx": session["current_question_idx"],
"time_limit_default": (await rooms.get_pool_for_session(sid))["time_limit_default"],
}
@api.post("/api/session/{sid}/join")
async def join_session(sid: str, body: JoinRequest, request: Request, response: Response):
if not await rooms.session_exists(sid):
raise HTTPException(status_code=404, detail="Session not found")
student_id = body.student_id.strip()
name = body.name.strip()
cookie_id = str(uuid4())
try:
await rooms.add_participant(sid, student_id, name, cookie_id)
except DuplicateStudentId:
# First-claim-wins anti-hijack: a participant row already
# exists for this student_id. Could be a hijack attempt
# OR a legit student returning after clearing cookies. Log
# the attempt with IP/UA/attempted-name so the instructor
# can surface it on the live presence panel and decide.
await rooms.log_event(
sid,
student_id=student_id,
kind="duplicate_join",
detail={
"attempted_name": name,
"ip": client_ip(request),
"ua": (request.headers.get("user-agent") or "")[:200],
},
)
await rooms.broadcast_presence(sid)
raise HTTPException(
status_code=409,
detail=(
"This student ID is already in use. If this is your ID, "
"ask your instructor to clear it for you."
),
) from None
cookie_value = auth.sign_student(settings, sid, student_id, name, cookie_id)
auth.set_student_cookie(settings, response, cookie_value)
return {"ok": True, "cookie_id": cookie_id}
@api.post("/api/session/{sid}/event")
async def post_event(sid: str, body: StudentEventRequest, request: Request):
# Audit-only endpoint: the student page POSTs here on tab blur
# / visibility-hidden so the instructor can see engagement
# signals during a live question. No state change.
if not await rooms.session_exists(sid):
raise HTTPException(status_code=404, detail="Session not found")
identity = auth.get_student_identity(settings, request, sid)
if not identity:
raise HTTPException(status_code=401, detail="Student cookie required")
if not await rooms.cookie_id_matches(sid, identity["student_id"], identity["cookie_id"]):
# Same defence as /me: a stale post-recovery cookie should
# not be able to pollute the audit log.
raise HTTPException(status_code=401, detail="Re-join required")
await rooms.log_event(
sid,
student_id=identity["student_id"],
kind=body.kind,
question_idx=body.question_idx,
detail={"ip": client_ip(request)},
)
# blur / visibility_hidden are surfaced to the instructor; focus /
# visibility_visible are recorded for completeness but don't need
# an immediate broadcast.
if body.kind in {"blur", "visibility_hidden"}:
await rooms.broadcast_presence(sid)
return {"ok": True}
@api.get("/api/session/{sid}/me")
async def me(sid: str, request: Request):
identity = auth.get_student_identity(settings, request, sid)
if not identity:
raise HTTPException(status_code=401, detail="Student cookie required")
# Validate cookie_id against DB. Two cases this catches:
# (a) participant row is gone (session reset, admin clear, DB
# rebuild) → cookie_id_matches returns False → 401 + cleared.
# (b) participant row exists but with a different cookie_id (a
# prior hijacker's cookie still cryptographically valid
# after the legit student re-claimed via admin recovery)
# → 401 + cleared. The hijacker's stale cookie is now dead.
if not await rooms.cookie_id_matches(sid, identity["student_id"], identity["cookie_id"]):
resp = JSONResponse({"detail": "Re-join required"}, status_code=401)
resp.delete_cookie(auth.STUDENT_COOKIE, path="/")
return resp
return await rooms.me(sid, identity["student_id"])
@api.get("/api/session/{sid}/stats")
async def stats(sid: str, request: Request, question_idx: int | None = None):
if not await rooms.session_exists(sid):
raise HTTPException(status_code=404, detail="Session not found")
identity = auth.get_student_identity(settings, request, sid)
return await rooms.stats(sid, question_idx, identity["student_id"] if identity else None)
@api.websocket("/ws/student/{sid}")
async def student_socket(websocket: WebSocket, sid: str):
identity = auth.get_student_identity_ws(settings, websocket, sid)
if not identity or not await rooms.session_exists(sid):
await websocket.close(code=4001)
return
# cookie_id-vs-DB check closes the post-recovery re-attack window:
# a hijacker's WS won't authenticate after the legit student has
# re-claimed their id via admin clear-student.
if not await rooms.cookie_id_matches(sid, identity["student_id"], identity["cookie_id"]):
await websocket.close(code=4001)
return
await rooms.student_ws(websocket, sid, identity)
# ---- Projector view (public, read-only) -------------------------------
# The projector page runs at the front of the room on a smart TV / big
# screen. No auth: it shows only aggregate / leaderboard data that
# would already be visible on the student's own screen at reveal
# time. Per-student histograms keep names but redact student_ids
# (the student-id namespace is private).
@api.get("/projector/")
async def projector_page(sid: str | None = None):
target_sid = resolve_sid(sid)
if not await rooms.session_exists(target_sid):
return HTMLResponse(
"<!doctype html><meta charset='utf-8'>"
"<link rel='stylesheet' href='/static/style.css'>"
"<title>Projector — quiz unavailable</title>"
"<main class='centered-shell'><div class='card narrow'>"
"<h1>Projector — no live session</h1>"
"<p class='muted'>Start the quiz from the admin dashboard.</p>"
"</div></main>",
status_code=404,
)
if not sid:
return RedirectResponse(url=f"/projector/?sid={target_sid}", status_code=302)
return FileResponse(Path("static/projector.html"))
@api.get("/api/session/{sid}/projector")
async def projector_state(sid: str):
if not await rooms.session_exists(sid):
raise HTTPException(status_code=404, detail="Session not found")
return await rooms.projector_snapshot(sid)
@api.websocket("/ws/projector/{sid}")
async def projector_socket(websocket: WebSocket, sid: str):
if not await rooms.session_exists(sid):
await websocket.close(code=4001)
return
await rooms.projector_ws(websocket, sid)
return api