Source code for animedex.models.aggregate

"""Shared result envelope for multi-source aggregate commands.

Aggregate commands such as ``animedex season`` and ``animedex
schedule`` fan out to multiple upstream backends and may receive a
mix of successful rows and per-source failures. This module provides
the stable envelope shape those commands return: ``items`` contains
only rows from successful sources, while ``sources`` records one
status row per selected backend.
"""

from __future__ import annotations

from datetime import date, datetime, timezone
from typing import Any, Dict, List, Optional

from pydantic import Field

from animedex.models.anime import Anime, AnimeTitle
from animedex.models.common import AnimedexModel, SourceTag


class AggregateSourceStatus(AnimedexModel):
    """Outcome record for a single backend within an aggregate response.

    :ivar backend: Identifier of the backend, e.g. ``"anilist"``.
    :vartype backend: str
    :ivar status: ``"ok"`` when the source succeeded, ``"failed"`` when
        the source raised while the rest of the fan-out carried on.
    :vartype status: str
    :ivar items: How many successful rows this source contributed.
    :vartype items: int
    :ivar reason: Stable error reason, set when the source failed.
    :vartype reason: str or None
    :ivar message: Human-readable description of the source failure.
    :vartype message: str or None
    :ivar http_status: HTTP status code, when the failure exposed one.
    :vartype http_status: int or None
    :ivar duration_ms: Wall-clock time spent in this source call.
    :vartype duration_ms: float
    """

    # Identity and outcome of the source call.
    backend: str
    status: str
    items: int = 0

    # Failure details; these default to ``None``.
    reason: Optional[str] = None
    message: Optional[str] = None
    http_status: Optional[int] = None

    # Timing of the source call.
    duration_ms: float = 0.0

    @property
    def ok(self) -> bool:
        """Tell whether this source completed successfully.

        :return: ``True`` exactly when :attr:`status` equals ``"ok"``.
        :rtype: bool
        """
        return self.status == "ok"
class AggregateResult(AnimedexModel):
    """Result envelope handed back by multi-source aggregate commands.

    Each backend's rich model is kept as-is inside ``items``. Failures
    are intentionally never placed in ``items``; they are recorded only
    under ``sources``, so code that walks the successful records does
    not need to special-case failure sentinels.

    :ivar items: Successful rows gathered from every healthy source.
    :vartype items: list
    :ivar sources: Per-backend status map.
    :vartype sources: dict[str, AggregateSourceStatus]
    :ivar merge_diagnostics: Per-row diagnostics for rows that could
        not enter merge analysis.
    :vartype merge_diagnostics: list of dict
    """

    items: List[Any] = Field(default_factory=list)
    sources: Dict[str, AggregateSourceStatus] = Field(default_factory=dict)
    merge_diagnostics: List[Dict[str, Any]] = Field(default_factory=list)

    @property
    def failed_sources(self) -> Dict[str, AggregateSourceStatus]:
        """Collect the status rows of the sources that did not succeed.

        :return: Mapping restricted to the failed sources, in the same
            order as :attr:`sources`.
        :rtype: dict[str, AggregateSourceStatus]
        """
        return {
            backend: row
            for backend, row in self.sources.items()
            if not row.ok
        }

    @property
    def succeeded_count(self) -> int:
        """Count the selected sources that succeeded.

        :return: Number of entries whose ``status == "ok"``.
        :rtype: int
        """
        # ``ok`` is a bool, so summing it counts the successful rows.
        return sum(row.ok for row in self.sources.values())

    @property
    def all_failed(self) -> bool:
        """Tell whether every selected source failed.

        :return: ``True`` when at least one source was selected and
            none of them succeeded.
        :rtype: bool
        """
        # With no selected sources there is nothing that could have failed.
        if not self.sources:
            return False
        return self.succeeded_count == 0
class ScheduleCalendarResult(AggregateResult):
    """Schedule aggregate envelope carrying display-time metadata.

    The JSON renderer emits this like any other structured aggregate
    result. The TTY renderer relies on the timezone and date window
    fields to group schedule rows into a calendar-like view.

    :ivar timezone: An IANA timezone name, ``"UTC"``, ``"local"``, or a
        fixed-offset value such as ``"+08:00"``.
    :vartype timezone: str
    :ivar window_start: Local date at which the schedule window begins
        (inclusive).
    :vartype window_start: datetime.date
    :ivar window_end: Local date at which the schedule window ends
        (exclusive).
    :vartype window_end: datetime.date
    """

    timezone: str

    # Date window: the start is inclusive, the end is exclusive.
    window_start: date
    window_end: date
class MergedAnime(AnimedexModel):
    """A single anime entry merged across aggregate season sources.

    :ivar title: Canonical display title picked from the contributing
        records.
    :vartype title: AnimeTitle
    :ivar ids: Combined external id map.
    :vartype ids: dict[str, str]
    :ivar sources: Provenance tags, one for every contributing backend.
    :vartype sources: list[SourceTag]
    :ivar records: Common anime projection for each backend.
    :vartype records: dict[str, Anime]
    :ivar core: Compact merged summary. The JSON output places it next
        to the full per-backend records so that consumers can read the
        resolved item without recomputing it.
    :vartype core: dict
    :ivar source_details: Source-specific fields promoted from the
        contributing records, grouped per backend.
    :vartype source_details: dict[str, dict]
    :ivar source_payloads: Full per-backend payloads, for JSON
        consumers that need the complete upstream row shape.
    :vartype source_payloads: dict[str, dict]
    :ivar id_conflicts: External id conflicts found while the merged id
        map was being built.
    :vartype id_conflicts: list of dict
    """

    title: AnimeTitle

    # Merged identity and provenance.
    ids: Dict[str, str] = Field(default_factory=dict)
    sources: List[SourceTag] = Field(default_factory=list)

    # Per-backend records and the compact merged summary.
    records: Dict[str, Anime] = Field(default_factory=dict)
    core: Dict[str, Any] = Field(default_factory=dict)

    # Per-backend extras.
    source_details: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
    source_payloads: Dict[str, Dict[str, Any]] = Field(default_factory=dict)

    # Id conflicts recorded during the merge.
    id_conflicts: List[Dict[str, Any]] = Field(default_factory=list)
def selftest() -> bool:
    """Run a smoke test over the aggregate envelope models.

    The diagnostic runner calls this to check that nested rich models
    survive the aggregate JSON path and that the source-status helpers
    report the expected values.

    :return: ``True`` on success.
    :rtype: bool
    """
    # Shared fixtures: one provenance tag and one common anime record.
    tag = SourceTag(backend="_selftest", fetched_at=datetime.now(timezone.utc))
    record = Anime(
        id="_selftest:1",
        title=AnimeTitle(romaji="x"),
        ids={"_selftest": "1"},
        source=tag,
    )

    # A merged entry that exercises every MergedAnime field.
    conflict = {
        "key": "_selftest",
        "kept_value": "1",
        "conflicting_value": "2",
        "backend": "_selftest",
        "source": "record.id",
    }
    merged = MergedAnime(
        title=AnimeTitle(romaji="x"),
        ids={"_selftest": "1"},
        sources=[tag],
        records={"_selftest": record},
        core={"title": {"romaji": "x"}, "sources": ["_selftest"]},
        source_details={"_selftest": {"score": "1.0/10.0"}},
        source_payloads={"_selftest": {"id": "_selftest:1"}},
        id_conflicts=[conflict],
    )

    # A schedule envelope with a single healthy source.
    schedule_sources = {
        "ok": AggregateSourceStatus(backend="ok", status="ok", items=1),
    }
    schedule = ScheduleCalendarResult(
        items=[tag],
        sources=schedule_sources,
        timezone="UTC",
        window_start=datetime.now(timezone.utc).date(),
        window_end=datetime.now(timezone.utc).date(),
    )

    # A plain envelope mixing one healthy and one failed source.
    healthy = AggregateSourceStatus(backend="ok", status="ok", items=1)
    broken = AggregateSourceStatus(
        backend="failed",
        status="failed",
        reason="upstream-error",
        message="failed",
        http_status=500,
    )
    diagnostic = {
        "backend": "_selftest",
        "id": "_selftest:broken",
        "reason": "to-common-failed",
        "message": "ValueError: broken",
    }
    result = AggregateResult(
        items=[tag],
        sources={"ok": healthy, "failed": broken},
        merge_diagnostics=[diagnostic],
    )

    # The JSON dump must carry the nested model and the diagnostics.
    dumped = result.model_dump(mode="json")
    assert dumped["items"][0]["backend"] == "_selftest"
    assert dumped["merge_diagnostics"][0]["reason"] == "to-common-failed"

    # Merged entry fields read back as they were supplied.
    assert merged.records["_selftest"].id == "_selftest:1"
    assert merged.core["sources"] == ["_selftest"]
    assert merged.source_details["_selftest"]["score"] == "1.0/10.0"
    assert merged.source_payloads["_selftest"]["id"] == "_selftest:1"
    assert merged.id_conflicts[0]["source"] == "record.id"

    # Calendar metadata and the source-status helpers.
    assert schedule.timezone == "UTC"
    assert result.succeeded_count == 1
    assert list(result.failed_sources) == ["failed"]
    assert not result.all_failed
    return True