"""Shared result envelope for multi-source aggregate commands.
Aggregate commands such as ``animedex season`` and ``animedex
schedule`` fan out to multiple upstream backends and may receive a
mix of successful rows and per-source failures. This module provides
the stable envelope shape those commands return: ``items`` contains
only rows from successful sources, while ``sources`` records one
status row per selected backend.
"""
from __future__ import annotations
from datetime import date, datetime, timezone
from typing import Any, Dict, List, Optional
from pydantic import Field
from animedex.models.anime import Anime, AnimeTitle
from animedex.models.common import AnimedexModel, SourceTag
[docs]
class AggregateSourceStatus(AnimedexModel):
    """Outcome record for a single backend in an aggregate response.

    :ivar backend: Identifier of the backend this row describes, for
                   example ``"anilist"``.
    :vartype backend: str
    :ivar status: ``"ok"`` when the source succeeded; ``"failed"`` when
                  the source raised while the fan-out carried on.
    :vartype status: str
    :ivar items: How many successful rows this source contributed.
    :vartype items: int
    :ivar reason: Stable error reason, set when the source failed.
    :vartype reason: str or None
    :ivar message: Human-readable description of the source failure.
    :vartype message: str or None
    :ivar http_status: HTTP status code, when the failure exposed one.
    :vartype http_status: int or None
    :ivar duration_ms: Wall-clock time spent in this source call.
    :vartype duration_ms: float
    """

    # Field order and defaults are part of the model contract; only the
    # backend name and the status string are required.
    backend: str
    status: str
    items: int = 0
    reason: Optional[str] = None
    message: Optional[str] = None
    http_status: Optional[int] = None
    duration_ms: float = 0.0

    @property
    def ok(self) -> bool:
        """Tell whether this source completed successfully.

        :return: ``True`` exactly when :attr:`status` equals ``"ok"``;
                 any other status string counts as not ok.
        :rtype: bool
        """
        return self.status == "ok"
[docs]
class AggregateResult(AnimedexModel):
    """Result envelope shared by the multi-source aggregate commands.

    Each backend's rich model is kept as-is inside ``items``. Failures
    are never placed in ``items``; they are recorded only in
    ``sources``, so code that walks the successful records does not
    need to special-case failure sentinels.

    :ivar items: Successful rows gathered from every healthy source.
    :vartype items: list
    :ivar sources: Status rows keyed by backend.
    :vartype sources: dict[str, AggregateSourceStatus]
    :ivar merge_diagnostics: Per-row diagnostics for rows that could
                             not enter merge analysis.
    :vartype merge_diagnostics: list of dict
    """

    items: List[Any] = Field(default_factory=list)
    sources: Dict[str, AggregateSourceStatus] = Field(default_factory=dict)
    merge_diagnostics: List[Dict[str, Any]] = Field(default_factory=list)

    @property
    def failed_sources(self) -> Dict[str, AggregateSourceStatus]:
        """Collect the status rows of the sources that did not succeed.

        :return: A new mapping, in ``sources`` order, holding only the
                 entries whose ``ok`` flag is false.
        :rtype: dict[str, AggregateSourceStatus]
        """
        failures: Dict[str, AggregateSourceStatus] = {}
        for backend_name, row in self.sources.items():
            if row.ok:
                continue
            failures[backend_name] = row
        return failures

    @property
    def succeeded_count(self) -> int:
        """Count the selected sources that succeeded.

        :return: Number of entries in ``sources`` whose ``ok`` flag is
                 true.
        :rtype: int
        """
        healthy_rows = [row for row in self.sources.values() if row.ok]
        return len(healthy_rows)

    @property
    def all_failed(self) -> bool:
        """Tell whether every selected source failed.

        :return: ``True`` when at least one source was selected and
                 none of them succeeded; ``False`` when no source was
                 selected at all.
        :rtype: bool
        """
        # An empty selection is not reported as a total failure.
        if not self.sources:
            return False
        return self.succeeded_count == 0
[docs]
class ScheduleCalendarResult(AggregateResult):
    """Schedule aggregate envelope carrying display-time metadata.

    For JSON output this is emitted like any other structured
    aggregate result. The TTY renderer reads the timezone and the date
    window fields to arrange schedule rows into a calendar-like view.

    :ivar timezone: An IANA timezone name, ``"UTC"``, ``"local"``, or
                    a fixed-offset value such as ``"+08:00"``.
    :vartype timezone: str
    :ivar window_start: First local date of the schedule window
                        (inclusive).
    :vartype window_start: datetime.date
    :ivar window_end: Local date at which the schedule window stops
                      (exclusive).
    :vartype window_end: datetime.date
    """

    # All three fields are required; none has a default.
    timezone: str
    window_start: date
    window_end: date
[docs]
class MergedAnime(AnimedexModel):
    """A single anime entry merged across the aggregate season sources.

    :ivar title: Canonical display title picked from the contributing
                 records.
    :vartype title: AnimeTitle
    :ivar ids: Combined map of external ids.
    :vartype ids: dict[str, str]
    :ivar sources: Provenance tags, one for every contributing backend.
    :vartype sources: list[SourceTag]
    :ivar records: Common anime projections keyed by backend.
    :vartype records: dict[str, Anime]
    :ivar core: Compact merged summary. JSON output keeps it alongside
                the full per-backend records so consumers can read the
                resolved item without recomputing it.
    :vartype core: dict
    :ivar source_details: Source-specific fields promoted from the
                          contributing records, keyed by backend.
    :vartype source_details: dict[str, dict]
    :ivar source_payloads: Full per-backend payloads, for JSON
                           consumers that need the complete upstream
                           row shape.
    :vartype source_payloads: dict[str, dict]
    :ivar id_conflicts: External id conflicts found while the merged
                        id map was being built.
    :vartype id_conflicts: list of dict
    """

    # Only ``title`` is required; every container field defaults to a
    # fresh empty container through ``default_factory``.
    title: AnimeTitle
    ids: Dict[str, str] = Field(default_factory=dict)
    sources: List[SourceTag] = Field(default_factory=list)
    records: Dict[str, Anime] = Field(default_factory=dict)
    core: Dict[str, Any] = Field(default_factory=dict)
    source_details: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
    source_payloads: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
    id_conflicts: List[Dict[str, Any]] = Field(default_factory=list)
[docs]
def selftest() -> bool:
    """Smoke-test the aggregate envelope models.

    Builds one instance of each envelope model from synthetic
    ``_selftest`` data, dumps an :class:`AggregateResult` in JSON mode,
    and verifies the dumped payload, the merged-entry fields, and the
    source-status helpers.

    The checks raise explicitly instead of using ``assert`` statements:
    ``assert`` is removed when Python runs with ``-O``, which would let
    this function report success without checking anything.

    :return: ``True`` on success.
    :rtype: bool
    :raises AssertionError: When any check does not hold.
    """

    def _require(condition: bool, label: str) -> None:
        # Same exception type a failed ``assert`` would raise, so callers
        # that catch AssertionError keep working.
        if not condition:
            raise AssertionError(f"aggregate selftest failed: {label}")

    # Read the clock once so the fetch timestamp and both window dates
    # agree even when the run crosses midnight UTC.
    now = datetime.now(timezone.utc)
    today = now.date()

    src = SourceTag(backend="_selftest", fetched_at=now)
    anime = Anime(
        id="_selftest:1",
        title=AnimeTitle(romaji="x"),
        ids={"_selftest": "1"},
        source=src,
    )
    merged = MergedAnime(
        title=AnimeTitle(romaji="x"),
        ids={"_selftest": "1"},
        sources=[src],
        records={"_selftest": anime},
        core={"title": {"romaji": "x"}, "sources": ["_selftest"]},
        source_details={"_selftest": {"score": "1.0/10.0"}},
        source_payloads={"_selftest": {"id": "_selftest:1"}},
        id_conflicts=[
            {
                "key": "_selftest",
                "kept_value": "1",
                "conflicting_value": "2",
                "backend": "_selftest",
                "source": "record.id",
            }
        ],
    )
    calendar = ScheduleCalendarResult(
        items=[src],
        sources={"ok": AggregateSourceStatus(backend="ok", status="ok", items=1)},
        timezone="UTC",
        window_start=today,
        window_end=today,
    )
    result = AggregateResult(
        items=[src],
        sources={
            "ok": AggregateSourceStatus(backend="ok", status="ok", items=1),
            "failed": AggregateSourceStatus(
                backend="failed",
                status="failed",
                reason="upstream-error",
                message="failed",
                http_status=500,
            ),
        },
        merge_diagnostics=[
            {
                "backend": "_selftest",
                "id": "_selftest:broken",
                "reason": "to-common-failed",
                "message": "ValueError: broken",
            }
        ],
    )

    # A nested model placed in ``items`` must come out of the JSON dump
    # as plain data.
    decoded = result.model_dump(mode="json")
    _require(
        decoded["items"][0]["backend"] == "_selftest",
        "nested item backend missing from JSON dump",
    )
    _require(
        decoded["merge_diagnostics"][0]["reason"] == "to-common-failed",
        "merge diagnostics missing from JSON dump",
    )

    _require(merged.records["_selftest"].id == "_selftest:1", "merged record id")
    _require(merged.core["sources"] == ["_selftest"], "merged core sources")
    _require(
        merged.source_details["_selftest"]["score"] == "1.0/10.0",
        "merged source details",
    )
    _require(
        merged.source_payloads["_selftest"]["id"] == "_selftest:1",
        "merged source payloads",
    )
    _require(merged.id_conflicts[0]["source"] == "record.id", "merged id conflicts")

    _require(calendar.timezone == "UTC", "calendar timezone")

    # One healthy and one failed source: the helpers must report both.
    _require(result.succeeded_count == 1, "succeeded_count")
    _require(list(result.failed_sources) == ["failed"], "failed_sources")
    _require(not result.all_failed, "all_failed")
    return True