Files
math/app/main.py
Dmitry Bikulov 93883a4470 Fix math
2025-09-15 19:38:36 +03:00

279 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Dict, Optional
import random
import time
import uuid
import os
# -----------------
# Models / Schemas
# -----------------
# A single multiplication task as it is sent to the client.
# The correct product is deliberately not part of this model; only the
# answer options are exposed.
class Question(BaseModel):
    # 1-based question number
    index: int
    # First factor
    a: int
    # Second factor
    b: int
    # Answer choices offered to the player
    options: List[int]
# Response body of POST /api/session/start: identifies the new session and
# carries its limits plus the first question.
class StartResponse(BaseModel):
    # Identifier to use in the per-session endpoints
    session_id: str
    # Number of questions in the session
    total_questions: int
    # Overall time limit of the session, in seconds
    duration_seconds: int
    # Seconds left at the moment the response was built
    remaining_seconds: int
    # The question the client should show first
    question: Question
# Request body of POST /api/session/{session_id}/answer.
class AnswerRequest(BaseModel):
    # The value the player chose for the current question
    answer: int
# Running tally of a session, embedded in every AnswerResponse.
class Score(BaseModel):
    # Number of correctly answered questions
    correct: int
    # Number of questions answered so far (right or wrong)
    total_answered: int
    # Number of questions in the session
    total: int
# Response body of POST /api/session/{session_id}/answer.
class AnswerResponse(BaseModel):
    # Whether the submitted answer was right (False when nothing was graded)
    correct: bool
    # True once the session has ended
    finished: bool
    # Why the session ended: "completed" | "timeout"; None while it is running
    reason: Optional[str] = None
    # Seconds left on the session clock
    remaining_seconds: int
    # Tally after this request
    score: Score
    # Next question to show, or None when the session is finished
    question: Optional[Question] = None
# -----------------
# Session management
# -----------------
# Defaults for a new session: requested number of questions and time limit.
TOTAL_QUESTIONS = 20
DURATION_SECONDS = 300


class Session:
    """State of one timed multiplication quiz.

    A session owns its pre-generated questions, the position of the current
    question, the running score and the wall-clock start time used for the
    time limit.
    """

    def __init__(self, total_questions: int = TOTAL_QUESTIONS, duration: int = DURATION_SECONDS):
        """Create a session and generate its questions.

        Args:
            total_questions: Requested number of questions. The pool of unique
                factor pairs is limited, so the session may end up with fewer;
                ``self.total_questions`` always reflects the real number.
            duration: Time limit in seconds, counted from construction.
        """
        self.id = str(uuid.uuid4())
        self.duration = duration
        self.start_time = time.time()
        self.questions = self._generate_questions(total_questions)
        # The generator caps its output at the number of unique pairs. Keeping
        # the requested figure here would let current_index run past the end
        # of self.questions and the session could never complete.
        self.total_questions = len(self.questions)
        self.current_index = 0  # 0-based
        self.correct_count = 0
        self.answered = 0
        self.finished = False
        self.reason: Optional[str] = None

    def remaining_seconds(self) -> int:
        """Return the whole seconds left on the clock, never below zero."""
        remaining = int(self.duration - (time.time() - self.start_time))
        return max(0, remaining)

    def time_over(self) -> bool:
        """Return True once the time limit has been used up."""
        return self.remaining_seconds() <= 0

    @staticmethod
    def _generate_questions(n: int) -> List[Dict]:
        """Build up to ``n`` questions from unique unordered factor pairs.

        Factors are taken from 2..5 with a <= b, so mirrored duplicates such
        as 3x4 and 4x3 never appear together. When ``n`` exceeds the number of
        available pairs, every pair is used once and fewer than ``n``
        questions are returned.

        Returns:
            A list of dicts with keys "a", "b", "correct" and "options".
        """
        all_pairs = [(a, b) for a in range(2, 6) for b in range(a, 6)]  # a <= b
        # Clamp to [0, len(all_pairs)] so random.sample never raises.
        count = max(0, min(n, len(all_pairs)))
        picked = random.sample(all_pairs, count)
        questions: List[Dict] = []
        for a0, b0 in picked:
            # Randomize operand order for variety; uniqueness is by unordered pair.
            a, b = (a0, b0) if random.random() < 0.5 else (b0, a0)
            correct = a * b
            questions.append({
                "a": a,
                "b": b,
                "correct": correct,
                "options": Session._generate_options(a, b, correct),
            })
        return questions

    @staticmethod
    def _generate_options(a: int, b: int, correct: int) -> List[int]:
        """Return four shuffled answer options: ``correct`` plus three distractors.

        Distractors are drawn from neighbouring products, small offsets around
        the correct value, and random products of factors 2..10.
        """
        candidates = set()
        # Products of neighbouring factors, kept inside the 1..10 table.
        for da, db in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
            na = a + da
            nb = b + db
            if 1 <= na <= 10 and 1 <= nb <= 10:
                candidates.add(na * nb)
        # Small positive-valued offsets around the correct answer.
        for off in [-3, -2, -1, 1, 2, 3, 4]:
            if correct + off > 0:
                candidates.add(correct + off)
        # Pad the pool with random other table values.
        while len(candidates) < 20:
            candidates.add(random.randint(2, 10) * random.randint(2, 10))
        # The random padding may have produced the right answer; drop it so
        # the three distractors are all wrong.
        candidates.discard(correct)
        distractors = random.sample(sorted(candidates), 3)
        opts = distractors + [correct]
        random.shuffle(opts)
        return opts

    def current_question(self) -> Optional["Question"]:
        """Return the question at the current position, or None when none is left."""
        if self.current_index >= len(self.questions):
            return None
        q = self.questions[self.current_index]
        return Question(index=self.current_index + 1, a=q["a"], b=q["b"], options=q["options"])

    def submit_answer(self, answer: int) -> bool:
        """Grade ``answer`` against the current question and advance.

        Marks the session as finished with reason "completed" after the last
        question has been answered.

        Returns:
            True when the answer equals the correct product.

        Raises:
            IndexError: If there is no current question left to answer.
        """
        if self.current_index >= len(self.questions):
            raise IndexError("no current question to answer")
        q = self.questions[self.current_index]
        is_correct = (answer == q["correct"])
        if is_correct:
            self.correct_count += 1
        self.answered += 1
        self.current_index += 1
        if self.current_index >= len(self.questions):
            self.finished = True
            self.reason = "completed"
        return is_correct
# In-memory registry of sessions, keyed by Session.id.
SESSIONS: Dict[str, Session] = dict()

# -------------
# FastAPI setup
# -------------
app = FastAPI(title="Таблица умножения — тренажёр")

# Static assets are served from the "static" directory one level above this
# module; create it when missing before mounting it.
static_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "static")
)
if not os.path.isdir(static_dir):
    os.makedirs(static_dir, exist_ok=True)

app.mount("/static", StaticFiles(directory=static_dir), name="static")
@app.get("/")
def index():
    """Serve the front-end entry page from the static directory.

    Raises:
        HTTPException: 404 when index.html does not exist.
    """
    page = os.path.join(static_dir, "index.html")
    if os.path.exists(page):
        return FileResponse(page)
    raise HTTPException(status_code=404, detail="index.html not found")
@app.post("/api/session/start", response_model=StartResponse)
def start_session():
    """Create a session, register it and return its first question.

    Raises:
        HTTPException: 400 when the new session is already out of time.
    """
    fresh = Session()
    SESSIONS[fresh.id] = fresh

    if not fresh.time_over():
        first_question = fresh.current_question()
        return StartResponse(
            session_id=fresh.id,
            total_questions=fresh.total_questions,
            duration_seconds=fresh.duration,
            remaining_seconds=fresh.remaining_seconds(),
            question=first_question,
        )

    # The clock ran out before the first question could be served.
    fresh.finished = True
    fresh.reason = "timeout"
    raise HTTPException(status_code=400, detail="Session immediately timed out")
def _get_session(session_id: str) -> Session:
    """Look up a registered session by id.

    Raises:
        HTTPException: 404 when no session with this id is registered.
    """
    found = SESSIONS.get(session_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Session not found")
@app.post("/api/session/{session_id}/answer", response_model=AnswerResponse)
def submit_answer(session_id: str, req: AnswerRequest):
    """Grade an answer for the session's current question.

    Returns a terminal response (no grading, ``correct=False``) when the
    session is already finished, has run out of time, or has no question
    left. Otherwise the answer is graded and the next question, if any, is
    returned.

    Raises:
        HTTPException: 404 when the session id is unknown.
    """
    session = _get_session(session_id)

    def tally() -> Score:
        # Snapshot of the session's counters at the time of the call.
        return Score(
            correct=session.correct_count,
            total_answered=session.answered,
            total=session.total_questions,
        )

    def closed(why: str, seconds_left: int) -> AnswerResponse:
        # Terminal response: nothing was graded and there is no next question.
        return AnswerResponse(
            correct=False,
            finished=True,
            reason=why,
            remaining_seconds=seconds_left,
            score=tally(),
            question=None,
        )

    # Already ended earlier: just report the final state again.
    if session.finished:
        return closed(session.reason or "completed", session.remaining_seconds())

    # Clock ran out before this answer arrived: close the session ungraded.
    if session.time_over():
        session.finished = True
        session.reason = "timeout"
        return closed("timeout", 0)

    # Must have a current question
    if session.current_question() is None:
        session.finished = True
        session.reason = "completed"
        return closed("completed", session.remaining_seconds())

    is_correct = session.submit_answer(req.answer)

    # The answer counts, but the clock may have expired while grading it.
    if session.time_over() and not session.finished:
        session.finished = True
        session.reason = "timeout"

    if session.finished:
        next_q = None
    else:
        next_q = session.current_question()

    return AnswerResponse(
        correct=is_correct,
        finished=session.finished,
        reason=session.reason,
        remaining_seconds=session.remaining_seconds(),
        score=tally(),
        question=next_q,
    )
# Response body of GET /api/session/{session_id}/state: a snapshot of a
# session's progress.
class StateResponse(BaseModel):
    # True once the session has ended
    finished: bool
    # Why the session ended ("completed" or "timeout"); None while running
    reason: Optional[str]
    # Seconds left on the session clock
    remaining_seconds: int
    # 1-based number of the current question, capped at total_questions
    current_index: int
    # Number of questions in the session
    total_questions: int
    # Number of correct answers so far
    correct: int
    # Number of questions answered so far
    answered: int
@app.get("/api/session/{session_id}/state", response_model=StateResponse)
def get_state(session_id: str):
    """Report the progress of a session, closing it first if its time is up.

    Raises:
        HTTPException: 404 when the session id is unknown.
    """
    session = _get_session(session_id)

    # A running session whose clock has expired is closed on the first poll.
    just_expired = session.time_over() and not session.finished
    if just_expired:
        session.finished = True
        session.reason = "timeout"

    seconds_left = session.remaining_seconds()
    # Internally 0-based; reported 1-based and never beyond the last question.
    shown_index = min(session.current_index + 1, session.total_questions)

    return StateResponse(
        finished=session.finished,
        reason=session.reason,
        remaining_seconds=seconds_left,
        current_index=shown_index,
        total_questions=session.total_questions,
        correct=session.correct_count,
        answered=session.answered,
    )