Files
quiz/app/room.py
ameer 3252ccb2ec fix(anti-hijack): validate cookie_id against DB on every authed read
Closes the post-recovery re-attack window. Previously cookies were
authenticated purely cryptographically — once a hijacker received a
signed cookie for student_id=X, that cookie remained valid forever
(until QUIZ_SECRET_KEY rotated), even after admin clear-student + legit
re-claim issued a fresh cookie_id for X.

Now /me, /event, and /ws/student all check that the cookie's cookie_id
matches participants.cookie_id for the (sid, student_id). Mismatch ->
401 + Set-Cookie clearing for HTTP, ws.close(4001) for WS. The legit
re-claim wins because admin clear_student deletes the row and the next
join inserts the new student's cookie_id; the hijacker's cookie now
fails the DB check on every subsequent request.

Test: tests/test_anti_cheat.py::test_post_recovery_old_cookie_is_dead
covers the full hijack -> clear -> re-claim -> hijacker-locked-out
sequence end to end.
2026-05-04 16:22:59 +08:00

1208 lines
57 KiB
Python

"""In-process WebSocket room manager."""
from __future__ import annotations
import asyncio
import base64
import json
from collections import defaultdict
from datetime import UTC, datetime
from io import BytesIO
from typing import Any
import aiosqlite
import qrcode
import qrcode.image.svg
from fastapi import WebSocket, WebSocketDisconnect
from app.config import Settings
from app.db import connect
from app.pool import get_question, parse_pool_json, public_question_payload, question_count, question_time_limit
from app.scoring import SCORE_FNS
def now_utc() -> datetime:
return datetime.now(UTC)
def now_ms() -> int:
return int(now_utc().timestamp() * 1000)
def iso_now() -> str:
return now_utc().isoformat()
def parse_ts(value: str) -> datetime:
if value.endswith("Z"):
value = value[:-1] + "+00:00"
parsed = datetime.fromisoformat(value)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=UTC)
return parsed
def _qr_data_url(value: str) -> str:
image = qrcode.make(value, image_factory=qrcode.image.svg.SvgPathImage)
buf = BytesIO()
image.save(buf)
encoded = base64.b64encode(buf.getvalue()).decode("ascii")
return f"data:image/svg+xml;base64,{encoded}"
class DuplicateStudentId(Exception):
"""Raised when a join request targets a student_id that is already
claimed by another active participant (first-claim-wins anti-hijack)."""
class RoomManager:
def __init__(self, settings: Settings):
self.settings = settings
self.student_clients: dict[str, dict[WebSocket, dict[str, Any]]] = defaultdict(dict)
self.instructor_clients: dict[str, set[WebSocket]] = defaultdict(set)
# Projector clients are public read-only; no per-client identity.
self.projector_clients: dict[str, set[WebSocket]] = defaultdict(set)
self.autoclose_tasks: dict[tuple[str, int], asyncio.Task] = {}
self.locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
# The single canonical session id, set during startup once the pool
# has been loaded. Routes use this rather than settings.default_session_id
# so that a session_id field in the pool JSON can override the env default.
self.canonical_sid: str | None = None
# Volatile presence: presence[sid][student_id] = {"connected": bool,
# "last_seen_ms": int, "ws_count": int}. Rebuilt on each WS connect
# / disconnect; not persisted (presence dies with the process).
self.presence: dict[str, dict[str, dict[str, Any]]] = defaultdict(dict)
async def ensure_single_session(self, sid: str, pool: dict[str, Any]) -> None:
"""Idempotently upsert the canonical single-session row + its quiz row.
Called on startup with the operator-supplied pool JSON. Creates the
quiz + session if they don't exist, otherwise updates the pool blob
on the existing quiz so a fresh restart picks up edits to the pool
file without losing prior submissions for the same session.
"""
title = pool["title"]
pool_blob = json.dumps(pool)
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"SELECT quiz_id FROM quiz_sessions WHERE sid = ?",
(sid,),
)
row = await cursor.fetchone()
if row is None:
cursor = await db.execute(
"INSERT INTO quizzes (title, pool_json, time_limit_default, score_fn_name) VALUES (?, ?, ?, ?)",
(title, pool_blob, pool["time_limit_default"], pool["score_fn"]),
)
quiz_id = cursor.lastrowid
await db.execute(
"INSERT INTO quiz_sessions (sid, quiz_id, title) VALUES (?, ?, ?)",
(sid, quiz_id, title),
)
else:
quiz_id = row["quiz_id"]
await db.execute(
"UPDATE quizzes SET title = ?, pool_json = ?, time_limit_default = ?, score_fn_name = ? WHERE id = ?",
(title, pool_blob, pool["time_limit_default"], pool["score_fn"], quiz_id),
)
await db.execute(
"UPDATE quiz_sessions SET title = ? WHERE sid = ?",
(title, sid),
)
await db.commit()
async def advance_to_next(self, sid: str) -> None:
"""Instructor 'Next' button: a single button that drives the whole
lifecycle. From lobby it opens Q0; from a question_open state it
closes the current Q and opens the next; from question_closed it
opens the next Q. If there is no next question, the session ends.
"""
async with self.locks[sid]:
session = await self.get_session(sid)
if session["state"] == "finished":
return
current_idx = session["current_question_idx"]
close_current = session["state"] == "question_open"
if close_current:
await self._close_question_locked(sid, int(current_idx))
if close_current:
await self.broadcast_question_closed(sid, int(current_idx))
pool = await self.get_pool_for_session(sid)
total = question_count(pool)
next_idx = 0 if current_idx is None else int(current_idx) + 1
if next_idx >= total:
await self.end_session(sid)
return
await self.open_question(sid, next_idx)
async def reset(self, sid: str) -> None:
"""Wipe submissions, participants, and per-question state, then return
the session to lobby. Useful for re-running the same quiz across
classes without redeploying."""
async with self.locks[sid]:
task_keys = [key for key in self.autoclose_tasks if key[0] == sid]
for key in task_keys:
task = self.autoclose_tasks.pop(key, None)
if task:
task.cancel()
async with connect(self.settings.db_path) as db:
await db.execute("DELETE FROM submissions WHERE sid = ?", (sid,))
await db.execute("DELETE FROM question_events WHERE sid = ?", (sid,))
await db.execute("DELETE FROM participants WHERE sid = ?", (sid,))
await db.execute("DELETE FROM student_events WHERE sid = ?", (sid,))
await db.execute(
"UPDATE quiz_sessions SET state = 'lobby', current_question_idx = NULL, finished_at = NULL WHERE sid = ?",
(sid,),
)
await db.commit()
# Tell each student client the session was reset BEFORE closing the
# socket, so the JS can clear local state and re-bootstrap into the
# join form rather than showing a generic "disconnected" screen.
for ws in list(self.student_clients.get(sid, {}).keys()):
try:
await ws.send_json({"type": "session_reset"})
except Exception:
pass
try:
await ws.close(code=4002)
except Exception:
pass
self.student_clients.pop(sid, None)
# Presence is volatile — wipe alongside the participant table so
# the next instructor snapshot doesn't show stale ghost rows.
self.presence.pop(sid, None)
await self.broadcast_instructors(sid, {"type": "state", "state": "lobby", "current_question_idx": None, "title": (await self.get_session(sid))["title"]})
await self.broadcast_lobby(sid)
await self.broadcast_presence(sid)
await self.broadcast_projectors(sid)
async def sessions_active(self) -> int:
async with connect(self.settings.db_path) as db:
cursor = await db.execute("SELECT COUNT(*) AS count FROM quiz_sessions WHERE state != 'finished'")
row = await cursor.fetchone()
return int(row["count"])
def ws_client_count(self) -> int:
return (
sum(len(clients) for clients in self.student_clients.values())
+ sum(len(clients) for clients in self.instructor_clients.values())
+ sum(len(clients) for clients in self.projector_clients.values())
)
async def cookie_id_matches(self, sid: str, student_id: str, cookie_id: str) -> bool:
"""Check the student's signed cookie_id against the DB participant
row. Used to defend against the post-recovery re-attack: after
admin clears a hijacked id and the legitimate student re-joins
with a fresh cookie_id, the original hijacker's cookie is still
cryptographically valid (the secret key is unchanged), but the
DB cookie_id now belongs to the legit student. We reject any
request whose cookie_id doesn't match the current row."""
async with connect(self.settings.db_path) as db:
cur = await db.execute(
"SELECT cookie_id FROM participants WHERE sid = ? AND student_id = ?",
(sid, student_id),
)
row = await cur.fetchone()
return row is not None and row["cookie_id"] == cookie_id
async def add_participant(self, sid: str, student_id: str, name: str, cookie_id: str) -> None:
"""First-claim-wins. Raises DuplicateStudentId if this student_id
is already in the participants table for this sid (an attempt to
hijack another student's id, or a legit student returning after
clearing cookies). The route handler turns the exception into a
409 + records a `duplicate_join` audit event so the instructor
can see the attempt on the live presence panel."""
async with connect(self.settings.db_path) as db:
try:
await db.execute(
"INSERT INTO participants (sid, student_id, name, cookie_id) VALUES (?, ?, ?, ?)",
(sid, student_id, name, cookie_id),
)
except aiosqlite.IntegrityError as exc:
# PK violation = student_id already claimed in this sid.
raise DuplicateStudentId(student_id) from exc
await db.execute(
"""
INSERT OR IGNORE INTO submissions (sid, student_id, question_idx, status, score)
SELECT sid, ?, question_idx, 'missed', 0
FROM question_events
WHERE sid = ? AND closed_at IS NOT NULL
""",
(student_id, sid),
)
await db.commit()
await self.broadcast_lobby(sid)
await self.broadcast_presence(sid)
await self.broadcast_projectors(sid)
async def log_event(
self,
sid: str,
student_id: str | None,
kind: str,
question_idx: int | None = None,
detail: dict[str, Any] | None = None,
) -> None:
async with connect(self.settings.db_path) as db:
await db.execute(
"""
INSERT INTO student_events (sid, student_id, question_idx, kind, detail)
VALUES (?, ?, ?, ?, ?)
""",
(sid, student_id, question_idx, kind, json.dumps(detail) if detail else None),
)
await db.commit()
async def clear_student(self, sid: str, student_id: str) -> bool:
"""Admin recovery hatch for first-claim-wins: remove a participant
+ all their submissions so the legitimate student can re-claim
their id. Returns True if a row was removed."""
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"DELETE FROM submissions WHERE sid = ? AND student_id = ?",
(sid, student_id),
)
cursor = await db.execute(
"DELETE FROM participants WHERE sid = ? AND student_id = ?",
(sid, student_id),
)
removed = cursor.rowcount > 0
await db.commit()
if removed:
self.presence.get(sid, {}).pop(student_id, None)
# Kick any active WS for this student_id so a stale cookie can
# no longer drive submissions. /me will 401 (cookie cleared)
# and the page will land on the join form.
for ws, ident in list(self.student_clients.get(sid, {}).items()):
if ident.get("student_id") == student_id:
try:
await ws.send_json({"type": "session_reset"})
except Exception:
pass
try:
await ws.close(code=4002)
except Exception:
pass
await self.broadcast_lobby(sid)
await self.broadcast_presence(sid)
await self.broadcast_projectors(sid)
return removed
async def student_ws(self, websocket: WebSocket, sid: str, identity: dict[str, Any]) -> None:
await websocket.accept()
self.student_clients[sid][websocket] = identity
student_id = identity["student_id"]
slot = self.presence[sid].setdefault(
student_id,
{"connected": False, "last_seen_ms": now_ms(), "ws_count": 0, "name": identity.get("name", "")},
)
slot["ws_count"] += 1
slot["connected"] = True
slot["last_seen_ms"] = now_ms()
slot["name"] = identity.get("name", slot.get("name", ""))
await self.broadcast_presence(sid)
try:
await self.send_student_snapshot(websocket, sid, identity)
while True:
try:
data = await websocket.receive_json()
except json.JSONDecodeError:
try:
await websocket.send_json({"type": "error", "code": "bad_message", "message": "Invalid JSON"})
except (WebSocketDisconnect, RuntimeError):
break
continue
if not isinstance(data, dict):
try:
await websocket.send_json({"type": "error", "code": "bad_message", "message": "Message must be a JSON object"})
except (WebSocketDisconnect, RuntimeError):
break
continue
msg_type = data.get("type")
if msg_type == "ping":
await websocket.send_json({"type": "pong"})
elif msg_type == "submit":
ack = await self.submit_answer(sid, identity["student_id"], data.get("question_idx"), data.get("answer"))
await websocket.send_json(ack)
else:
await websocket.send_json({"type": "error", "code": "bad_message", "message": "Unknown message type"})
except (WebSocketDisconnect, RuntimeError):
pass
finally:
self.student_clients[sid].pop(websocket, None)
slot = self.presence.get(sid, {}).get(student_id)
if slot:
slot["ws_count"] = max(0, slot.get("ws_count", 1) - 1)
slot["connected"] = slot["ws_count"] > 0
slot["last_seen_ms"] = now_ms()
await self.broadcast_presence(sid)
async def instructor_ws(self, websocket: WebSocket, sid: str) -> None:
await websocket.accept()
self.instructor_clients[sid].add(websocket)
try:
await self.send_instructor_snapshot(websocket, sid)
while True:
try:
data = await websocket.receive_json()
except json.JSONDecodeError:
try:
await websocket.send_json({"type": "error", "code": "bad_message", "message": "Invalid JSON"})
except (WebSocketDisconnect, RuntimeError):
break
continue
if not isinstance(data, dict):
try:
await websocket.send_json({"type": "error", "code": "bad_message", "message": "Message must be a JSON object"})
except (WebSocketDisconnect, RuntimeError):
break
continue
msg_type = data.get("type")
if msg_type == "ping":
await websocket.send_json({"type": "pong"})
elif msg_type == "open_question":
await self.open_question(sid, int(data.get("question_idx", 0)), data.get("time_limit"))
elif msg_type == "close_question":
await self.close_question(sid)
elif msg_type == "next":
await self.advance_to_next(sid)
elif msg_type == "end_session":
await self.end_session(sid)
elif msg_type == "reset":
await self.reset(sid)
else:
await websocket.send_json({"type": "error", "code": "bad_message", "message": "Unknown message type"})
except (WebSocketDisconnect, RuntimeError):
pass
finally:
self.instructor_clients[sid].discard(websocket)
async def send_student_snapshot(self, websocket: WebSocket, sid: str, identity: dict[str, Any]) -> None:
session = await self.get_session(sid)
await websocket.send_json(
{
"type": "state",
"state": session["state"],
"current_question_idx": session["current_question_idx"],
"title": session["title"],
}
)
if session["state"] == "question_open":
await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"]))
ack = await self.existing_submit_ack(sid, identity["student_id"], session["current_question_idx"])
if ack:
await websocket.send_json(ack)
elif session["state"] == "question_closed":
# Replay the reveal so a student joining mid-reveal sees the
# closed-question card with their answer / correct option /
# leaderboard, instead of being stuck on the join form's
# disabled state waiting for an event that never arrives.
await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"]))
await websocket.send_json(
await self.question_closed_message(sid, session["current_question_idx"], identity)
)
elif session["state"] == "finished":
await websocket.send_json(await self.ended_message(sid, identity))
async def send_instructor_snapshot(self, websocket: WebSocket, sid: str) -> None:
session = await self.get_session(sid)
await websocket.send_json(
{
"type": "state",
"state": session["state"],
"current_question_idx": session["current_question_idx"],
"title": session["title"],
}
)
await websocket.send_json(await self.lobby_message(sid))
await websocket.send_json(await self.presence_message(sid))
# When an instructor reconnects mid-session, replay enough payloads
# for the SPA to render the current state without waiting for the
# next event. Otherwise the dashboard sits on a "Reveal pending..."
# placeholder forever.
if session["state"] == "question_open":
await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"]))
await websocket.send_json(await self.live_histogram_message(sid, session["current_question_idx"]))
elif session["state"] == "question_closed":
await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"]))
await websocket.send_json(await self.question_closed_message(sid, session["current_question_idx"]))
await websocket.send_json(await self.full_leaderboard_message(sid))
elif session["state"] in {"between_questions", "finished"}:
await websocket.send_json(await self.full_leaderboard_message(sid))
async def open_question(self, sid: str, question_idx: int, time_limit: int | None = None) -> None:
async with self.locks[sid]:
session = await self.get_session(sid)
if session["state"] == "question_open" and session["current_question_idx"] == question_idx:
return
if session["state"] == "question_open":
await self._close_question_locked(sid, session["current_question_idx"])
pool = await self.get_pool_for_session(sid)
if question_idx < 0 or question_idx >= question_count(pool):
await self.broadcast_instructors(sid, {"type": "error", "code": "bad_question", "message": "Question index out of range"})
return
limit = int(time_limit or question_time_limit(pool, question_idx))
opened = iso_now()
async with connect(self.settings.db_path) as db:
await db.execute(
"""
INSERT INTO question_events (sid, question_idx, opened_at, time_limit)
VALUES (?, ?, ?, ?)
ON CONFLICT(sid, question_idx) DO UPDATE SET opened_at = excluded.opened_at,
closed_at = NULL, time_limit = excluded.time_limit
""",
(sid, question_idx, opened, limit),
)
await db.execute(
"UPDATE quiz_sessions SET state = 'question_open', current_question_idx = ? WHERE sid = ?",
(question_idx, sid),
)
await db.commit()
self._schedule_autoclose(sid, question_idx, limit)
msg = await self.question_open_message(sid, question_idx)
await self.broadcast_students(sid, msg)
await self.broadcast_instructors(sid, msg)
await self.broadcast_instructors(sid, await self.live_histogram_message(sid, question_idx))
await self.broadcast_projectors(sid)
async def close_question(self, sid: str) -> None:
async with self.locks[sid]:
session = await self.get_session(sid)
if session["state"] != "question_open":
return
question_idx = session["current_question_idx"]
await self._close_question_locked(sid, question_idx)
await self.broadcast_question_closed(sid, question_idx)
async def _close_question_locked(self, sid: str, question_idx: int) -> None:
task = self.autoclose_tasks.pop((sid, question_idx), None)
current = asyncio.current_task()
if task and task is not current:
task.cancel()
async with connect(self.settings.db_path) as db:
await db.execute(
"UPDATE question_events SET closed_at = COALESCE(closed_at, ?) WHERE sid = ? AND question_idx = ?",
(iso_now(), sid, question_idx),
)
await db.execute(
"""
INSERT OR IGNORE INTO submissions (sid, student_id, question_idx, status, score)
SELECT p.sid, p.student_id, ?, 'missed', 0
FROM participants p
WHERE p.sid = ?
""",
(question_idx, sid),
)
await db.execute(
"UPDATE quiz_sessions SET state = 'question_closed', current_question_idx = ? WHERE sid = ?",
(question_idx, sid),
)
await db.commit()
async def next_question(self, sid: str) -> None:
async with self.locks[sid]:
session = await self.get_session(sid)
if session["state"] != "question_closed":
return
next_idx = int(session["current_question_idx"]) + 1
async with connect(self.settings.db_path) as db:
await db.execute("UPDATE quiz_sessions SET state = 'between_questions' WHERE sid = ?", (sid,))
await db.commit()
await self.broadcast_between_questions(sid, next_idx)
async def end_session(self, sid: str) -> None:
async with self.locks[sid]:
session = await self.get_session(sid)
if session["state"] == "question_open":
await self._close_question_locked(sid, session["current_question_idx"])
async with connect(self.settings.db_path) as db:
await db.execute(
"UPDATE quiz_sessions SET state = 'finished', current_question_idx = NULL, finished_at = ? WHERE sid = ?",
(iso_now(), sid),
)
await db.commit()
await self.broadcast_session_ended(sid)
await self.broadcast_instructors(sid, await self.full_leaderboard_message(sid))
async def submit_answer(self, sid: str, student_id: str, question_idx: Any, answer: Any) -> dict[str, Any]:
try:
qidx = int(question_idx)
except (TypeError, ValueError):
return {"type": "error", "code": "bad_question", "message": "Invalid question index"}
if not isinstance(answer, str) or answer not in {"A", "B", "C", "D"}:
return {"type": "error", "code": "bad_answer", "message": "Answer must be A, B, C, or D"}
async with self.locks[sid]:
session = await self.get_session(sid)
if session["state"] != "question_open" or session["current_question_idx"] != qidx:
return {"type": "error", "code": "not_open", "message": "Question is not open"}
existing = await self.existing_submit_ack(sid, student_id, qidx)
if existing:
return existing
event = await self.get_question_event(sid, qidx)
opened_at = parse_ts(event["opened_at"])
elapsed_ms = max(0, int((now_utc() - opened_at).total_seconds() * 1000))
time_limit_ms = int(event["time_limit"]) * 1000
if elapsed_ms > time_limit_ms:
return {"type": "error", "code": "time_expired", "message": "Question time has expired"}
pool = await self.get_pool_for_session(sid)
question = get_question(pool, qidx)
correct = answer == question["correct"]
score_fn = SCORE_FNS[pool["score_fn"]]
score = score_fn(correct, elapsed_ms, time_limit_ms)
submitted_at = iso_now()
async with connect(self.settings.db_path) as db:
await db.execute(
"""
INSERT INTO submissions (sid, student_id, question_idx, answer, submitted_at, elapsed_ms, score, status)
VALUES (?, ?, ?, ?, ?, ?, ?, 'submitted')
ON CONFLICT(sid, student_id, question_idx) DO NOTHING
""",
(sid, student_id, qidx, answer, submitted_at, elapsed_ms, score),
)
await db.commit()
await self.broadcast_instructors(sid, await self.live_histogram_message(sid, qidx))
await self.broadcast_presence(sid)
await self.broadcast_projectors(sid)
return {"type": "submit_ack", "question_idx": qidx, "answer": answer, "score": score, "elapsed_ms": elapsed_ms}
def _schedule_autoclose(self, sid: str, question_idx: int, time_limit: int) -> None:
previous = self.autoclose_tasks.pop((sid, question_idx), None)
if previous:
previous.cancel()
self.autoclose_tasks[(sid, question_idx)] = asyncio.create_task(self._autoclose_after(sid, question_idx, time_limit))
async def _autoclose_after(self, sid: str, question_idx: int, time_limit: int) -> None:
try:
await asyncio.sleep(time_limit)
async with self.locks[sid]:
session = await self.get_session(sid)
if session["state"] == "question_open" and session["current_question_idx"] == question_idx:
await self._close_question_locked(sid, question_idx)
else:
return
await self.broadcast_question_closed(sid, question_idx)
except asyncio.CancelledError:
return
async def get_session(self, sid: str) -> dict[str, Any]:
async with connect(self.settings.db_path) as db:
cursor = await db.execute("SELECT * FROM quiz_sessions WHERE sid = ?", (sid,))
row = await cursor.fetchone()
if row is None:
raise KeyError(f"Unknown session {sid}")
return dict(row)
async def session_exists(self, sid: str) -> bool:
async with connect(self.settings.db_path) as db:
cursor = await db.execute("SELECT 1 FROM quiz_sessions WHERE sid = ?", (sid,))
row = await cursor.fetchone()
return row is not None
async def get_pool_for_session(self, sid: str) -> dict[str, Any]:
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"""
SELECT q.pool_json
FROM quizzes q
JOIN quiz_sessions s ON s.quiz_id = q.id
WHERE s.sid = ?
""",
(sid,),
)
row = await cursor.fetchone()
if row is None:
raise KeyError(f"Unknown session {sid}")
return parse_pool_json(row["pool_json"])
async def get_question_event(self, sid: str, question_idx: int) -> dict[str, Any]:
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"SELECT * FROM question_events WHERE sid = ? AND question_idx = ?",
(sid, question_idx),
)
row = await cursor.fetchone()
if row is None:
raise KeyError("Question has not been opened")
return dict(row)
async def existing_submit_ack(self, sid: str, student_id: str, question_idx: int) -> dict[str, Any] | None:
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"""
SELECT answer, elapsed_ms, score, status
FROM submissions
WHERE sid = ? AND student_id = ? AND question_idx = ? AND status = 'submitted'
""",
(sid, student_id, question_idx),
)
row = await cursor.fetchone()
if row is None:
return None
return {
"type": "submit_ack",
"question_idx": question_idx,
"answer": row["answer"],
"score": row["score"],
"elapsed_ms": row["elapsed_ms"],
}
async def question_open_message(self, sid: str, question_idx: int) -> dict[str, Any]:
pool = await self.get_pool_for_session(sid)
event = await self.get_question_event(sid, question_idx)
opened_ms = int(parse_ts(event["opened_at"]).timestamp() * 1000)
remaining_ms = max(0, opened_ms + int(event["time_limit"]) * 1000 - now_ms())
msg = {"type": "question_open", **public_question_payload(pool, question_idx)}
msg["opened_at_server_ts"] = opened_ms
msg["remaining_ms"] = remaining_ms
return msg
async def question_closed_message(self, sid: str, question_idx: int, identity: dict[str, Any] | None = None) -> dict[str, Any]:
pool = await self.get_pool_for_session(sid)
question = get_question(pool, question_idx)
you_id = identity["student_id"] if identity else None
msg = {
"type": "question_closed",
"question_idx": question_idx,
"correct": question["correct"],
"explanation": question.get("explanation", ""),
"histogram": await self.histogram(sid, question_idx),
"top5": await self.leaderboard(sid, limit=5, you_student_id=you_id),
}
if identity:
student = identity["student_id"]
submission = await self.submission_for(sid, student, question_idx)
rank = await self.rank_for(sid, student)
total = await self.total_for(sid, student)
msg.update(
{
"your_answer": submission.get("answer") if submission else None,
"your_score": submission.get("score", 0) if submission else 0,
"your_rank": rank,
"your_total": total,
}
)
return msg
async def between_message(self, sid: str, next_idx: int, identity: dict[str, Any] | None = None) -> dict[str, Any]:
you_id = identity["student_id"] if identity else None
msg = {"type": "between_questions", "next_idx": next_idx, "top5": await self.leaderboard(sid, limit=5, you_student_id=you_id)}
if identity:
msg["your_rank"] = await self.rank_for(sid, identity["student_id"])
msg["your_total"] = await self.total_for(sid, identity["student_id"])
return msg
async def ended_message(self, sid: str, identity: dict[str, Any] | None = None) -> dict[str, Any]:
you_id = identity["student_id"] if identity else None
msg = {"type": "session_ended", "final_top5": await self.leaderboard(sid, limit=5, you_student_id=you_id)}
if identity:
student = identity["student_id"]
msg.update(await self.student_summary(sid, student))
msg["your_rank"] = await self.rank_for(sid, student)
msg["your_total"] = await self.total_for(sid, student)
return msg
async def histogram(self, sid: str, question_idx: int, pending: bool = False) -> dict[str, int]:
result = {"A": 0, "B": 0, "C": 0, "D": 0, "missed": 0}
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"SELECT answer, status, COUNT(*) AS count FROM submissions WHERE sid = ? AND question_idx = ? GROUP BY answer, status",
(sid, question_idx),
)
rows = await cursor.fetchall()
total_cursor = await db.execute("SELECT COUNT(*) AS count FROM participants WHERE sid = ?", (sid,))
total_row = await total_cursor.fetchone()
submitted = 0
for row in rows:
if row["status"] == "missed":
result["missed"] += row["count"]
elif row["answer"] in result:
result[row["answer"]] += row["count"]
submitted += row["count"]
if pending:
result["pending"] = max(0, int(total_row["count"]) - submitted - result["missed"])
return result
async def leaderboard(
self,
sid: str,
limit: int | None = None,
include_ids: bool = False,
you_student_id: str | None = None,
) -> list[dict[str, Any]]:
"""Top scores. If `you_student_id` is given and that student appears
in the returned slice, that one entry is marked with `is_you: True`
so the client can highlight by id without exposing other students'
ids over the wire."""
query_limit = "" if limit is None else f"LIMIT {int(limit)}"
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
f"""
SELECT p.student_id, p.name, COALESCE(SUM(s.score), 0) AS score
FROM participants p
LEFT JOIN submissions s ON s.sid = p.sid AND s.student_id = p.student_id
WHERE p.sid = ?
GROUP BY p.student_id, p.name
ORDER BY score DESC, p.name ASC, p.student_id ASC
{query_limit}
""",
(sid,),
)
rows = await cursor.fetchall()
board = []
for rank, row in enumerate(rows, start=1):
item = {"rank": rank, "name": row["name"], "score": float(row["score"])}
if include_ids:
item["student_id"] = row["student_id"]
if you_student_id is not None and row["student_id"] == you_student_id:
item["is_you"] = True
board.append(item)
return board
async def rank_for(self, sid: str, student_id: str) -> int | None:
board = await self.leaderboard(sid, include_ids=True)
for item in board:
if item["student_id"] == student_id:
return item["rank"]
return None
async def total_for(self, sid: str, student_id: str) -> float:
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"SELECT COALESCE(SUM(score), 0) AS total FROM submissions WHERE sid = ? AND student_id = ?",
(sid, student_id),
)
row = await cursor.fetchone()
# Snap to two decimals so the sum stays display-friendly even after
# many small float additions; the per-question scores are already
# on a 0.05 grid, so this is mostly defensive.
return round(float(row["total"]), 2)
async def submission_for(self, sid: str, student_id: str, question_idx: int) -> dict[str, Any] | None:
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"SELECT * FROM submissions WHERE sid = ? AND student_id = ? AND question_idx = ?",
(sid, student_id, question_idx),
)
row = await cursor.fetchone()
return dict(row) if row else None
async def student_summary(self, sid: str, student_id: str) -> dict[str, int]:
pool = await self.get_pool_for_session(sid)
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"SELECT question_idx, answer, status FROM submissions WHERE sid = ? AND student_id = ?",
(sid, student_id),
)
rows = await cursor.fetchall()
answered = sum(1 for row in rows if row["status"] == "submitted")
correct = 0
for row in rows:
if row["status"] == "submitted" and row["answer"] == get_question(pool, row["question_idx"])["correct"]:
correct += 1
return {"questions_answered": answered, "questions_correct": correct}
async def lobby_message(self, sid: str) -> dict[str, Any]:
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"SELECT student_id, name, joined_at FROM participants WHERE sid = ? ORDER BY joined_at, name",
(sid,),
)
rows = await cursor.fetchall()
participants = [dict(row) for row in rows]
return {"type": "lobby_update", "participants": participants, "count": len(participants)}
async def presence_message(self, sid: str) -> dict[str, Any]:
"""Per-student live presence: connected/idle, last_seen, blur+
visibility-hidden counts, current-question-answered flag, and
any duplicate-join attempts on that id. Broadcast to the
instructor on every connect / disconnect / join / answer."""
async with connect(self.settings.db_path) as db:
participants_cur = await db.execute(
"SELECT student_id, name, joined_at FROM participants WHERE sid = ? ORDER BY joined_at, name",
(sid,),
)
participants = await participants_cur.fetchall()
session_cur = await db.execute(
"SELECT state, current_question_idx FROM quiz_sessions WHERE sid = ?",
(sid,),
)
session_row = await session_cur.fetchone()
events_cur = await db.execute(
"""
SELECT student_id, kind, COUNT(*) AS count
FROM student_events
WHERE sid = ? AND student_id IS NOT NULL
GROUP BY student_id, kind
""",
(sid,),
)
event_rows = await events_cur.fetchall()
current_idx = session_row["current_question_idx"] if session_row else None
answered_now: set[str] = set()
if current_idx is not None:
ans_cur = await db.execute(
"""
SELECT student_id FROM submissions
WHERE sid = ? AND question_idx = ? AND status = 'submitted'
""",
(sid, current_idx),
)
answered_now = {row["student_id"] for row in await ans_cur.fetchall()}
# Duplicate-join attempts (any student_id touched by an
# event whose kind=duplicate_join). For attempts on an
# existing student_id we want to surface to the legit owner.
dup_cur = await db.execute(
"""
SELECT student_id, COUNT(*) AS count, MAX(ts) AS latest_ts, MAX(detail) AS latest_detail
FROM student_events
WHERE sid = ? AND kind = 'duplicate_join' AND student_id IS NOT NULL
GROUP BY student_id
""",
(sid,),
)
dup_rows = await dup_cur.fetchall()
events_by_student: dict[str, dict[str, int]] = defaultdict(dict)
for row in event_rows:
events_by_student[row["student_id"]][row["kind"]] = int(row["count"])
dup_by_student = {
row["student_id"]: {
"count": int(row["count"]),
"latest_ts": row["latest_ts"],
"latest_detail": row["latest_detail"],
}
for row in dup_rows
}
rows: list[dict[str, Any]] = []
for participant in participants:
sid_id = participant["student_id"]
slot = self.presence.get(sid, {}).get(sid_id, {})
counts = events_by_student.get(sid_id, {})
rows.append(
{
"student_id": sid_id,
"name": participant["name"],
"joined_at": participant["joined_at"],
"connected": bool(slot.get("connected")),
"ws_count": int(slot.get("ws_count", 0)),
"last_seen_ms": int(slot.get("last_seen_ms", 0)) or None,
"blur_count": int(counts.get("blur", 0)),
"hidden_count": int(counts.get("visibility_hidden", 0)),
"duplicate_join_attempts": dup_by_student.get(sid_id, {"count": 0}),
"answered_current": sid_id in answered_now,
}
)
# Orphan duplicate-join attempts: an attempt on a student_id that
# has not yet been claimed by a real participant. Surface as a
# separate list so the instructor can see "someone tried to join
# as 12345 but nobody named 12345 has joined yet".
orphan_attempts = [
{"student_id": sid_id, **info}
for sid_id, info in dup_by_student.items()
if not any(p["student_id"] == sid_id for p in participants)
]
return {
"type": "presence_update",
"current_question_idx": current_idx,
"rows": rows,
"orphan_duplicate_joins": orphan_attempts,
}
async def broadcast_presence(self, sid: str) -> None:
await self.broadcast_instructors(sid, await self.presence_message(sid))
# ---- Projector (public big-screen view) -------------------------------
async def projector_snapshot(self, sid: str) -> dict[str, Any]:
"""Self-contained read-only payload for the projector page. No
student_ids; only aggregate distributions and the public top-N
leaderboard. Sent on initial GET + every WS state change."""
session = await self.get_session(sid)
pool = await self.get_pool_for_session(sid)
state = session["state"]
current_idx = session["current_question_idx"]
title = session["title"]
join_url = f"{self.settings.public_url}/?sid={sid}"
qr_url = _qr_data_url(join_url)
async with connect(self.settings.db_path) as db:
part_cur = await db.execute(
"SELECT COUNT(*) AS count FROM participants WHERE sid = ?", (sid,)
)
participant_count = int((await part_cur.fetchone())["count"])
question_block: dict[str, Any] | None = None
live_histogram: dict[str, Any] | None = None
reveal: dict[str, Any] | None = None
response_time_distribution: dict[str, Any] | None = None
if current_idx is not None and state in {"question_open", "question_closed"}:
question = get_question(pool, int(current_idx))
event = await self.get_question_event(sid, int(current_idx))
opened_ms = int(parse_ts(event["opened_at"]).timestamp() * 1000)
time_limit_s = int(event["time_limit"])
remaining_ms = max(0, opened_ms + time_limit_s * 1000 - now_ms()) if state == "question_open" else 0
question_block = {
"idx": int(current_idx),
"text": question["text"],
"options": question["options"],
"opened_at_server_ts": opened_ms,
"time_limit": time_limit_s,
"remaining_ms": remaining_ms,
"total_questions": question_count(pool),
}
histogram = await self.histogram(sid, int(current_idx), pending=True)
submitted = histogram["A"] + histogram["B"] + histogram["C"] + histogram["D"]
live_histogram = {
"counts": histogram,
"submitted_count": submitted,
"total_count": submitted + histogram["missed"] + histogram.get("pending", 0),
}
response_time_distribution = await self._response_time_buckets(sid, int(current_idx), time_limit_s)
if state == "question_closed":
reveal = {
"correct": question["correct"],
"explanation": question.get("explanation", ""),
}
leaderboard = await self.leaderboard(sid, limit=10)
# Strip student_ids from the public leaderboard. The instructor
# /admin board still has them via include_ids=True.
public_leaderboard = [
{"rank": row["rank"], "name": row["name"], "score": row["score"]}
for row in leaderboard
]
score_distribution = await self._score_distribution(sid, question_count(pool))
return {
"type": "projector_state",
"sid": sid,
"state": state,
"title": title,
"join_url": join_url,
"qr_url": qr_url,
"participant_count": participant_count,
"pool_meta": {
"question_count": question_count(pool),
"time_limit_default": pool["time_limit_default"],
"score_fn": pool["score_fn"],
},
"question": question_block,
"live_histogram": live_histogram,
"reveal": reveal,
"response_time_distribution": response_time_distribution,
"score_distribution": score_distribution,
"leaderboard": public_leaderboard,
"server_ts": now_ms(),
}
async def _response_time_buckets(self, sid: str, question_idx: int, time_limit_s: int) -> dict[str, Any]:
# Bucket elapsed-ms into 8 equal-width bins from 0..time_limit_s.
# Bins are {"label": "0-7s", "count": N, "is_correct_avg": 0..1}.
async with connect(self.settings.db_path) as db:
cur = await db.execute(
"""
SELECT s.elapsed_ms, s.answer
FROM submissions s
WHERE s.sid = ? AND s.question_idx = ? AND s.status = 'submitted' AND s.elapsed_ms IS NOT NULL
""",
(sid, question_idx),
)
rows = await cur.fetchall()
bins = 8
if time_limit_s <= 0:
time_limit_s = 60
edge_ms = (time_limit_s * 1000) / bins
buckets = [{"label": "", "count": 0} for _ in range(bins)]
for i in range(bins):
lo = round(edge_ms * i / 1000)
hi = round(edge_ms * (i + 1) / 1000)
buckets[i]["label"] = f"{lo}-{hi}s"
for row in rows:
ms = int(row["elapsed_ms"])
idx = min(bins - 1, max(0, int(ms // edge_ms)))
buckets[idx]["count"] += 1
total = sum(b["count"] for b in buckets)
return {"buckets": buckets, "total": total}
async def _score_distribution(self, sid: str, question_count_total: int) -> dict[str, Any]:
"""Histogram of per-student total scores. Bins are 10% of the
max-possible total (so every quiz lands on a 10-bucket axis
regardless of question count)."""
async with connect(self.settings.db_path) as db:
cur = await db.execute(
"""
SELECT p.student_id, COALESCE(SUM(s.score), 0) AS total
FROM participants p
LEFT JOIN submissions s ON s.sid = p.sid AND s.student_id = p.student_id
WHERE p.sid = ?
GROUP BY p.student_id
""",
(sid,),
)
rows = await cur.fetchall()
max_total = max(1, question_count_total)
bins = 10
edge = max_total / bins
buckets = [{"label": "", "count": 0} for _ in range(bins)]
for i in range(bins):
lo = round(edge * i, 1)
hi = round(edge * (i + 1), 1)
buckets[i]["label"] = f"{lo}-{hi}"
for row in rows:
total = float(row["total"])
idx = min(bins - 1, max(0, int(total // edge))) if edge > 0 else 0
buckets[idx]["count"] += 1
return {"buckets": buckets, "max_total": max_total, "n": len(rows)}
async def projector_ws(self, websocket: WebSocket, sid: str) -> None:
await websocket.accept()
self.projector_clients[sid].add(websocket)
try:
await websocket.send_json(await self.projector_snapshot(sid))
while True:
# Projector is read-only; we just keep the socket open and
# accept ping/keepalive messages so reverse proxies don't
# idle the connection out.
try:
data = await websocket.receive_json()
except json.JSONDecodeError:
continue
if isinstance(data, dict) and data.get("type") == "ping":
try:
await websocket.send_json({"type": "pong"})
except (WebSocketDisconnect, RuntimeError):
break
except (WebSocketDisconnect, RuntimeError):
pass
finally:
self.projector_clients[sid].discard(websocket)
async def broadcast_projectors(self, sid: str) -> None:
if not self.projector_clients.get(sid):
return
try:
snapshot = await self.projector_snapshot(sid)
except Exception:
return
for ws in list(self.projector_clients[sid]):
self._queue_send(ws, snapshot)
await asyncio.sleep(0)
async def live_histogram_message(self, sid: str, question_idx: int) -> dict[str, Any]:
histogram = await self.histogram(sid, question_idx, pending=True)
submitted_count = histogram["A"] + histogram["B"] + histogram["C"] + histogram["D"]
return {
"type": "live_histogram",
"question_idx": question_idx,
"histogram": histogram,
"submitted_count": submitted_count,
"total_count": submitted_count + histogram["missed"] + histogram.get("pending", 0),
}
async def full_leaderboard_message(self, sid: str) -> dict[str, Any]:
return {"type": "full_leaderboard", "leaderboard": await self.leaderboard(sid, include_ids=True)}
async def me(self, sid: str, student_id: str) -> dict[str, Any]:
async with connect(self.settings.db_path) as db:
part_cursor = await db.execute("SELECT * FROM participants WHERE sid = ? AND student_id = ?", (sid, student_id))
participant = await part_cursor.fetchone()
if participant is None:
# Participant row is gone (typically because the instructor
# ran a reset). Caller is expected to translate this into a
# 401 + cookie-clear so the client lands on the join form.
raise KeyError(f"No participant {student_id!r} in session {sid!r}")
sub_cursor = await db.execute(
"SELECT question_idx, answer, elapsed_ms, score, status FROM submissions WHERE sid = ? AND student_id = ? ORDER BY question_idx",
(sid, student_id),
)
submissions = await sub_cursor.fetchall()
return {
"student_id": participant["student_id"],
"name": participant["name"],
"total_score": await self.total_for(sid, student_id),
"submissions": [dict(row) for row in submissions],
}
async def stats(self, sid: str, question_idx: int | None, student_id: str | None = None) -> dict[str, Any]:
session = await self.get_session(sid)
qidx = question_idx if question_idx is not None else session["current_question_idx"]
if qidx is None:
return {
"question_idx": None,
"response_time_avg_ms": None,
"response_time_distribution": {},
"average_score": 0,
"top5": await self.leaderboard(sid, limit=5, you_student_id=student_id),
"your_rank": None,
}
async with connect(self.settings.db_path) as db:
cursor = await db.execute(
"""
SELECT elapsed_ms, score
FROM submissions
WHERE sid = ? AND question_idx = ? AND status = 'submitted'
""",
(sid, qidx),
)
rows = await cursor.fetchall()
times = [row["elapsed_ms"] for row in rows if row["elapsed_ms"] is not None]
scores = [row["score"] for row in rows]
distribution = {"0_10s": 0, "10_30s": 0, "30s_plus": 0}
for elapsed in times:
if elapsed < 10_000:
distribution["0_10s"] += 1
elif elapsed < 30_000:
distribution["10_30s"] += 1
else:
distribution["30s_plus"] += 1
payload = {
"question_idx": qidx,
"response_time_avg_ms": round(sum(times) / len(times)) if times else None,
"response_time_distribution": distribution,
"average_score": round(sum(scores) / len(scores), 2) if scores else 0,
"top5": await self.leaderboard(sid, limit=5, you_student_id=student_id),
}
if student_id:
payload["your_rank"] = await self.rank_for(sid, student_id)
return payload
async def broadcast_lobby(self, sid: str) -> None:
await self.broadcast_instructors(sid, await self.lobby_message(sid))
async def broadcast_question_closed(self, sid: str, question_idx: int) -> None:
for websocket, identity in list(self.student_clients[sid].items()):
self._queue_send(websocket, await self.question_closed_message(sid, question_idx, identity))
await self.broadcast_instructors(sid, await self.question_closed_message(sid, question_idx))
await self.broadcast_instructors(sid, await self.full_leaderboard_message(sid))
await self.broadcast_projectors(sid)
await asyncio.sleep(0)
async def broadcast_between_questions(self, sid: str, next_idx: int) -> None:
for websocket, identity in list(self.student_clients[sid].items()):
self._queue_send(websocket, await self.between_message(sid, next_idx, identity))
await self.broadcast_instructors(sid, await self.between_message(sid, next_idx))
await self.broadcast_instructors(sid, await self.full_leaderboard_message(sid))
await self.broadcast_projectors(sid)
await asyncio.sleep(0)
async def broadcast_session_ended(self, sid: str) -> None:
for websocket, identity in list(self.student_clients[sid].items()):
self._queue_send(websocket, await self.ended_message(sid, identity))
await self.broadcast_instructors(sid, await self.ended_message(sid))
await self.broadcast_projectors(sid)
await asyncio.sleep(0)
async def broadcast_students(self, sid: str, message: dict[str, Any]) -> None:
for websocket in list(self.student_clients[sid]):
self._queue_send(websocket, message)
await asyncio.sleep(0)
async def broadcast_instructors(self, sid: str, message: dict[str, Any]) -> None:
for websocket in list(self.instructor_clients[sid]):
self._queue_send(websocket, message)
await asyncio.sleep(0)
def _queue_send(self, websocket: WebSocket, message: dict[str, Any]) -> None:
asyncio.create_task(self._safe_send(websocket, message))
async def _safe_send(self, websocket: WebSocket, message: dict[str, Any]) -> None:
try:
await websocket.send_text(json.dumps(message))
except Exception:
pass