"""Question pool validation.""" from __future__ import annotations import json from pathlib import Path from typing import Any from app.scoring import SCORE_FNS OPTION_KEYS = {"A", "B", "C", "D"} class PoolValidationError(ValueError): pass def parse_pool_json(pool_json: str | dict[str, Any]) -> dict[str, Any]: if isinstance(pool_json, str): try: data = json.loads(pool_json) except json.JSONDecodeError as exc: raise PoolValidationError(f"Invalid JSON: {exc.msg}") from exc else: data = pool_json return validate_pool(data) def load_pool_from_file(path: str | Path) -> dict[str, Any]: p = Path(path) if not p.exists(): raise PoolValidationError(f"Pool file not found: {p}") try: raw = p.read_text(encoding="utf-8") except OSError as exc: raise PoolValidationError(f"Could not read pool file {p}: {exc}") from exc return parse_pool_json(raw) def validate_pool(data: dict[str, Any]) -> dict[str, Any]: if not isinstance(data, dict): raise PoolValidationError("Pool must be a JSON object") title = data.get("title") if not isinstance(title, str) or not title.strip(): raise PoolValidationError("Pool title is required") score_fn = data.get("score_fn", "linear_decay") if score_fn not in SCORE_FNS: raise PoolValidationError(f"Unknown score function: {score_fn}") time_limit_default = _positive_int(data.get("time_limit_default", 60), "time_limit_default") questions = data.get("questions") if not isinstance(questions, list) or not questions: raise PoolValidationError("Pool must include at least one question") normalized_questions: list[dict[str, Any]] = [] for index, question in enumerate(questions): if not isinstance(question, dict): raise PoolValidationError(f"Question {index} must be an object") normalized_questions.append(_validate_question(question, index, time_limit_default)) out: dict[str, Any] = { "title": title.strip(), "score_fn": score_fn, "time_limit_default": time_limit_default, "questions": normalized_questions, } session_id = data.get("session_id") if session_id is not None: if not isinstance(session_id, str) or not session_id.strip(): raise PoolValidationError("session_id, if present, must be a non-empty string") out["session_id"] = session_id.strip() return out def question_count(pool: dict[str, Any]) -> int: return len(pool["questions"]) def get_question(pool: dict[str, Any], question_idx: int) -> dict[str, Any]: try: return pool["questions"][question_idx] except IndexError as exc: raise PoolValidationError("Question index out of range") from exc def question_time_limit(pool: dict[str, Any], question_idx: int) -> int: question = get_question(pool, question_idx) return int(question.get("time_limit") or pool["time_limit_default"]) def public_question_payload(pool: dict[str, Any], question_idx: int) -> dict[str, Any]: question = get_question(pool, question_idx) return { "question_idx": question_idx, "text": question["text"], "options": question["options"], "time_limit": question_time_limit(pool, question_idx), } def resolve_option_key(question: dict[str, Any], answer: Any) -> str | None: """Map a submitted answer back to its canonical letter (A..D). Accepts either: - a canonical letter (legacy + internal callers) - the option's full text (production wire format — students send what they saw on the button, never a letter, so even if a leaked "answer is B" message arrives via chat the recipient's button is labelled with text only and the correlation is lost). Returns the canonical letter on match, or None when nothing matches. None is the failsafe: callers turn it into a recorded submission with score=0 (locked in via PK), so attempted circumvention by sending a different string just produces a wrong answer. """ if not isinstance(answer, str): return None if answer in OPTION_KEYS: return answer for key in ("A", "B", "C", "D"): if question["options"].get(key) == answer: return key return None # Canonical 1-indexed position used in the CSV export and any downstream # analysis. The pool's option keys are fixed at A..D, so the mapping is # stable across pools and across re-runs of the same pool. CANONICAL_POSITION = {"A": 1, "B": 2, "C": 3, "D": 4} def _validate_question(question: dict[str, Any], index: int, default_limit: int) -> dict[str, Any]: qid = question.get("id") if not isinstance(qid, str) or not qid.strip(): raise PoolValidationError(f"Question {index} id is required") text = question.get("text") if not isinstance(text, str) or not text.strip(): raise PoolValidationError(f"Question {index} text is required") options = question.get("options") if not isinstance(options, dict) or set(options) != OPTION_KEYS: raise PoolValidationError(f"Question {index} options must be exactly A, B, C, D") for key, value in options.items(): if not isinstance(value, str) or not value.strip(): raise PoolValidationError(f"Question {index} option {key} is required") correct = question.get("correct") if correct not in OPTION_KEYS: raise PoolValidationError(f"Question {index} correct must be one of A, B, C, D") normalized = { "id": qid.strip(), "text": text.strip(), "options": {key: options[key].strip() for key in sorted(OPTION_KEYS)}, "correct": correct, } if "time_limit" in question and question["time_limit"] is not None: normalized["time_limit"] = _positive_int(question["time_limit"], f"Question {index} time_limit") else: normalized["time_limit"] = default_limit explanation = question.get("explanation") if explanation is not None: if not isinstance(explanation, str): raise PoolValidationError(f"Question {index} explanation must be text") normalized["explanation"] = explanation.strip() return normalized def _positive_int(value: Any, label: str) -> int: if isinstance(value, bool) or not isinstance(value, int) or value <= 0: raise PoolValidationError(f"{label} must be a positive integer") return value