from fastapi import FastAPI, HTTPException from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from typing import List, Dict, Optional import random import time import uuid import os # ----------------- # Models / Schemas # ----------------- class Question(BaseModel): index: int # 1-based question number a: int b: int options: List[int] class StartResponse(BaseModel): session_id: str total_questions: int duration_seconds: int remaining_seconds: int question: Question class AnswerRequest(BaseModel): answer: int class Score(BaseModel): correct: int total_answered: int total: int class AnswerResponse(BaseModel): correct: bool finished: bool reason: Optional[str] = None # "completed" | "timeout" remaining_seconds: int score: Score question: Optional[Question] = None # ----------------- # Session management # ----------------- TOTAL_QUESTIONS = 20 DURATION_SECONDS = 300 class Session: def __init__(self, total_questions: int = TOTAL_QUESTIONS, duration: int = DURATION_SECONDS): self.id = str(uuid.uuid4()) self.total_questions = total_questions self.duration = duration self.start_time = time.time() self.questions = self._generate_questions(total_questions) self.current_index = 0 # 0-based self.correct_count = 0 self.answered = 0 self.finished = False self.reason: Optional[str] = None def remaining_seconds(self) -> int: remaining = int(self.duration - (time.time() - self.start_time)) return max(0, remaining) def time_over(self) -> bool: return self.remaining_seconds() <= 0 @staticmethod def _generate_questions(n: int) -> List[Dict]: # Build unique unordered pairs (avoid mirrored duplicates like 6×8 and 8×6) all_pairs = [(a, b) for a in range(2, 10) for b in range(a, 10)] # a <= b count = min(n, len(all_pairs)) picked = random.sample(all_pairs, count) questions: List[Dict] = [] for a0, b0 in picked: # Randomize order for variety while keeping uniqueness by unordered pair if random.random() < 0.5: a, b = a0, b0 else: a, b = b0, a0 correct = a * b options = Session._generate_options(a, b, correct) questions.append({ "a": a, "b": b, "correct": correct, "options": options, }) return questions @staticmethod def _generate_options(a: int, b: int, correct: int) -> List[int]: # Generate plausible distractors based on nearby multiplication results candidates = set() # Nearby products for da, db in [(-1, 0), (1, 0), (0, -1), (0, 1)]: na = a + da nb = b + db if 1 <= na <= 10 and 1 <= nb <= 10: candidates.add(na * nb) # Small offsets for off in [-3, -2, -1, 1, 2, 3, 4]: if correct + off > 0: candidates.add(correct + off) # Random other table values while len(candidates) < 20: candidates.add(random.randint(2, 5) * random.randint(2, 5)) # Remove the correct answer and pick three unique distractors candidates.discard(correct) distractors = random.sample(list(candidates), 3) opts = distractors + [correct] random.shuffle(opts) return opts def current_question(self) -> Optional[Question]: if self.current_index >= self.total_questions: return None q = self.questions[self.current_index] return Question(index=self.current_index + 1, a=q["a"], b=q["b"], options=q["options"]) def submit_answer(self, answer: int) -> bool: q = self.questions[self.current_index] is_correct = (answer == q["correct"]) if is_correct: self.correct_count += 1 self.answered += 1 self.current_index += 1 if self.current_index >= self.total_questions: self.finished = True self.reason = "completed" return is_correct SESSIONS: Dict[str, Session] = {} # ------------- # FastAPI setup # ------------- app = FastAPI(title="Таблица умножения — тренажёр") static_dir = os.path.join(os.path.dirname(__file__), "..", "static") static_dir = os.path.abspath(static_dir) if not os.path.isdir(static_dir): os.makedirs(static_dir, exist_ok=True) app.mount("/static", StaticFiles(directory=static_dir), name="static") @app.get("/") def index(): index_path = os.path.join(static_dir, "index.html") if not os.path.exists(index_path): raise HTTPException(status_code=404, detail="index.html not found") return FileResponse(index_path) @app.post("/api/session/start", response_model=StartResponse) def start_session(): session = Session() SESSIONS[session.id] = session if session.time_over(): session.finished = True session.reason = "timeout" raise HTTPException(status_code=400, detail="Session immediately timed out") q = session.current_question() return StartResponse( session_id=session.id, total_questions=session.total_questions, duration_seconds=session.duration, remaining_seconds=session.remaining_seconds(), question=q, ) def _get_session(session_id: str) -> Session: session = SESSIONS.get(session_id) if not session: raise HTTPException(status_code=404, detail="Session not found") return session @app.post("/api/session/{session_id}/answer", response_model=AnswerResponse) def submit_answer(session_id: str, req: AnswerRequest): session = _get_session(session_id) if session.finished: return AnswerResponse( correct=False, finished=True, reason=session.reason or "completed", remaining_seconds=session.remaining_seconds(), score=Score(correct=session.correct_count, total_answered=session.answered, total=session.total_questions), question=None, ) if session.time_over(): session.finished = True session.reason = "timeout" return AnswerResponse( correct=False, finished=True, reason="timeout", remaining_seconds=0, score=Score(correct=session.correct_count, total_answered=session.answered, total=session.total_questions), question=None, ) # Must have a current question current_q = session.current_question() if current_q is None: session.finished = True session.reason = "completed" return AnswerResponse( correct=False, finished=True, reason="completed", remaining_seconds=session.remaining_seconds(), score=Score(correct=session.correct_count, total_answered=session.answered, total=session.total_questions), question=None, ) is_correct = session.submit_answer(req.answer) if session.time_over() and not session.finished: session.finished = True session.reason = "timeout" next_q = session.current_question() if not session.finished else None return AnswerResponse( correct=is_correct, finished=session.finished, reason=session.reason, remaining_seconds=session.remaining_seconds(), score=Score(correct=session.correct_count, total_answered=session.answered, total=session.total_questions), question=next_q, ) class StateResponse(BaseModel): finished: bool reason: Optional[str] remaining_seconds: int current_index: int total_questions: int correct: int answered: int @app.get("/api/session/{session_id}/state", response_model=StateResponse) def get_state(session_id: str): session = _get_session(session_id) if session.time_over() and not session.finished: session.finished = True session.reason = "timeout" return StateResponse( finished=session.finished, reason=session.reason, remaining_seconds=session.remaining_seconds(), current_index=min(session.current_index + 1, session.total_questions), total_questions=session.total_questions, correct=session.correct_count, answered=session.answered, )