# Source code for animedex.agg._fanout

"""Shared concurrent fan-out helper for aggregate commands.

Callers provide named source callables. The helper runs them
independently, catches per-source failures, and returns a structured
:class:`~animedex.models.aggregate.AggregateResult` instead of
raising on the first failed backend.
"""

from __future__ import annotations

import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Sequence

from animedex.models.aggregate import AggregateResult, AggregateSourceStatus
from animedex.models.common import ApiError


@dataclass(frozen=True)
class FanoutSource:
    """Descriptor for a single backend that takes part in an aggregate fan-out.

    Instances are immutable (``frozen=True``), so a descriptor can be shared
    between threads without defensive copies.

    :ivar name: Backend identifier; also the key under which this source's
        status is reported in the aggregate result.
    :vartype name: str
    :ivar call: Callable taking no arguments that returns this source's rows.
    :vartype call: callable
    """

    # Backend identifier (e.g. the upstream service name).
    name: str
    # Zero-argument callable producing the rows for this backend.
    call: Callable[[], object]


# Extracts an HTTP status code from free-form error text. A three-digit
# 1xx-5xx code is only accepted when it directly follows an HTTP-ish cue
# ("HTTP/1.1 ", "status code: ", "returned ", "response ") or one of the
# known backend names (e.g. "AniList 429"); bare numbers are ignored.
_HTTP_STATUS_RE = re.compile(
    r"\b(?:"
    r"HTTP(?:[/ ]?[0-9.]+)?\s+"
    r"|status(?:\s+code)?\s*[:=]?\s*"
    r"|returned\s+"
    r"|response\s+"
    r"|AniList\s+|Jikan\s+|Kitsu\s+|MangaDex\s+|Shikimori\s+|Danbooru\s+|ANN\s+|Trace\.moe\s+"
    r")"
    r"([1-5][0-9]{2})\b",
    re.IGNORECASE,
)


def _duration_ms(t_start: float) -> float:
    """Return milliseconds elapsed since ``t_start``, rounded to 3 decimals.

    :param t_start: A reading previously taken with :func:`time.monotonic`.
    :type t_start: float
    :return: Elapsed wall time in milliseconds.
    :rtype: float
    """
    return round((time.monotonic() - t_start) * 1000.0, 3)


def _normalise_items(value: object, *, backend: str = "aggregate") -> List[object]:
    """Return a list of successful rows from a source return value.

    Accepted shapes:

    * ``None`` -> ``[]``;
    * a list (returned as-is) or a tuple (copied into a list);
    * a dict whose ``"items"`` or ``"data"`` entry (checked in that order) is
      a list or tuple;
    * any other object whose ``rows`` attribute is a list or tuple.

    :param value: Whatever the source callable returned.
    :type value: object
    :param backend: Backend name attached to the :class:`ApiError` raised for
        an unsupported shape, so the failure is attributed to the source that
        produced it. Defaults to ``"aggregate"`` for backward compatibility.
    :type backend: str
    :return: The rows.
    :rtype: list
    :raises ApiError: With ``reason="upstream-shape"`` when ``value`` has none
        of the accepted shapes.
    """
    if value is None:
        return []
    if isinstance(value, list):
        return value
    if isinstance(value, tuple):
        return list(value)
    if isinstance(value, dict):
        for key in ("items", "data"):
            rows = value.get(key)
            if isinstance(rows, list):
                return rows
            if isinstance(rows, tuple):
                return list(rows)
        raise ApiError(
            "aggregate source returned a dict without list-shaped items or data",
            backend=backend,
            reason="upstream-shape",
        )
    rows = getattr(value, "rows", None)
    if isinstance(rows, list):
        return rows
    # Tuples are accepted here as well, consistent with the top-level and
    # dict-entry branches above.
    if isinstance(rows, tuple):
        return list(rows)
    raise ApiError(
        f"aggregate source returned unsupported shape: {type(value).__name__}",
        backend=backend,
        reason="upstream-shape",
    )


def _http_status_from_message(message: str) -> Optional[int]:
    """Return the HTTP status code mentioned in ``message``, or ``None``.

    :param message: Error text to scan with :data:`_HTTP_STATUS_RE`.
    :type message: str
    :return: The first matched status code, if any.
    :rtype: int or None
    """
    match = _HTTP_STATUS_RE.search(message)
    if match is None:
        return None
    return int(match.group(1))


def _status_from_exception(name: str, exc: BaseException, duration_ms: float) -> AggregateSourceStatus:
    """Build a ``failed`` status record for a source that raised ``exc``.

    For an :class:`ApiError`, its own ``reason``, ``backend`` and ``message``
    are used when set (falling back to ``"upstream-error"`` and ``name``).
    For any other exception the message is ``"<ExceptionType>: <text>"``.
    The HTTP status is parsed from ``str(exc)``.

    :param name: Name of the source that failed.
    :type name: str
    :param exc: The exception the source raised.
    :type exc: BaseException
    :param duration_ms: Time spent on the source before it failed.
    :type duration_ms: float
    :return: Status with ``status="failed"`` and ``items=0``.
    :rtype: AggregateSourceStatus
    """
    reason = "upstream-error"
    backend = name
    if isinstance(exc, ApiError):
        reason = exc.reason or reason
        backend = exc.backend or backend
        message = exc.message
    else:
        message = f"{type(exc).__name__}: {exc}"
    return AggregateSourceStatus(
        backend=backend,
        status="failed",
        items=0,
        reason=reason,
        message=message,
        http_status=_http_status_from_message(str(exc)),
        duration_ms=duration_ms,
    )


def _run_one(source: FanoutSource):
    """Run one source and capture its outcome without raising ``Exception``.

    :param source: The source to call.
    :type source: FanoutSource
    :return: ``(source.name, rows, status)``. On failure the rows are ``[]``
        and the status is built by :func:`_status_from_exception`.
    :rtype: tuple
    """
    t_start = time.monotonic()
    try:
        # Shape errors are tagged with this source's name so the resulting
        # status is attributed to the backend that returned the bad payload.
        items = _normalise_items(source.call(), backend=source.name)
    except Exception as exc:
        return source.name, [], _status_from_exception(source.name, exc, _duration_ms(t_start))
    return (
        source.name,
        items,
        AggregateSourceStatus(
            backend=source.name,
            status="ok",
            items=len(items),
            duration_ms=_duration_ms(t_start),
        ),
    )


def run_fanout(sources: Sequence[FanoutSource], *, max_workers: Optional[int] = None) -> AggregateResult:
    """Run source calls concurrently and return one aggregate envelope.

    Each source runs in a worker thread via :func:`_run_one`, which converts
    any ``Exception`` into a ``failed`` status instead of propagating it, so
    one broken backend does not abort the whole fan-out.

    :param sources: Source call descriptors to run. Rows in the result follow
        this order regardless of which source finishes first.
    :type sources: sequence of FanoutSource
    :param max_workers: Optional thread-pool size. ``None`` means one worker
        per source; any value is clamped to ``[1, len(sources)]``.
    :type max_workers: int or None
    :return: Aggregate result with successful rows and per-source statuses.
        If several sources share a name, each one's rows are still included
        exactly once, and the status reported under that name is the one of
        the last such source in input order.
    :rtype: AggregateResult
    """
    if not sources:
        return AggregateResult(items=[], sources={})

    workers = max_workers if max_workers is not None else len(sources)
    workers = max(1, min(workers, len(sources)))

    # Outcomes are stored by input position rather than by name, so sources
    # that share a name cannot overwrite (or double-count) each other's rows,
    # and the final ordering never depends on completion order.
    outcomes: List[Optional[tuple]] = [None] * len(sources)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(_run_one, source): index for index, source in enumerate(sources)}
        for future in as_completed(futures):
            outcomes[futures[future]] = future.result()

    items: List[object] = []
    statuses: Dict[str, AggregateSourceStatus] = {}
    for name, source_items, status in outcomes:
        items.extend(source_items)
        statuses[name] = status
    return AggregateResult(items=items, sources=statuses)


def selftest() -> bool:
    """Smoke-test the success, empty, and failed fan-out paths.

    Runs three sources through :func:`run_fanout` on a single worker: one
    returning two rows, one returning no rows, and one raising an
    :class:`ApiError` whose message mentions HTTP 500. It then checks the
    resulting envelope.

    Checks raise :class:`AssertionError` explicitly instead of using
    ``assert`` statements, so they still run under ``python -O``.

    :return: ``True`` on success.
    :rtype: bool
    :raises AssertionError: If any check fails.
    """

    def _ok():
        return [1, 2]

    def _empty():
        return []

    def _fail():
        raise ApiError("upstream returned 500", backend="bad", reason="upstream-error")

    def _check(condition: bool, what: str) -> None:
        if not condition:
            raise AssertionError(f"aggregate fan-out selftest failed: {what}")

    result = run_fanout(
        [
            FanoutSource("ok", _ok),
            FanoutSource("empty", _empty),
            FanoutSource("bad", _fail),
        ],
        max_workers=1,
    )
    ok = result.sources["ok"]
    empty = result.sources["empty"]
    bad = result.sources["bad"]

    _check(list(result.sources) == ["ok", "empty", "bad"], "statuses must follow input order")
    _check(ok.status == "ok" and ok.items == 2, "successful source must report 2 rows")
    _check(empty.status == "ok" and empty.items == 0, "empty source must succeed with 0 rows")
    _check(bad.status == "failed", "raising source must be marked failed")
    _check(bad.reason == "upstream-error", "failure reason must come from the ApiError")
    _check(bad.http_status == 500, "HTTP status must be parsed from the error message")
    _check(list(result.items) == [1, 2], "only successful rows, in source order")
    return True