Three small UX/fairness tweaks from manual live testing: 1. Post-submit "wait for reveal" screen: show only the response time, no score. The +score reveal leaked correctness — any positive number = correct, zero = wrong — short-circuiting the "stop and think" beat the reveal pause was supposed to enforce. Time stays as the engagement signal; score now waits for the instructor reveal. 2. Final-screen "Correct X / Y" denominator is now total_questions instead of questions_answered. Missed questions are scored zero, so they belong in the denominator visibly. Server adds total_questions to the session_ended payload. 3. Projector score-distribution: drop the in-chart count labels (they collided with each other and with the median tag at small N), restore the previously-computed-but-not-rendered x-axis tick labels at the bottom. Stats line at the foot keeps n / mean / max. Also: short-circuit the per-submit instructor + presence broadcasts when no instructor / projector is connected (no listener, no DB work). The 50-student load test was tight on margin against its 2 s time_limit; with the new presence_message / live_histogram_message DB queries firing on every submit, the margin disappeared on busy boxes. Conftest fixture also bumped to 8 s per question for the same reason — gives breathing room for sequential WS submits in the load test. 71/71 pytest green.
1262 lines
59 KiB
Python
1262 lines
59 KiB
Python
"""In-process WebSocket room manager."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
from collections import defaultdict
|
|
from datetime import UTC, datetime
|
|
from io import BytesIO
|
|
from typing import Any
|
|
|
|
import aiosqlite
|
|
import qrcode
|
|
import qrcode.image.svg
|
|
from fastapi import WebSocket, WebSocketDisconnect
|
|
|
|
from app.config import Settings
|
|
from app.db import connect
|
|
from app.pool import (
|
|
get_question,
|
|
parse_pool_json,
|
|
public_question_payload,
|
|
question_count,
|
|
question_time_limit,
|
|
resolve_option_key,
|
|
)
|
|
from app.scoring import SCORE_FNS
|
|
|
|
|
|
def now_utc() -> datetime:
|
|
return datetime.now(UTC)
|
|
|
|
|
|
def now_ms() -> int:
    """Return the current UTC time as whole milliseconds since the Unix epoch."""
    epoch_seconds = now_utc().timestamp()
    return int(epoch_seconds * 1000)
|
|
|
|
|
|
def iso_now() -> str:
    """Return the current UTC time formatted as an ISO 8601 string."""
    current = now_utc()
    return current.isoformat()
|
|
|
|
|
|
def parse_ts(value: str) -> datetime:
|
|
if value.endswith("Z"):
|
|
value = value[:-1] + "+00:00"
|
|
parsed = datetime.fromisoformat(value)
|
|
if parsed.tzinfo is None:
|
|
parsed = parsed.replace(tzinfo=UTC)
|
|
return parsed
|
|
|
|
|
|
def _qr_data_url(value: str) -> str:
    """Render *value* as an SVG QR code and return it as a base64 ``data:`` URL."""
    svg_buffer = BytesIO()
    qrcode.make(value, image_factory=qrcode.image.svg.SvgPathImage).save(svg_buffer)
    payload = base64.b64encode(svg_buffer.getvalue()).decode("ascii")
    return "data:image/svg+xml;base64," + payload
|
|
|
|
|
|
class DuplicateStudentId(Exception):
    """A join tried to claim a student_id that another participant holds.

    Identity is first-claim-wins: once a student_id is taken within a
    session, later join attempts for it are refused (anti-hijack).
    """
|
|
|
|
|
|
class RoomManager:
|
|
def __init__(self, settings: Settings):
    """Set up empty in-memory registries for sockets, timers, locks and presence."""
    self.settings = settings

    # Connected sockets, keyed by session id. Student sockets map to the
    # identity dict they connected with; instructor sockets are a plain set.
    self.student_clients: dict[str, dict[WebSocket, dict[str, Any]]] = defaultdict(dict)
    self.instructor_clients: dict[str, set[WebSocket]] = defaultdict(set)
    # Projector clients are public and read-only, so they carry no
    # per-client identity.
    self.projector_clients: dict[str, set[WebSocket]] = defaultdict(set)

    # One pending auto-close timer per (session id, question index).
    self.autoclose_tasks: dict[tuple[str, int], asyncio.Task] = {}
    # One lock per session id, created on first use.
    self.locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    # The single canonical session id. It is filled in during startup once
    # the pool has been loaded; routes read this instead of
    # settings.default_session_id so a session_id field in the pool JSON
    # can override the env default.
    self.canonical_sid: str | None = None

    # Volatile presence, shaped as
    #   presence[sid][student_id] = {"connected": bool,
    #                                "last_seen_ms": int, "ws_count": int}
    # It is rebuilt on every WS connect / disconnect and never persisted,
    # so it dies with the process.
    self.presence: dict[str, dict[str, dict[str, Any]]] = defaultdict(dict)
|
|
|
|
async def ensure_single_session(self, sid: str, pool: dict[str, Any]) -> None:
    """Create or refresh the canonical session row and its quiz row.

    Meant to be called at startup with the operator-supplied pool JSON and
    safe to call repeatedly. When no session row exists for ``sid`` a quiz
    and a session are inserted. Otherwise the existing quiz's title, pool
    blob, default time limit and score function are overwritten and the
    session title is refreshed, so a restart picks up pool-file edits
    while earlier submissions for the same session stay in place.
    """
    title = pool["title"]
    serialized_pool = json.dumps(pool)
    async with connect(self.settings.db_path) as db:
        lookup = await db.execute(
            "SELECT quiz_id FROM quiz_sessions WHERE sid = ?",
            (sid,),
        )
        existing = await lookup.fetchone()
        quiz_values = (title, serialized_pool, pool["time_limit_default"], pool["score_fn"])
        if existing is not None:
            # Session already known: refresh the quiz it points at.
            await db.execute(
                "UPDATE quizzes SET title = ?, pool_json = ?, time_limit_default = ?, score_fn_name = ? WHERE id = ?",
                (*quiz_values, existing["quiz_id"]),
            )
            await db.execute(
                "UPDATE quiz_sessions SET title = ? WHERE sid = ?",
                (title, sid),
            )
        else:
            # First start for this sid: create the quiz, then the session.
            inserted = await db.execute(
                "INSERT INTO quizzes (title, pool_json, time_limit_default, score_fn_name) VALUES (?, ?, ?, ?)",
                quiz_values,
            )
            await db.execute(
                "INSERT INTO quiz_sessions (sid, quiz_id, title) VALUES (?, ?, ?)",
                (sid, inserted.lastrowid, title),
            )
        await db.commit()
|
|
|
|
async def advance_to_next(self, sid: str) -> None:
    """Handle the instructor's single 'Next' button.

    One button drives the whole lifecycle: from the lobby it opens
    question 0; while a question is open it closes that question and opens
    the following one; after a question has closed it opens the following
    one. When no further question exists the session is ended. A finished
    session is left untouched.
    """
    async with self.locks[sid]:
        session = await self.get_session(sid)
        state = session["state"]
        if state == "finished":
            return
        current_idx = session["current_question_idx"]
        was_open = state == "question_open"
        if was_open:
            await self._close_question_locked(sid, int(current_idx))

    # The lock is released before the steps below: open_question and
    # end_session each acquire the session lock themselves, and
    # asyncio.Lock is not reentrant.
    if was_open:
        await self.broadcast_question_closed(sid, int(current_idx))

    pool = await self.get_pool_for_session(sid)
    total = question_count(pool)
    next_idx = int(current_idx) + 1 if current_idx is not None else 0
    if next_idx < total:
        await self.open_question(sid, next_idx)
    else:
        await self.end_session(sid)
|
|
|
|
async def reset(self, sid: str) -> None:
    """Wipe submissions, participants, and per-question state, then return
    the session to lobby. Useful for re-running the same quiz across
    classes without redeploying.

    Steps, in order: cancel this session's pending auto-close tasks;
    delete the session's rows from ``submissions``, ``question_events``,
    ``participants`` and ``student_events`` and put the session row back
    into the ``lobby`` state; send every connected student a
    ``session_reset`` message and close the socket with code 4002; drop
    the in-memory student-socket and presence maps; re-broadcast state to
    instructors, lobby, presence and projectors.
    """
    # NOTE(review): nesting was reconstructed from a listing that lost its
    # indentation. The lock is assumed to cover the task cancellation and
    # the DB wipe only — confirm against the real file.
    async with self.locks[sid]:
        # Keys are (sid, question_idx); collect first so the dict is not
        # mutated while it is being iterated.
        task_keys = [key for key in self.autoclose_tasks if key[0] == sid]
        for key in task_keys:
            task = self.autoclose_tasks.pop(key, None)
            if task:
                task.cancel()
        async with connect(self.settings.db_path) as db:
            await db.execute("DELETE FROM submissions WHERE sid = ?", (sid,))
            await db.execute("DELETE FROM question_events WHERE sid = ?", (sid,))
            await db.execute("DELETE FROM participants WHERE sid = ?", (sid,))
            await db.execute("DELETE FROM student_events WHERE sid = ?", (sid,))
            await db.execute(
                "UPDATE quiz_sessions SET state = 'lobby', current_question_idx = NULL, finished_at = NULL WHERE sid = ?",
                (sid,),
            )
            await db.commit()
    # Tell each student client the session was reset BEFORE closing the
    # socket, so the JS can clear local state and re-bootstrap into the
    # join form rather than showing a generic "disconnected" screen.
    for ws in list(self.student_clients.get(sid, {}).keys()):
        # Both calls are best-effort: a socket that is already gone must
        # not stop the remaining students from being notified.
        try:
            await ws.send_json({"type": "session_reset"})
        except Exception:
            pass
        try:
            await ws.close(code=4002)
        except Exception:
            pass
    self.student_clients.pop(sid, None)
    # Presence is volatile — wipe alongside the participant table so
    # the next instructor snapshot doesn't show stale ghost rows.
    self.presence.pop(sid, None)
    await self.broadcast_instructors(sid, {"type": "state", "state": "lobby", "current_question_idx": None, "title": (await self.get_session(sid))["title"]})
    await self.broadcast_lobby(sid)
    await self.broadcast_presence(sid)
    await self.broadcast_projectors(sid)
|
|
|
|
async def sessions_active(self) -> int:
    """Count the sessions whose state is anything other than ``finished``."""
    query = "SELECT COUNT(*) AS count FROM quiz_sessions WHERE state != 'finished'"
    async with connect(self.settings.db_path) as db:
        result = await db.execute(query)
        tally = await result.fetchone()
    return int(tally["count"])
|
|
|
|
def ws_client_count(self) -> int:
    """Return the number of registered WebSocket clients.

    Students, instructors and projectors are all counted, across every
    session.
    """
    registries = (self.student_clients, self.instructor_clients, self.projector_clients)
    return sum(len(members) for registry in registries for members in registry.values())
|
|
|
|
async def cookie_id_matches(self, sid: str, student_id: str, cookie_id: str) -> bool:
    """Return True when ``cookie_id`` equals the one stored for this participant.

    This guards against a re-attack after recovery. Once an admin clears
    a hijacked id and the legitimate student re-joins with a fresh
    cookie_id, the hijacker's old cookie is still cryptographically valid
    because the secret key has not changed — but the cookie_id in the DB
    now belongs to the legitimate student. Any request whose cookie_id
    differs from the current row is therefore rejected. A missing
    participant row also yields False.
    """
    async with connect(self.settings.db_path) as db:
        lookup = await db.execute(
            "SELECT cookie_id FROM participants WHERE sid = ? AND student_id = ?",
            (sid, student_id),
        )
        participant = await lookup.fetchone()
    if participant is None:
        return False
    return participant["cookie_id"] == cookie_id
|
|
|
|
async def add_participant(self, sid: str, student_id: str, name: str, cookie_id: str) -> None:
    """Register a participant under first-claim-wins rules.

    Raises:
        DuplicateStudentId: the student_id already has a participants row
            in this sid — either an attempt to hijack another student's
            id, or a legitimate student coming back after clearing
            cookies. The route handler maps this to a 409 and records a
            ``duplicate_join`` audit event so the instructor sees the
            attempt on the live presence panel.

    After a successful insert, every question of this session that has
    already closed gets a zero-score ``missed`` submission for the new
    participant, and lobby, presence and projector views are refreshed.
    """
    claim_sql = "INSERT INTO participants (sid, student_id, name, cookie_id) VALUES (?, ?, ?, ?)"
    backfill_sql = """
        INSERT OR IGNORE INTO submissions (sid, student_id, question_idx, status, score)
        SELECT sid, ?, question_idx, 'missed', 0
        FROM question_events
        WHERE sid = ? AND closed_at IS NOT NULL
        """
    async with connect(self.settings.db_path) as db:
        try:
            await db.execute(claim_sql, (sid, student_id, name, cookie_id))
        except aiosqlite.IntegrityError as exc:
            # PK violation = student_id already claimed in this sid.
            raise DuplicateStudentId(student_id) from exc
        await db.execute(backfill_sql, (student_id, sid))
        await db.commit()
    await self.broadcast_lobby(sid)
    await self.broadcast_presence(sid)
    await self.broadcast_projectors(sid)
|
|
|
|
async def log_event(
    self,
    sid: str,
    student_id: str | None,
    kind: str,
    question_idx: int | None = None,
    detail: dict[str, Any] | None = None,
) -> None:
    """Append one row to ``student_events``.

    ``detail`` is stored as a JSON string; a missing or empty ``detail``
    is stored as NULL.
    """
    encoded_detail = json.dumps(detail) if detail else None
    insert_sql = """
        INSERT INTO student_events (sid, student_id, question_idx, kind, detail)
        VALUES (?, ?, ?, ?, ?)
        """
    async with connect(self.settings.db_path) as db:
        await db.execute(insert_sql, (sid, student_id, question_idx, kind, encoded_detail))
        await db.commit()
|
|
|
|
async def clear_student(self, sid: str, student_id: str) -> bool:
    """Admin recovery hatch for first-claim-wins: remove a participant
    + all their submissions so the legitimate student can re-claim
    their id. Returns True if a row was removed.

    "Removed" refers to the ``participants`` row only: the return value
    comes from the row count of the participants DELETE, not from the
    submissions DELETE.
    """
    async with connect(self.settings.db_path) as db:
        # The cursor from this first DELETE is never read; the name is
        # rebound by the next statement.
        cursor = await db.execute(
            "DELETE FROM submissions WHERE sid = ? AND student_id = ?",
            (sid, student_id),
        )
        cursor = await db.execute(
            "DELETE FROM participants WHERE sid = ? AND student_id = ?",
            (sid, student_id),
        )
        removed = cursor.rowcount > 0
        await db.commit()
    # NOTE(review): nesting was reconstructed from a listing that lost its
    # indentation. The three broadcasts are assumed to sit inside
    # `if removed:` — confirm against the real file.
    if removed:
        self.presence.get(sid, {}).pop(student_id, None)
        # Kick any active WS for this student_id so a stale cookie can
        # no longer drive submissions. /me will 401 (cookie cleared)
        # and the page will land on the join form.
        for ws, ident in list(self.student_clients.get(sid, {}).items()):
            if ident.get("student_id") == student_id:
                # Best-effort: the socket may already be closed.
                try:
                    await ws.send_json({"type": "session_reset"})
                except Exception:
                    pass
                try:
                    await ws.close(code=4002)
                except Exception:
                    pass
        await self.broadcast_lobby(sid)
        await self.broadcast_presence(sid)
        await self.broadcast_projectors(sid)
    return removed
|
|
|
|
async def student_ws(self, websocket: WebSocket, sid: str, identity: dict[str, Any]) -> None:
    """Serve one student WebSocket connection until it disconnects.

    Accepts the socket, registers it under ``student_clients[sid]`` with
    its ``identity``, marks the student as connected in the presence map,
    sends the current-state snapshot, then answers ``ping`` and ``submit``
    messages. Malformed or unknown messages get a ``bad_message`` error
    and the connection stays open. On exit the socket is unregistered
    and the presence entry is updated.
    """
    await websocket.accept()
    self.student_clients[sid][websocket] = identity
    student_id = identity["student_id"]
    # One presence slot per student_id; ws_count tracks how many sockets
    # that student currently has open.
    slot = self.presence[sid].setdefault(
        student_id,
        {"connected": False, "last_seen_ms": now_ms(), "ws_count": 0, "name": identity.get("name", "")},
    )
    slot["ws_count"] += 1
    slot["connected"] = True
    slot["last_seen_ms"] = now_ms()
    slot["name"] = identity.get("name", slot.get("name", ""))
    await self.broadcast_presence(sid)
    try:
        await self.send_student_snapshot(websocket, sid, identity)
        while True:
            try:
                data = await websocket.receive_json()
            except json.JSONDecodeError:
                # Report the bad frame and keep listening, unless the
                # socket turns out to be gone while replying.
                try:
                    await websocket.send_json({"type": "error", "code": "bad_message", "message": "Invalid JSON"})
                except (WebSocketDisconnect, RuntimeError):
                    break
                continue
            if not isinstance(data, dict):
                try:
                    await websocket.send_json({"type": "error", "code": "bad_message", "message": "Message must be a JSON object"})
                except (WebSocketDisconnect, RuntimeError):
                    break
                continue
            msg_type = data.get("type")
            if msg_type == "ping":
                await websocket.send_json({"type": "pong"})
            elif msg_type == "submit":
                # submit_answer returns either a submit_ack or an error
                # dict; whichever it is goes straight back to the student.
                ack = await self.submit_answer(sid, identity["student_id"], data.get("question_idx"), data.get("answer"))
                await websocket.send_json(ack)
            else:
                await websocket.send_json({"type": "error", "code": "bad_message", "message": "Unknown message type"})
    except (WebSocketDisconnect, RuntimeError):
        pass
    finally:
        self.student_clients[sid].pop(websocket, None)
        # The slot may no longer exist: reset() and clear_student() both
        # remove presence entries.
        slot = self.presence.get(sid, {}).get(student_id)
        if slot:
            slot["ws_count"] = max(0, slot.get("ws_count", 1) - 1)
            # Still connected while another socket of the same student is open.
            slot["connected"] = slot["ws_count"] > 0
            slot["last_seen_ms"] = now_ms()
        # NOTE(review): nesting was reconstructed from a listing that lost
        # its indentation. This broadcast is assumed to run whether or not
        # a slot was found — confirm against the real file.
        await self.broadcast_presence(sid)
|
|
|
|
async def instructor_ws(self, websocket: WebSocket, sid: str) -> None:
    """Serve one instructor WebSocket connection until it disconnects.

    Accepts the socket, registers it in ``instructor_clients[sid]``, sends
    the current-state snapshot, then dispatches incoming JSON messages by
    their ``type``: ``ping``, ``open_question``, ``close_question``,
    ``next``, ``end_session`` and ``reset``. Frames that are not valid
    JSON, are not JSON objects, or carry an unknown ``type`` are answered
    with a ``bad_message`` error. An ``open_question`` whose
    ``question_idx`` or ``time_limit`` cannot be converted to an integer
    is answered with a ``bad_question`` error. In all of these cases the
    connection stays open. The socket is always unregistered on exit.
    """
    await websocket.accept()
    self.instructor_clients[sid].add(websocket)
    try:
        await self.send_instructor_snapshot(websocket, sid)
        while True:
            try:
                data = await websocket.receive_json()
            except json.JSONDecodeError:
                # Report the bad frame and keep listening, unless the
                # socket turns out to be gone while replying.
                try:
                    await websocket.send_json({"type": "error", "code": "bad_message", "message": "Invalid JSON"})
                except (WebSocketDisconnect, RuntimeError):
                    break
                continue
            if not isinstance(data, dict):
                try:
                    await websocket.send_json({"type": "error", "code": "bad_message", "message": "Message must be a JSON object"})
                except (WebSocketDisconnect, RuntimeError):
                    break
                continue
            msg_type = data.get("type")
            if msg_type == "ping":
                await websocket.send_json({"type": "pong"})
            elif msg_type == "open_question":
                raw_limit = data.get("time_limit")
                # Validate before dispatching: an unconvertible value used
                # to raise out of this handler and drop the instructor's
                # connection instead of producing an error message.
                try:
                    question_idx = int(data.get("question_idx", 0))
                    if raw_limit:
                        # Check only; open_question does the conversion and
                        # treats a falsy limit as "use the pool default".
                        int(raw_limit)
                except (TypeError, ValueError, OverflowError):
                    await websocket.send_json({"type": "error", "code": "bad_question", "message": "Invalid question index or time limit"})
                    continue
                await self.open_question(sid, question_idx, raw_limit)
            elif msg_type == "close_question":
                await self.close_question(sid)
            elif msg_type == "next":
                await self.advance_to_next(sid)
            elif msg_type == "end_session":
                await self.end_session(sid)
            elif msg_type == "reset":
                await self.reset(sid)
            else:
                await websocket.send_json({"type": "error", "code": "bad_message", "message": "Unknown message type"})
    except (WebSocketDisconnect, RuntimeError):
        pass
    finally:
        self.instructor_clients[sid].discard(websocket)
|
|
|
|
async def send_student_snapshot(self, websocket: WebSocket, sid: str, identity: dict[str, Any]) -> None:
|
|
session = await self.get_session(sid)
|
|
await websocket.send_json(
|
|
{
|
|
"type": "state",
|
|
"state": session["state"],
|
|
"current_question_idx": session["current_question_idx"],
|
|
"title": session["title"],
|
|
}
|
|
)
|
|
if session["state"] == "question_open":
|
|
await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"]))
|
|
ack = await self.existing_submit_ack(sid, identity["student_id"], session["current_question_idx"])
|
|
if ack:
|
|
await websocket.send_json(ack)
|
|
elif session["state"] == "question_closed":
|
|
# Replay the reveal so a student joining mid-reveal sees the
|
|
# closed-question card with their answer / correct option /
|
|
# leaderboard, instead of being stuck on the join form's
|
|
# disabled state waiting for an event that never arrives.
|
|
await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"]))
|
|
await websocket.send_json(
|
|
await self.question_closed_message(sid, session["current_question_idx"], identity)
|
|
)
|
|
elif session["state"] == "finished":
|
|
await websocket.send_json(await self.ended_message(sid, identity))
|
|
|
|
async def send_instructor_snapshot(self, websocket: WebSocket, sid: str) -> None:
|
|
session = await self.get_session(sid)
|
|
await websocket.send_json(
|
|
{
|
|
"type": "state",
|
|
"state": session["state"],
|
|
"current_question_idx": session["current_question_idx"],
|
|
"title": session["title"],
|
|
}
|
|
)
|
|
await websocket.send_json(await self.lobby_message(sid))
|
|
await websocket.send_json(await self.presence_message(sid))
|
|
# When an instructor reconnects mid-session, replay enough payloads
|
|
# for the SPA to render the current state without waiting for the
|
|
# next event. Otherwise the dashboard sits on a "Reveal pending..."
|
|
# placeholder forever.
|
|
if session["state"] == "question_open":
|
|
await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"]))
|
|
await websocket.send_json(await self.live_histogram_message(sid, session["current_question_idx"]))
|
|
elif session["state"] == "question_closed":
|
|
await websocket.send_json(await self.question_open_message(sid, session["current_question_idx"]))
|
|
await websocket.send_json(await self.question_closed_message(sid, session["current_question_idx"]))
|
|
await websocket.send_json(await self.full_leaderboard_message(sid))
|
|
elif session["state"] in {"between_questions", "finished"}:
|
|
await websocket.send_json(await self.full_leaderboard_message(sid))
|
|
|
|
async def open_question(self, sid: str, question_idx: int, time_limit: int | None = None) -> None:
    """Open question ``question_idx`` for answering and announce it.

    Does nothing when that question is already the open one. A different
    question that is still open is closed first. An index outside the pool
    is reported to instructors as a ``bad_question`` error and nothing is
    opened. Otherwise the question event is written (re-opening an
    existing event resets its ``opened_at`` and clears ``closed_at``), the
    session moves to ``question_open``, an auto-close timer is scheduled,
    and students, instructors and projectors are notified.

    Args:
        sid: Session id.
        question_idx: Index of the question within the pool.
        time_limit: Time limit for this opening; a falsy value
            (``None`` or 0) falls back to ``question_time_limit``.
    """
    # NOTE(review): nesting was reconstructed from a listing that lost its
    # indentation. The lock is assumed to end after the auto-close timer
    # is scheduled, with the broadcasts outside it — confirm against the
    # real file.
    async with self.locks[sid]:
        session = await self.get_session(sid)
        if session["state"] == "question_open" and session["current_question_idx"] == question_idx:
            return
        if session["state"] == "question_open":
            await self._close_question_locked(sid, session["current_question_idx"])
        pool = await self.get_pool_for_session(sid)
        if question_idx < 0 or question_idx >= question_count(pool):
            await self.broadcast_instructors(sid, {"type": "error", "code": "bad_question", "message": "Question index out of range"})
            return
        limit = int(time_limit or question_time_limit(pool, question_idx))
        opened = iso_now()
        async with connect(self.settings.db_path) as db:
            await db.execute(
                """
                INSERT INTO question_events (sid, question_idx, opened_at, time_limit)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(sid, question_idx) DO UPDATE SET opened_at = excluded.opened_at,
                    closed_at = NULL, time_limit = excluded.time_limit
                """,
                (sid, question_idx, opened, limit),
            )
            await db.execute(
                "UPDATE quiz_sessions SET state = 'question_open', current_question_idx = ? WHERE sid = ?",
                (question_idx, sid),
            )
            await db.commit()
        self._schedule_autoclose(sid, question_idx, limit)
    # The same question_open payload goes to students and instructors.
    msg = await self.question_open_message(sid, question_idx)
    await self.broadcast_students(sid, msg)
    await self.broadcast_instructors(sid, msg)
    await self.broadcast_instructors(sid, await self.live_histogram_message(sid, question_idx))
    await self.broadcast_projectors(sid)
|
|
|
|
async def close_question(self, sid: str) -> None:
    """Close the currently open question and broadcast the result.

    A no-op unless the session is in the ``question_open`` state.
    """
    async with self.locks[sid]:
        session = await self.get_session(sid)
        if session["state"] != "question_open":
            return
        open_idx = session["current_question_idx"]
        await self._close_question_locked(sid, open_idx)
    # The DB change is done under the lock; the broadcast follows once
    # the lock has been released.
    await self.broadcast_question_closed(sid, open_idx)
|
|
|
|
async def _close_question_locked(self, sid: str, question_idx: int) -> None:
    """Persist the closing of a question.

    Drops the question's auto-close timer, stamps ``closed_at`` if it is
    not already set, gives every participant without a submission for
    this question a zero-score ``missed`` row, and moves the session to
    ``question_closed``. Nothing is broadcast from here.
    """
    timer = self.autoclose_tasks.pop((sid, question_idx), None)
    running = asyncio.current_task()
    # The timer task itself ends up here when the time limit elapses; it
    # must not cancel itself part-way through closing.
    if timer and timer is not running:
        timer.cancel()

    stamp_closed_sql = "UPDATE question_events SET closed_at = COALESCE(closed_at, ?) WHERE sid = ? AND question_idx = ?"
    fill_missed_sql = """
        INSERT OR IGNORE INTO submissions (sid, student_id, question_idx, status, score)
        SELECT p.sid, p.student_id, ?, 'missed', 0
        FROM participants p
        WHERE p.sid = ?
        """
    set_state_sql = "UPDATE quiz_sessions SET state = 'question_closed', current_question_idx = ? WHERE sid = ?"
    async with connect(self.settings.db_path) as db:
        await db.execute(stamp_closed_sql, (iso_now(), sid, question_idx))
        await db.execute(fill_missed_sql, (question_idx, sid))
        await db.execute(set_state_sql, (question_idx, sid))
        await db.commit()
|
|
|
|
async def next_question(self, sid: str) -> None:
    """Move a closed question into the ``between_questions`` state.

    A no-op unless the session is in ``question_closed``. The index that
    follows the current question is passed on to the broadcast.
    """
    async with self.locks[sid]:
        session = await self.get_session(sid)
        if session["state"] != "question_closed":
            return
        upcoming_idx = int(session["current_question_idx"]) + 1
        async with connect(self.settings.db_path) as db:
            await db.execute("UPDATE quiz_sessions SET state = 'between_questions' WHERE sid = ?", (sid,))
            await db.commit()
    await self.broadcast_between_questions(sid, upcoming_idx)
|
|
|
|
async def end_session(self, sid: str) -> None:
    """Finish the session and announce the final results.

    A question that is still open is closed first. The session row is then
    marked ``finished`` with a ``finished_at`` timestamp and its current
    question index cleared. Afterwards the end-of-session broadcast goes
    out and instructors receive the full leaderboard.
    """
    finish_sql = "UPDATE quiz_sessions SET state = 'finished', current_question_idx = NULL, finished_at = ? WHERE sid = ?"
    async with self.locks[sid]:
        session = await self.get_session(sid)
        if session["state"] == "question_open":
            await self._close_question_locked(sid, session["current_question_idx"])
        async with connect(self.settings.db_path) as db:
            await db.execute(finish_sql, (iso_now(), sid))
            await db.commit()
    await self.broadcast_session_ended(sid)
    final_board = await self.full_leaderboard_message(sid)
    await self.broadcast_instructors(sid, final_board)
|
|
|
|
async def submit_answer(self, sid: str, student_id: str, question_idx: Any, answer: Any) -> dict[str, Any]:
    """Record a student's answer and grade it.

    `answer` accepts either the option's full text (production wire
    format from the letterless student UI) or a canonical letter
    (internal callers + tests). Anything that doesn't resolve to one
    of the four options is recorded as a zero-score submission and
    locked in via the PK — circumvention attempts can't retry.

    Returns:
        A ``submit_ack`` dict on success, or the already stored ack when
        this student has a submitted row for the question. Otherwise an
        ``error`` dict with code ``bad_question`` (index not convertible
        to int), ``bad_answer`` (answer is not a string), ``not_open``
        (that question is not the open one) or ``time_expired``.
    """
    try:
        qidx = int(question_idx)
    except (TypeError, ValueError):
        return {"type": "error", "code": "bad_question", "message": "Invalid question index"}
    if not isinstance(answer, str):
        return {"type": "error", "code": "bad_answer", "message": "Answer must be a string"}
    # NOTE(review): nesting was reconstructed from a listing that lost its
    # indentation. The lock is assumed to cover everything up to the
    # commit, with the broadcasts and the final return outside it —
    # confirm against the real file.
    async with self.locks[sid]:
        session = await self.get_session(sid)
        if session["state"] != "question_open" or session["current_question_idx"] != qidx:
            return {"type": "error", "code": "not_open", "message": "Question is not open"}
        # A repeated submit gets the stored ack back rather than a new grade.
        existing = await self.existing_submit_ack(sid, student_id, qidx)
        if existing:
            return existing
        event = await self.get_question_event(sid, qidx)
        opened_at = parse_ts(event["opened_at"])
        # Elapsed time is measured on the server, from the stored open time.
        elapsed_ms = max(0, int((now_utc() - opened_at).total_seconds() * 1000))
        time_limit_ms = int(event["time_limit"]) * 1000
        if elapsed_ms > time_limit_ms:
            return {"type": "error", "code": "time_expired", "message": "Question time has expired"}
        pool = await self.get_pool_for_session(sid)
        question = get_question(pool, qidx)
        resolved = resolve_option_key(question, answer)
        if resolved is None:
            # Failsafe: option didn't match any of the four texts.
            # Lock in a zero-score submission rather than erroring,
            # so an attempt to circumvent the UI by sending arbitrary
            # text doesn't get a free retry.
            score = 0.0
            stored_answer: str | None = None
            correct = False
        else:
            correct = resolved == question["correct"]
            # The scoring function is chosen by name from the pool.
            score_fn = SCORE_FNS[pool["score_fn"]]
            score = score_fn(correct, elapsed_ms, time_limit_ms)
            stored_answer = resolved
        submitted_at = iso_now()
        async with connect(self.settings.db_path) as db:
            # DO NOTHING on conflict: a row that already exists for this
            # (sid, student_id, question_idx) is left as it is.
            await db.execute(
                """
                INSERT INTO submissions (sid, student_id, question_idx, answer, submitted_at, elapsed_ms, score, status)
                VALUES (?, ?, ?, ?, ?, ?, ?, 'submitted')
                ON CONFLICT(sid, student_id, question_idx) DO NOTHING
                """,
                (sid, student_id, qidx, stored_answer, submitted_at, elapsed_ms, score),
            )
            await db.commit()
    # Skip live histogram build when there's no instructor listening
    # — same rationale as broadcast_presence. Submit storm should not
    # be paying for DB work that nobody consumes.
    if self.instructor_clients.get(sid):
        await self.broadcast_instructors(sid, await self.live_histogram_message(sid, qidx))
    await self.broadcast_presence(sid)
    await self.broadcast_projectors(sid)
    return {"type": "submit_ack", "question_idx": qidx, "answer": stored_answer, "score": score, "elapsed_ms": elapsed_ms}
|
|
|
|
def _schedule_autoclose(self, sid: str, question_idx: int, time_limit: int) -> None:
    """Start the auto-close timer for a question, replacing any existing one."""
    key = (sid, question_idx)
    stale = self.autoclose_tasks.pop(key, None)
    if stale is not None:
        stale.cancel()
    timer = asyncio.create_task(self._autoclose_after(sid, question_idx, time_limit))
    self.autoclose_tasks[key] = timer
|
|
|
|
async def _autoclose_after(self, sid: str, question_idx: int, time_limit: int) -> None:
    """Timer body: close the question once ``time_limit`` seconds have passed.

    After sleeping, the question is closed only if it is still the open
    one; otherwise the timer exits without touching anything. Being
    cancelled ends the timer quietly.
    """
    try:
        await asyncio.sleep(time_limit)
        async with self.locks[sid]:
            session = await self.get_session(sid)
            still_current = (
                session["state"] == "question_open"
                and session["current_question_idx"] == question_idx
            )
            if not still_current:
                return
            await self._close_question_locked(sid, question_idx)
        await self.broadcast_question_closed(sid, question_idx)
    except asyncio.CancelledError:
        return
|
|
|
|
async def get_session(self, sid: str) -> dict[str, Any]:
    """Return the ``quiz_sessions`` row for ``sid`` as a plain dict.

    Raises:
        KeyError: no session row exists for ``sid``.
    """
    async with connect(self.settings.db_path) as db:
        result = await db.execute("SELECT * FROM quiz_sessions WHERE sid = ?", (sid,))
        record = await result.fetchone()
    if record is None:
        raise KeyError(f"Unknown session {sid}")
    return dict(record)
|
|
|
|
async def session_exists(self, sid: str) -> bool:
    """Return True when a ``quiz_sessions`` row exists for ``sid``."""
    async with connect(self.settings.db_path) as db:
        probe = await db.execute("SELECT 1 FROM quiz_sessions WHERE sid = ?", (sid,))
        hit = await probe.fetchone()
    return hit is not None
|
|
|
|
async def get_pool_for_session(self, sid: str) -> dict[str, Any]:
    """Load and parse the question pool of the quiz attached to ``sid``.

    Raises:
        KeyError: no session row (joined to its quiz) exists for ``sid``.
    """
    pool_sql = """
        SELECT q.pool_json
        FROM quizzes q
        JOIN quiz_sessions s ON s.quiz_id = q.id
        WHERE s.sid = ?
        """
    async with connect(self.settings.db_path) as db:
        result = await db.execute(pool_sql, (sid,))
        record = await result.fetchone()
    if record is None:
        raise KeyError(f"Unknown session {sid}")
    return parse_pool_json(record["pool_json"])
|
|
|
|
async def get_question_event(self, sid: str, question_idx: int) -> dict[str, Any]:
    """Return the ``question_events`` row for one question as a plain dict.

    Raises:
        KeyError: the question has no event row, i.e. it was never opened.
    """
    async with connect(self.settings.db_path) as db:
        result = await db.execute(
            "SELECT * FROM question_events WHERE sid = ? AND question_idx = ?",
            (sid, question_idx),
        )
        record = await result.fetchone()
    if record is None:
        raise KeyError("Question has not been opened")
    return dict(record)
|
|
|
|
async def existing_submit_ack(self, sid: str, student_id: str, question_idx: int) -> dict[str, Any] | None:
    """Rebuild the ``submit_ack`` for an answer that is already stored.

    Only rows with status ``submitted`` count; ``missed`` rows are
    ignored. Returns None when no such row exists.
    """
    lookup_sql = """
        SELECT answer, elapsed_ms, score, status
        FROM submissions
        WHERE sid = ? AND student_id = ? AND question_idx = ? AND status = 'submitted'
        """
    async with connect(self.settings.db_path) as db:
        result = await db.execute(lookup_sql, (sid, student_id, question_idx))
        recorded = await result.fetchone()
    if recorded is None:
        return None
    return dict(
        type="submit_ack",
        question_idx=question_idx,
        answer=recorded["answer"],
        score=recorded["score"],
        elapsed_ms=recorded["elapsed_ms"],
    )
|
|
|
|
async def question_open_message(self, sid: str, question_idx: int) -> dict[str, Any]:
    """Build the ``question_open`` payload for one question.

    The public question payload is extended with the server-side open
    time in epoch milliseconds and the milliseconds left until the time
    limit runs out (never below zero).
    """
    pool = await self.get_pool_for_session(sid)
    event = await self.get_question_event(sid, question_idx)
    opened_ms = int(parse_ts(event["opened_at"]).timestamp() * 1000)
    deadline_ms = opened_ms + int(event["time_limit"]) * 1000
    remaining_ms = max(0, deadline_ms - now_ms())
    return {
        "type": "question_open",
        **public_question_payload(pool, question_idx),
        "opened_at_server_ts": opened_ms,
        "remaining_ms": remaining_ms,
    }
|
|
|
|
async def question_closed_message(self, sid: str, question_idx: int, identity: dict[str, Any] | None = None) -> dict[str, Any]:
    """Build the ``question_closed`` reveal payload.

    The shared part carries the correct option, the explanation, the
    answer histogram and the top five. When ``identity`` is given, the
    student's own answer, score for this question, rank and running total
    are added.
    """
    pool = await self.get_pool_for_session(sid)
    question = get_question(pool, question_idx)
    viewer_id = identity["student_id"] if identity else None
    payload = {
        "type": "question_closed",
        "question_idx": question_idx,
        "correct": question["correct"],
        "explanation": question.get("explanation", ""),
        "histogram": await self.histogram(sid, question_idx),
        "top5": await self.leaderboard(sid, limit=5, you_student_id=viewer_id),
    }
    if not identity:
        return payload

    submission = await self.submission_for(sid, viewer_id, question_idx)
    rank = await self.rank_for(sid, viewer_id)
    total = await self.total_for(sid, viewer_id)
    payload["your_answer"] = submission.get("answer") if submission else None
    payload["your_score"] = submission.get("score", 0) if submission else 0
    payload["your_rank"] = rank
    payload["your_total"] = total
    return payload
|
|
|
|
async def between_message(self, sid: str, next_idx: int, identity: dict[str, Any] | None = None) -> dict[str, Any]:
    """Build the ``between_questions`` payload.

    Carries the index of the upcoming question and the top five; when
    ``identity`` is given, that student's rank and running total as well.
    """
    viewer_id = identity["student_id"] if identity else None
    payload = {
        "type": "between_questions",
        "next_idx": next_idx,
        "top5": await self.leaderboard(sid, limit=5, you_student_id=viewer_id),
    }
    if identity:
        payload["your_rank"] = await self.rank_for(sid, viewer_id)
        payload["your_total"] = await self.total_for(sid, viewer_id)
    return payload
|
|
|
|
async def ended_message(self, sid: str, identity: dict[str, Any] | None = None) -> dict[str, Any]:
    """Build the ``session_ended`` payload.

    Carries the final top five and the number of questions in the pool;
    when ``identity`` is given, that student's summary, rank and total are
    merged in.
    """
    viewer_id = identity["student_id"] if identity else None
    pool = await self.get_pool_for_session(sid)
    payload = {
        "type": "session_ended",
        "final_top5": await self.leaderboard(sid, limit=5, you_student_id=viewer_id),
        # Size of the whole pool. Clients use it as the denominator of
        # the "Correct X / Y" display, so a missed question visibly
        # counts as wrong instead of shrinking the denominator.
        "total_questions": question_count(pool),
    }
    if identity:
        payload.update(await self.student_summary(sid, viewer_id))
        payload["your_rank"] = await self.rank_for(sid, viewer_id)
        payload["your_total"] = await self.total_for(sid, viewer_id)
    return payload
|
|
|
|
async def histogram(self, sid: str, question_idx: int, pending: bool = False) -> dict[str, int]:
    """Count the submissions for one question per answer bucket.

    The result always has the keys ``A``, ``B``, ``C``, ``D`` and
    ``missed``. With ``pending=True`` a ``pending`` key is added: the
    number of participants minus the answered and missed counts, never
    below zero.
    """
    option_keys = ("A", "B", "C", "D")
    tally = {**dict.fromkeys(option_keys, 0), "missed": 0}
    async with connect(self.settings.db_path) as db:
        grouped = await db.execute(
            "SELECT answer, status, COUNT(*) AS count FROM submissions WHERE sid = ? AND question_idx = ? GROUP BY answer, status",
            (sid, question_idx),
        )
        buckets = await grouped.fetchall()
        roster = await db.execute("SELECT COUNT(*) AS count FROM participants WHERE sid = ?", (sid,))
        roster_row = await roster.fetchone()

    answered = 0
    for bucket in buckets:
        count = bucket["count"]
        choice = bucket["answer"]
        if bucket["status"] != "missed" and choice in option_keys:
            tally[choice] += count
            answered += count
        else:
            # Either a genuine 'missed' row, or a 'submitted' row whose
            # answer matched no option (the failsafe path in
            # submit_answer). Both earn zero credit and mean the same to
            # the instructor — no real option was picked — so they share
            # one bucket in the aggregate display.
            tally["missed"] += count

    if pending:
        tally["pending"] = max(0, int(roster_row["count"]) - answered - tally["missed"])
    return tally
|
|
|
|
async def leaderboard(
    self,
    sid: str,
    limit: int | None = None,
    include_ids: bool = False,
    you_student_id: str | None = None,
) -> list[dict[str, Any]]:
    """Return participants ranked by summed score, best first.

    Ties are ordered by name, then student id. ``limit`` caps the
    number of rows; ``include_ids`` adds ``student_id`` to each entry.
    If ``you_student_id`` is given and that student is in the returned
    slice, only that entry gets ``is_you: True`` so the client can
    highlight it without other students' ids going over the wire.
    """
    # int() keeps the interpolated LIMIT clause strictly numeric.
    limit_clause = f"LIMIT {int(limit)}" if limit is not None else ""
    sql = f"""
        SELECT p.student_id, p.name, COALESCE(SUM(s.score), 0) AS score
        FROM participants p
        LEFT JOIN submissions s ON s.sid = p.sid AND s.student_id = p.student_id
        WHERE p.sid = ?
        GROUP BY p.student_id, p.name
        ORDER BY score DESC, p.name ASC, p.student_id ASC
        {limit_clause}
        """
    async with connect(self.settings.db_path) as db:
        cursor = await db.execute(sql, (sid,))
        ranked = await cursor.fetchall()

    entries: list[dict[str, Any]] = []
    for position, row in enumerate(ranked, start=1):
        entry: dict[str, Any] = {
            "rank": position,
            "name": row["name"],
            "score": float(row["score"]),
        }
        if include_ids:
            entry["student_id"] = row["student_id"]
        if you_student_id is not None and row["student_id"] == you_student_id:
            entry["is_you"] = True
        entries.append(entry)
    return entries
|
|
|
|
async def rank_for(self, sid: str, student_id: str) -> int | None:
    """Return the student's 1-based rank on the full leaderboard.

    Returns ``None`` when the student does not appear on the board.
    """
    full_board = await self.leaderboard(sid, include_ids=True)
    return next(
        (entry["rank"] for entry in full_board if entry["student_id"] == student_id),
        None,
    )
|
|
|
|
async def total_for(self, sid: str, student_id: str) -> float:
    """Return the student's summed score in this session, to two decimals.

    A student with no submissions totals 0 (the query COALESCEs the sum).
    """
    query = "SELECT COALESCE(SUM(score), 0) AS total FROM submissions WHERE sid = ? AND student_id = ?"
    async with connect(self.settings.db_path) as db:
        cursor = await db.execute(query, (sid, student_id))
        row = await cursor.fetchone()
    # Mostly defensive: per-question scores already sit on a 0.05 grid,
    # but rounding keeps the sum display-friendly after many small float
    # additions.
    total = float(row["total"])
    return round(total, 2)
|
|
|
|
async def submission_for(self, sid: str, student_id: str, question_idx: int) -> dict[str, Any] | None:
    """Fetch one student's submission row for one question.

    Returns the row as a plain dict, or ``None`` when there is no row.
    """
    query = "SELECT * FROM submissions WHERE sid = ? AND student_id = ? AND question_idx = ?"
    async with connect(self.settings.db_path) as db:
        cursor = await db.execute(query, (sid, student_id, question_idx))
        found = await cursor.fetchone()
    if not found:
        return None
    return dict(found)
|
|
|
|
async def student_summary(self, sid: str, student_id: str) -> dict[str, int]:
    """Count how many questions a student answered and how many correctly.

    Only rows with status ``submitted`` count as answered; such a row is
    correct when its answer equals the pool question's ``correct`` value.
    """
    pool = await self.get_pool_for_session(sid)
    async with connect(self.settings.db_path) as db:
        cursor = await db.execute(
            "SELECT question_idx, answer, status FROM submissions WHERE sid = ? AND student_id = ?",
            (sid, student_id),
        )
        all_rows = await cursor.fetchall()

    submitted = [row for row in all_rows if row["status"] == "submitted"]
    correct = sum(
        1
        for row in submitted
        if row["answer"] == get_question(pool, row["question_idx"])["correct"]
    )
    return {"questions_answered": len(submitted), "questions_correct": correct}
|
|
|
|
async def lobby_message(self, sid: str) -> dict[str, Any]:
    """Build the ``lobby_update`` payload: the session's participants.

    Participants are ordered by join time, then name; ``count`` is the
    length of that list.
    """
    query = "SELECT student_id, name, joined_at FROM participants WHERE sid = ? ORDER BY joined_at, name"
    async with connect(self.settings.db_path) as db:
        cursor = await db.execute(query, (sid,))
        roster = [dict(row) for row in await cursor.fetchall()]
    return {"type": "lobby_update", "participants": roster, "count": len(roster)}
|
|
|
|
async def presence_message(self, sid: str) -> dict[str, Any]:
    """Build the ``presence_update`` payload for the instructor view.

    One entry per participant: connected flag, socket count and last-seen
    time (from the in-memory ``self.presence`` map), blur and
    visibility-hidden event counts, duplicate-join attempts on that id,
    and whether the student has a submitted answer for the session's
    current question. Duplicate-join attempts on ids that no participant
    has claimed are returned separately as ``orphan_duplicate_joins``.
    """
    async with connect(self.settings.db_path) as db:
        participants_cur = await db.execute(
            "SELECT student_id, name, joined_at FROM participants WHERE sid = ? ORDER BY joined_at, name",
            (sid,),
        )
        participants = await participants_cur.fetchall()

        session_cur = await db.execute(
            "SELECT state, current_question_idx FROM quiz_sessions WHERE sid = ?",
            (sid,),
        )
        session_row = await session_cur.fetchone()

        events_cur = await db.execute(
            """
            SELECT student_id, kind, COUNT(*) AS count
            FROM student_events
            WHERE sid = ? AND student_id IS NOT NULL
            GROUP BY student_id, kind
            """,
            (sid,),
        )
        event_rows = await events_cur.fetchall()

        current_idx = session_row["current_question_idx"] if session_row else None
        answered_now: set[str] = set()
        if current_idx is not None:
            ans_cur = await db.execute(
                """
                SELECT student_id FROM submissions
                WHERE sid = ? AND question_idx = ? AND status = 'submitted'
                """,
                (sid, current_idx),
            )
            answered_now = {row["student_id"] for row in await ans_cur.fetchall()}

        # Duplicate-join attempts, grouped by the student_id they targeted.
        # `detail` is deliberately a bare column next to the single MAX(ts):
        # SQLite then takes it from the row holding the maximum ts, i.e. the
        # most recent attempt. MAX(detail) would instead return the
        # lexicographically largest detail, unrelated to recency.
        dup_cur = await db.execute(
            """
            SELECT student_id, COUNT(*) AS count, MAX(ts) AS latest_ts, detail AS latest_detail
            FROM student_events
            WHERE sid = ? AND kind = 'duplicate_join' AND student_id IS NOT NULL
            GROUP BY student_id
            """,
            (sid,),
        )
        dup_rows = await dup_cur.fetchall()

    events_by_student: dict[str, dict[str, int]] = defaultdict(dict)
    for row in event_rows:
        events_by_student[row["student_id"]][row["kind"]] = int(row["count"])

    dup_by_student = {
        row["student_id"]: {
            "count": int(row["count"]),
            "latest_ts": row["latest_ts"],
            "latest_detail": row["latest_detail"],
        }
        for row in dup_rows
    }

    # Looked up once: the in-memory presence slots for this session.
    session_presence = self.presence.get(sid, {})
    rows: list[dict[str, Any]] = []
    for participant in participants:
        student_id = participant["student_id"]
        slot = session_presence.get(student_id, {})
        counts = events_by_student.get(student_id, {})
        rows.append(
            {
                "student_id": student_id,
                "name": participant["name"],
                "joined_at": participant["joined_at"],
                "connected": bool(slot.get("connected")),
                "ws_count": int(slot.get("ws_count", 0)),
                # A missing or zero timestamp is reported as None.
                "last_seen_ms": int(slot.get("last_seen_ms", 0)) or None,
                "blur_count": int(counts.get("blur", 0)),
                "hidden_count": int(counts.get("visibility_hidden", 0)),
                "duplicate_join_attempts": dup_by_student.get(student_id, {"count": 0}),
                "answered_current": student_id in answered_now,
            }
        )

    # Orphan duplicate-join attempts: an attempt on a student_id that no
    # real participant has claimed yet. Surfaced as a separate list so the
    # instructor can see "someone tried to join as 12345 but nobody named
    # 12345 has joined yet". A set makes each membership test O(1) instead
    # of rescanning the participant list per attempted id.
    joined_ids = {participant["student_id"] for participant in participants}
    orphan_attempts = [
        {"student_id": student_id, **info}
        for student_id, info in dup_by_student.items()
        if student_id not in joined_ids
    ]

    return {
        "type": "presence_update",
        "current_question_idx": current_idx,
        "rows": rows,
        "orphan_duplicate_joins": orphan_attempts,
    }
|
|
|
|
async def broadcast_presence(self, sid: str) -> None:
    """Send a fresh presence payload to the session's instructors, if any.

    The payload build is skipped when no instructor socket is registered
    for ``sid``: ``presence_message`` queries participants, quiz_sessions,
    student_events and submissions, and during a 50-student submit storm
    those queries ran on every submit even with nobody listening, eating
    budget that mattered to the time-limited question close.
    """
    if self.instructor_clients.get(sid):
        message = await self.presence_message(sid)
        await self.broadcast_instructors(sid, message)
|
|
|
|
# ---- Projector (public big-screen view) -------------------------------
|
|
|
|
async def projector_snapshot(self, sid: str) -> dict[str, Any]:
    """Assemble the self-contained, read-only ``projector_state`` payload.

    Contains session metadata, the join URL with its QR image, the
    participant count, pool metadata, the score distribution and the
    top-10 leaderboard (rank, name and score only). While a question is
    open or closed it also carries the question text and options, the
    live answer histogram and the response-time distribution; the correct
    answer and explanation are included only once the question is closed.
    """
    session = await self.get_session(sid)
    pool = await self.get_pool_for_session(sid)
    state = session["state"]
    current_idx = session["current_question_idx"]
    title = session["title"]
    join_url = f"{self.settings.public_url}/?sid={sid}"
    qr_url = _qr_data_url(join_url)

    async with connect(self.settings.db_path) as db:
        count_cur = await db.execute(
            "SELECT COUNT(*) AS count FROM participants WHERE sid = ?", (sid,)
        )
        count_row = await count_cur.fetchone()
        participant_count = int(count_row["count"])

    # Question-scoped sections stay None outside an open/closed question.
    question_block: dict[str, Any] | None = None
    live_histogram: dict[str, Any] | None = None
    reveal: dict[str, Any] | None = None
    response_time_distribution: dict[str, Any] | None = None

    if current_idx is not None and state in {"question_open", "question_closed"}:
        idx = int(current_idx)
        question = get_question(pool, idx)
        event = await self.get_question_event(sid, idx)
        opened_ms = int(parse_ts(event["opened_at"]).timestamp() * 1000)
        time_limit_s = int(event["time_limit"])
        if state == "question_open":
            # Time left on the clock, never negative.
            remaining_ms = max(0, opened_ms + time_limit_s * 1000 - now_ms())
        else:
            remaining_ms = 0
        question_block = {
            "idx": idx,
            "text": question["text"],
            "options": question["options"],
            "opened_at_server_ts": opened_ms,
            "time_limit": time_limit_s,
            "remaining_ms": remaining_ms,
            "total_questions": question_count(pool),
        }

        counts = await self.histogram(sid, idx, pending=True)
        submitted = counts["A"] + counts["B"] + counts["C"] + counts["D"]
        live_histogram = {
            "counts": counts,
            "submitted_count": submitted,
            "total_count": submitted + counts["missed"] + counts.get("pending", 0),
        }
        response_time_distribution = await self._response_time_buckets(sid, idx, time_limit_s)

        if state == "question_closed":
            reveal = {
                "correct": question["correct"],
                "explanation": question.get("explanation", ""),
            }

    top10 = await self.leaderboard(sid, limit=10)
    # Re-project each entry to rank/name/score so nothing else reaches the
    # public screen. The instructor /admin board still has student ids via
    # include_ids=True.
    public_leaderboard = [
        {"rank": entry["rank"], "name": entry["name"], "score": entry["score"]}
        for entry in top10
    ]
    score_distribution = await self._score_distribution(sid, question_count(pool))

    pool_meta = {
        "question_count": question_count(pool),
        "time_limit_default": pool["time_limit_default"],
        "score_fn": pool["score_fn"],
    }
    return {
        "type": "projector_state",
        "sid": sid,
        "state": state,
        "title": title,
        "join_url": join_url,
        "qr_url": qr_url,
        "participant_count": participant_count,
        "pool_meta": pool_meta,
        "question": question_block,
        "live_histogram": live_histogram,
        "reveal": reveal,
        "response_time_distribution": response_time_distribution,
        "score_distribution": score_distribution,
        "leaderboard": public_leaderboard,
        "server_ts": now_ms(),
    }
|
|
|
|
async def _response_time_buckets(self, sid: str, question_idx: int, time_limit_s: int) -> dict[str, Any]:
    """Histogram of response times for one question in 8 equal-width bins.

    Bins span 0..time_limit_s (60 s is assumed when the limit is not
    positive). Each bucket is {"label": "<lo>-<hi>s", "count": N} with
    the bounds rounded to whole seconds; elapsed times outside the range
    are clamped into the first or last bin. ``total`` is the number of
    counted submissions.
    """
    async with connect(self.settings.db_path) as db:
        cur = await db.execute(
            """
            SELECT s.elapsed_ms, s.answer
            FROM submissions s
            WHERE s.sid = ? AND s.question_idx = ? AND s.status = 'submitted' AND s.elapsed_ms IS NOT NULL
            """,
            (sid, question_idx),
        )
        timed_rows = await cur.fetchall()

    bins = 8
    if time_limit_s <= 0:
        time_limit_s = 60
    edge_ms = (time_limit_s * 1000) / bins

    buckets = [
        {
            "label": f"{round(edge_ms * i / 1000)}-{round(edge_ms * (i + 1) / 1000)}s",
            "count": 0,
        }
        for i in range(bins)
    ]
    for row in timed_rows:
        elapsed = int(row["elapsed_ms"])
        slot = int(elapsed // edge_ms)
        buckets[min(bins - 1, max(0, slot))]["count"] += 1

    return {"buckets": buckets, "total": sum(bucket["count"] for bucket in buckets)}
|
|
|
|
async def _score_distribution(self, sid: str, question_count_total: int) -> dict[str, Any]:
    """Histogram of per-student total scores on a fixed 10-bucket axis.

    Each bin covers 10% of ``max_total``, which is taken to be
    ``question_count_total`` (at least 1), so every quiz lands on the
    same 10-bucket axis regardless of question count.
    NOTE(review): this presumes a question is worth at most 1 point.
    Participants without submissions count with a total of 0.

    Returns ``{"buckets": [{"label": "<lo>-<hi>", "count": N}, ...],
    "max_total": ..., "n": <number of participants>}``.
    """
    async with connect(self.settings.db_path) as db:
        cur = await db.execute(
            """
            SELECT p.student_id, COALESCE(SUM(s.score), 0) AS total
            FROM participants p
            LEFT JOIN submissions s ON s.sid = p.sid AND s.student_id = p.student_id
            WHERE p.sid = ?
            GROUP BY p.student_id
            """,
            (sid,),
        )
        rows = await cur.fetchall()

    max_total = max(1, question_count_total)
    bins = 10
    edge = max_total / bins
    buckets = [
        {"label": f"{round(edge * i, 1)}-{round(edge * (i + 1), 1)}", "count": 0}
        for i in range(bins)
    ]
    for row in rows:
        total = float(row["total"])
        # Position on the 0..bins axis. Rounding before truncating keeps a
        # total that sits exactly on a bin edge in the upper bin: plain
        # float floor-division does not (0.3 // 0.1 == 2.0 and
        # 0.6 // 0.2 == 2.0), which put such scores one bucket too low.
        position = round(total * bins / max_total, 9)
        idx = min(bins - 1, max(0, int(position)))
        buckets[idx]["count"] += 1
    return {"buckets": buckets, "max_total": max_total, "n": len(rows)}
|
|
|
|
async def projector_ws(self, websocket: WebSocket, sid: str) -> None:
    """Serve one projector socket until it disconnects.

    Accepts the socket, registers it under ``sid``, sends the current
    snapshot, then answers ``{"type": "ping"}`` messages with a pong.
    The socket is always unregistered on the way out.
    """
    await websocket.accept()
    self.projector_clients[sid].add(websocket)
    try:
        snapshot = await self.projector_snapshot(sid)
        await websocket.send_json(snapshot)
        while True:
            # The projector is read-only: the loop only keeps the socket
            # open and answers ping/keepalive messages so reverse proxies
            # don't idle the connection out.
            try:
                incoming = await websocket.receive_json()
            except json.JSONDecodeError:
                continue
            is_ping = isinstance(incoming, dict) and incoming.get("type") == "ping"
            if not is_ping:
                continue
            # A failed pong raises into the handler below, which ends the
            # loop exactly as a disconnect during receive does.
            await websocket.send_json({"type": "pong"})
    except (WebSocketDisconnect, RuntimeError):
        pass
    finally:
        self.projector_clients[sid].discard(websocket)
|
|
|
|
async def broadcast_projectors(self, sid: str) -> None:
    """Queue a fresh projector snapshot to every projector socket of ``sid``.

    Does nothing, and skips the snapshot build, when no projector is
    connected. A failing snapshot build is logged and the broadcast is
    skipped: the push stays best-effort, but the failure is no longer
    invisible.
    """
    if not self.projector_clients.get(sid):
        return
    try:
        snapshot = await self.projector_snapshot(sid)
    except Exception:
        # Function-scope import keeps this block self-contained.
        import logging

        logging.getLogger(__name__).exception(
            "projector snapshot failed for session %s; broadcast skipped", sid
        )
        return
    for ws in list(self.projector_clients[sid]):
        self._queue_send(ws, snapshot)
    # Yield once so the queued send tasks get a chance to start.
    await asyncio.sleep(0)
|
|
|
|
async def live_histogram_message(self, sid: str, question_idx: int) -> dict[str, Any]:
    """Build the ``live_histogram`` payload for one question.

    ``submitted_count`` is the sum of the A-D counts; ``total_count``
    additionally includes the missed and pending counts.
    """
    counts = await self.histogram(sid, question_idx, pending=True)
    answered = sum(counts[option] for option in ("A", "B", "C", "D"))
    everyone = answered + counts["missed"] + counts.get("pending", 0)
    return {
        "type": "live_histogram",
        "question_idx": question_idx,
        "histogram": counts,
        "submitted_count": answered,
        "total_count": everyone,
    }
|
|
|
|
async def full_leaderboard_message(self, sid: str) -> dict[str, Any]:
    """Build the ``full_leaderboard`` payload: the unlimited board, with student ids."""
    board = await self.leaderboard(sid, include_ids=True)
    return {"type": "full_leaderboard", "leaderboard": board}
|
|
|
|
async def me(self, sid: str, student_id: str) -> dict[str, Any]:
    """Return one student's profile: id, name, total score and submissions.

    Submissions are ordered by question index.

    Raises:
        KeyError: when no participant row exists for this student in
            this session.
    """
    async with connect(self.settings.db_path) as db:
        participant_cur = await db.execute(
            "SELECT * FROM participants WHERE sid = ? AND student_id = ?", (sid, student_id)
        )
        participant = await participant_cur.fetchone()
        if participant is None:
            # Participant row is gone (typically because the instructor
            # ran a reset). Caller is expected to translate this into a
            # 401 + cookie-clear so the client lands on the join form.
            raise KeyError(f"No participant {student_id!r} in session {sid!r}")
        submissions_cur = await db.execute(
            "SELECT question_idx, answer, elapsed_ms, score, status FROM submissions WHERE sid = ? AND student_id = ? ORDER BY question_idx",
            (sid, student_id),
        )
        submission_rows = await submissions_cur.fetchall()

    profile: dict[str, Any] = {
        "student_id": participant["student_id"],
        "name": participant["name"],
    }
    profile["total_score"] = await self.total_for(sid, student_id)
    profile["submissions"] = [dict(row) for row in submission_rows]
    return profile
|
|
|
|
async def stats(self, sid: str, question_idx: int | None, student_id: str | None = None) -> dict[str, Any]:
    """Aggregate response-time and score statistics for one question.

    ``question_idx`` falls back to the session's current question. When
    there is no question at all, an empty-stats payload is returned.
    Otherwise the submitted rows of that question yield the average
    response time, a three-band time distribution (under 10 s, 10-30 s,
    30 s and over) and the average score. ``your_rank`` is added when
    ``student_id`` is given.
    """
    session = await self.get_session(sid)
    qidx = session["current_question_idx"] if question_idx is None else question_idx
    if qidx is None:
        top5 = await self.leaderboard(sid, limit=5, you_student_id=student_id)
        return {
            "question_idx": None,
            "response_time_avg_ms": None,
            "response_time_distribution": {},
            "average_score": 0,
            "top5": top5,
            "your_rank": None,
        }

    async with connect(self.settings.db_path) as db:
        cursor = await db.execute(
            """
            SELECT elapsed_ms, score
            FROM submissions
            WHERE sid = ? AND question_idx = ? AND status = 'submitted'
            """,
            (sid, qidx),
        )
        submitted_rows = await cursor.fetchall()

    times = [row["elapsed_ms"] for row in submitted_rows if row["elapsed_ms"] is not None]
    scores = [row["score"] for row in submitted_rows]

    under_10s = sum(1 for elapsed in times if elapsed < 10_000)
    under_30s = sum(1 for elapsed in times if 10_000 <= elapsed < 30_000)
    distribution = {
        "0_10s": under_10s,
        "10_30s": under_30s,
        "30s_plus": len(times) - under_10s - under_30s,
    }
    average_ms = round(sum(times) / len(times)) if times else None
    average_score = round(sum(scores) / len(scores), 2) if scores else 0

    payload = {
        "question_idx": qidx,
        "response_time_avg_ms": average_ms,
        "response_time_distribution": distribution,
        "average_score": average_score,
        "top5": await self.leaderboard(sid, limit=5, you_student_id=student_id),
    }
    if student_id:
        payload["your_rank"] = await self.rank_for(sid, student_id)
    return payload
|
|
|
|
async def broadcast_lobby(self, sid: str) -> None:
    """Send the current lobby roster to the session's instructors, if any.

    The roster build (a participants query) is skipped when no instructor
    socket is registered for ``sid``: there is nobody to receive it. This
    is the same guard ``broadcast_presence`` and ``broadcast_projectors``
    apply.
    """
    if not self.instructor_clients.get(sid):
        return
    await self.broadcast_instructors(sid, await self.lobby_message(sid))
|
|
|
|
async def broadcast_question_closed(self, sid: str, question_idx: int) -> None:
    """Announce a closed question to students, instructors and projectors.

    Each student socket gets a payload built for its own identity.
    Instructors get the identity-free payload followed by the full
    leaderboard; projectors are refreshed last.
    """
    # Iterate over a copy: the registry can change while we await.
    recipients = list(self.student_clients[sid].items())
    for student_ws, identity in recipients:
        personal = await self.question_closed_message(sid, question_idx, identity)
        self._queue_send(student_ws, personal)

    shared = await self.question_closed_message(sid, question_idx)
    await self.broadcast_instructors(sid, shared)
    board = await self.full_leaderboard_message(sid)
    await self.broadcast_instructors(sid, board)
    await self.broadcast_projectors(sid)
    await asyncio.sleep(0)
|
|
|
|
async def broadcast_between_questions(self, sid: str, next_idx: int) -> None:
    """Announce the between-questions state to students, instructors and projectors.

    Each student socket gets a payload built for its own identity.
    Instructors get the identity-free payload followed by the full
    leaderboard; projectors are refreshed last.
    """
    # Iterate over a copy: the registry can change while we await.
    recipients = list(self.student_clients[sid].items())
    for student_ws, identity in recipients:
        personal = await self.between_message(sid, next_idx, identity)
        self._queue_send(student_ws, personal)

    shared = await self.between_message(sid, next_idx)
    await self.broadcast_instructors(sid, shared)
    board = await self.full_leaderboard_message(sid)
    await self.broadcast_instructors(sid, board)
    await self.broadcast_projectors(sid)
    await asyncio.sleep(0)
|
|
|
|
async def broadcast_session_ended(self, sid: str) -> None:
    """Announce the end of the session to students, instructors and projectors.

    Each student socket gets a payload built for its own identity;
    instructors get the identity-free payload; projectors are refreshed
    last.
    """
    # Iterate over a copy: the registry can change while we await.
    recipients = list(self.student_clients[sid].items())
    for student_ws, identity in recipients:
        personal = await self.ended_message(sid, identity)
        self._queue_send(student_ws, personal)

    shared = await self.ended_message(sid)
    await self.broadcast_instructors(sid, shared)
    await self.broadcast_projectors(sid)
    await asyncio.sleep(0)
|
|
|
|
async def broadcast_students(self, sid: str, message: dict[str, Any]) -> None:
    """Queue the same ``message`` to every student socket of ``sid``."""
    # Copy first so the registry may change while sends are being queued.
    targets = list(self.student_clients[sid])
    for student_ws in targets:
        self._queue_send(student_ws, message)
    # Yield once so the queued send tasks get a chance to start.
    await asyncio.sleep(0)
|
|
|
|
async def broadcast_instructors(self, sid: str, message: dict[str, Any]) -> None:
    """Queue the same ``message`` to every instructor socket of ``sid``."""
    # Copy first so the registry may change while sends are being queued.
    targets = list(self.instructor_clients[sid])
    for instructor_ws in targets:
        self._queue_send(instructor_ws, message)
    # Yield once so the queued send tasks get a chance to start.
    await asyncio.sleep(0)
|
|
|
|
def _queue_send(self, websocket: WebSocket, message: dict[str, Any]) -> None:
    """Schedule a fire-and-forget send of ``message`` on ``websocket``.

    The send runs as its own task via ``_safe_send``, so one slow or dead
    socket does not hold up the caller. Must be called while an event
    loop is running.
    """
    task = asyncio.create_task(self._safe_send(websocket, message))
    # The event loop only keeps weak references to tasks, so an otherwise
    # unreferenced task can be garbage-collected before it finishes. Hold
    # a strong reference until the send completes.
    pending = getattr(self, "_pending_sends", None)
    if pending is None:
        pending = self._pending_sends = set()
    pending.add(task)
    task.add_done_callback(pending.discard)
|
|
|
|
async def _safe_send(self, websocket: WebSocket, message: dict[str, Any]) -> None:
    """Send ``message`` as JSON text on ``websocket``, ignoring any failure.

    Serialization errors and send errors are both swallowed, so a send
    that fails never raises into the caller.
    """
    try:
        encoded = json.dumps(message)
        await websocket.send_text(encoded)
    except Exception:
        # Deliberately best-effort: the failure is dropped.
        pass
|