Initial commit

This commit is contained in:
Dmitry Bikulov
2025-09-11 13:42:53 +03:00
commit d1bb3559bb
9 changed files with 640 additions and 0 deletions

278
app/main.py Normal file
View File

@@ -0,0 +1,278 @@
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Dict, Optional
import random
import time
import uuid
import os
# -----------------
# Models / Schemas
# -----------------
class Question(BaseModel):
index: int # 1-based question number
a: int
b: int
options: List[int]
class StartResponse(BaseModel):
session_id: str
total_questions: int
duration_seconds: int
remaining_seconds: int
question: Question
class AnswerRequest(BaseModel):
answer: int
class Score(BaseModel):
correct: int
total_answered: int
total: int
class AnswerResponse(BaseModel):
correct: bool
finished: bool
reason: Optional[str] = None # "completed" | "timeout"
remaining_seconds: int
score: Score
question: Optional[Question] = None
# -----------------
# Session management
# -----------------
TOTAL_QUESTIONS = 20
DURATION_SECONDS = 60
class Session:
def __init__(self, total_questions: int = TOTAL_QUESTIONS, duration: int = DURATION_SECONDS):
self.id = str(uuid.uuid4())
self.total_questions = total_questions
self.duration = duration
self.start_time = time.time()
self.questions = self._generate_questions(total_questions)
self.current_index = 0 # 0-based
self.correct_count = 0
self.answered = 0
self.finished = False
self.reason: Optional[str] = None
def remaining_seconds(self) -> int:
remaining = int(self.duration - (time.time() - self.start_time))
return max(0, remaining)
def time_over(self) -> bool:
return self.remaining_seconds() <= 0
@staticmethod
def _generate_questions(n: int) -> List[Dict]:
# Build unique unordered pairs (avoid mirrored duplicates like 6×8 and 8×6)
all_pairs = [(a, b) for a in range(2, 10) for b in range(a, 10)] # a <= b
count = min(n, len(all_pairs))
picked = random.sample(all_pairs, count)
questions: List[Dict] = []
for a0, b0 in picked:
# Randomize order for variety while keeping uniqueness by unordered pair
if random.random() < 0.5:
a, b = a0, b0
else:
a, b = b0, a0
correct = a * b
options = Session._generate_options(a, b, correct)
questions.append({
"a": a,
"b": b,
"correct": correct,
"options": options,
})
return questions
@staticmethod
def _generate_options(a: int, b: int, correct: int) -> List[int]:
# Generate plausible distractors based on nearby multiplication results
candidates = set()
# Nearby products
for da, db in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
na = a + da
nb = b + db
if 1 <= na <= 10 and 1 <= nb <= 10:
candidates.add(na * nb)
# Small offsets
for off in [-3, -2, -1, 1, 2, 3, 4]:
if correct + off > 0:
candidates.add(correct + off)
# Random other table values
while len(candidates) < 20:
candidates.add(random.randint(2, 10) * random.randint(2, 10))
# Remove the correct answer and pick three unique distractors
candidates.discard(correct)
distractors = random.sample(list(candidates), 3)
opts = distractors + [correct]
random.shuffle(opts)
return opts
def current_question(self) -> Optional[Question]:
if self.current_index >= self.total_questions:
return None
q = self.questions[self.current_index]
return Question(index=self.current_index + 1, a=q["a"], b=q["b"], options=q["options"])
def submit_answer(self, answer: int) -> bool:
q = self.questions[self.current_index]
is_correct = (answer == q["correct"])
if is_correct:
self.correct_count += 1
self.answered += 1
self.current_index += 1
if self.current_index >= self.total_questions:
self.finished = True
self.reason = "completed"
return is_correct
SESSIONS: Dict[str, Session] = {}
# -------------
# FastAPI setup
# -------------
app = FastAPI(title="Таблица умножения — тренажёр")
static_dir = os.path.join(os.path.dirname(__file__), "..", "static")
static_dir = os.path.abspath(static_dir)
if not os.path.isdir(static_dir):
os.makedirs(static_dir, exist_ok=True)
app.mount("/static", StaticFiles(directory=static_dir), name="static")
@app.get("/")
def index():
index_path = os.path.join(static_dir, "index.html")
if not os.path.exists(index_path):
raise HTTPException(status_code=404, detail="index.html not found")
return FileResponse(index_path)
@app.post("/api/session/start", response_model=StartResponse)
def start_session():
session = Session()
SESSIONS[session.id] = session
if session.time_over():
session.finished = True
session.reason = "timeout"
raise HTTPException(status_code=400, detail="Session immediately timed out")
q = session.current_question()
return StartResponse(
session_id=session.id,
total_questions=session.total_questions,
duration_seconds=session.duration,
remaining_seconds=session.remaining_seconds(),
question=q,
)
def _get_session(session_id: str) -> Session:
session = SESSIONS.get(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return session
@app.post("/api/session/{session_id}/answer", response_model=AnswerResponse)
def submit_answer(session_id: str, req: AnswerRequest):
session = _get_session(session_id)
if session.finished:
return AnswerResponse(
correct=False,
finished=True,
reason=session.reason or "completed",
remaining_seconds=session.remaining_seconds(),
score=Score(correct=session.correct_count, total_answered=session.answered, total=session.total_questions),
question=None,
)
if session.time_over():
session.finished = True
session.reason = "timeout"
return AnswerResponse(
correct=False,
finished=True,
reason="timeout",
remaining_seconds=0,
score=Score(correct=session.correct_count, total_answered=session.answered, total=session.total_questions),
question=None,
)
# Must have a current question
current_q = session.current_question()
if current_q is None:
session.finished = True
session.reason = "completed"
return AnswerResponse(
correct=False,
finished=True,
reason="completed",
remaining_seconds=session.remaining_seconds(),
score=Score(correct=session.correct_count, total_answered=session.answered, total=session.total_questions),
question=None,
)
is_correct = session.submit_answer(req.answer)
if session.time_over() and not session.finished:
session.finished = True
session.reason = "timeout"
next_q = session.current_question() if not session.finished else None
return AnswerResponse(
correct=is_correct,
finished=session.finished,
reason=session.reason,
remaining_seconds=session.remaining_seconds(),
score=Score(correct=session.correct_count, total_answered=session.answered, total=session.total_questions),
question=next_q,
)
class StateResponse(BaseModel):
finished: bool
reason: Optional[str]
remaining_seconds: int
current_index: int
total_questions: int
correct: int
answered: int
@app.get("/api/session/{session_id}/state", response_model=StateResponse)
def get_state(session_id: str):
session = _get_session(session_id)
if session.time_over() and not session.finished:
session.finished = True
session.reason = "timeout"
return StateResponse(
finished=session.finished,
reason=session.reason,
remaining_seconds=session.remaining_seconds(),
current_index=min(session.current_index + 1, session.total_questions),
total_questions=session.total_questions,
correct=session.correct_count,
answered=session.answered,
)