api/app/routes/stats.py
2026-03-31 11:43:01 +02:00

152 lines
4.5 KiB
Python

import sqlite3
from fastapi import APIRouter, Depends, Query
from app.database import get_db
from app.models import CreatorStats, DecadeStats, GenreStats, OverviewStats, YearActivity
router = APIRouter(prefix="/stats", tags=["Statistics"])
@router.get(
"",
response_model=OverviewStats,
summary="Overall statistics",
description="High-level numbers: total reviews, breakdown by category, average rating, and date range of the reviews collection.",
)
def overview(db: sqlite3.Connection = Depends(get_db)):
total = db.execute("SELECT COUNT(*) FROM reviews").fetchone()[0]
categories = db.execute(
"SELECT category, COUNT(*) as count FROM reviews GROUP BY category ORDER BY count DESC"
).fetchall()
avg = db.execute("SELECT ROUND(AVG(rating), 2) FROM reviews").fetchone()[0]
earliest = db.execute("SELECT MIN(date) FROM reviews").fetchone()[0]
latest = db.execute("SELECT MAX(date) FROM reviews").fetchone()[0]
return OverviewStats(
total_reviews=total,
categories=[dict(row) for row in categories],
average_rating=avg,
earliest_review=earliest,
latest_review=latest,
)
@router.get(
"/creators",
response_model=list[CreatorStats],
summary="Creator rankings",
description="Writers, directors and creators ranked by number of reviewed works, with their average rating.",
)
def creator_stats(
category: str | None = Query(None, description="Limit to this category.", examples=["Movies"]),
limit: int = Query(20, description="Number of creators to return.", ge=1, le=100),
db: sqlite3.Connection = Depends(get_db),
):
conditions = []
params = []
if category:
conditions.append("category = ?")
params.append(category)
where = ""
if conditions:
where = "WHERE " + " AND ".join(conditions)
rows = db.execute(
f"SELECT creator, COUNT(*) as review_count, "
f"ROUND(AVG(rating), 2) as average_rating "
f"FROM reviews {where} "
f"GROUP BY creator ORDER BY review_count DESC LIMIT ?",
params + [limit],
).fetchall()
return [dict(row) for row in rows]
@router.get(
"/genres",
response_model=list[GenreStats],
summary="Genre breakdown",
description="Genres with review counts and average ratings. Optionally filtered by media category.",
)
def genre_stats(
category: str | None = Query(None, description="Limit to this category.", examples=["Movies"]),
db: sqlite3.Connection = Depends(get_db),
):
conditions = []
params = []
if category:
conditions.append("category = ?")
params.append(category)
where = ""
if conditions:
where = "WHERE " + " AND ".join(conditions)
rows = db.execute(
f"SELECT genre, COUNT(*) as review_count, "
f"ROUND(AVG(rating), 2) as average_rating "
f"FROM reviews {where} "
f"GROUP BY genre ORDER BY review_count DESC",
params,
).fetchall()
return [dict(row) for row in rows]
@router.get(
"/timeline",
response_model=list[YearActivity],
summary="Review timeline",
description="Reviews per year, showing how my reviewing activity changed over time.",
)
def timeline(db: sqlite3.Connection = Depends(get_db)):
rows = db.execute(
"SELECT SUBSTR(date, 1, 4) as year, COUNT(*) as review_count, "
"ROUND(AVG(rating), 2) as average_rating "
"FROM reviews GROUP BY year ORDER BY year ASC"
).fetchall()
return [dict(row) for row in rows]
@router.get(
"/decades",
response_model=list[DecadeStats],
summary="Reviews by decade of release",
description="Reviews grouped by the decade the work was released. Shows which eras of media appear most in the reviews collection.",
)
def decade_stats(
category: str | None = Query(None, description="Limit to this category.", examples=["Movies"]),
db: sqlite3.Connection = Depends(get_db),
):
conditions = ["year IS NOT NULL"]
params = []
if category:
conditions.append("category = ?")
params.append(category)
where = "WHERE " + " AND ".join(conditions)
rows = db.execute(
f"SELECT (year / 10 * 10) as decade_num, COUNT(*) as review_count, "
f"ROUND(AVG(rating), 2) as average_rating "
f"FROM reviews {where} "
f"GROUP BY decade_num ORDER BY decade_num ASC",
params,
).fetchall()
return [
DecadeStats(
decade=f"{row['decade_num']}s",
review_count=row["review_count"],
average_rating=row["average_rating"],
)
for row in rows
]