"""
Universal ``animedex api`` dispatcher.
:func:`call` is the single entry point: it composes the URL, waits on
the per-backend rate-limit bucket, looks up the local SQLite cache,
issues the HTTP request when needed, and assembles a
:class:`~animedex.api._envelope.RawResponse` envelope containing
everything a CLI render mode needs (body, status, response headers,
redirect chain, request snapshot with credentials redacted, timing
breakdown, cache provenance).
Per-backend modules under ``animedex/api/<backend>.py`` are thin
shims that pre-fill the ``backend`` argument; this module owns the
shared envelope assembly.
"""
from __future__ import annotations
import time
from datetime import datetime # noqa: F401 - re-exported via type hints
from typing import TYPE_CHECKING, Any, Dict, Optional
if TYPE_CHECKING:
from animedex.config import Config
import requests
from animedex.api._envelope import (
RawCacheInfo,
RawRedirectHop,
RawRequest,
RawResponse,
RawTiming,
redact_headers,
)
from animedex.cache.sqlite import SqliteCache, default_ttl_seconds
from animedex.transport.ratelimit import RateLimitRegistry, default_registry
from animedex.transport.read_only import classify_read_only
from animedex.transport.useragent import compose_user_agent
# Per-backend canonical base URL. the substrate API layer default; per-backend
# modules may pass in their own override via the ``base_url`` arg.
_BASE_URLS = {
"anilist": "https://graphql.anilist.co",
"jikan": "https://api.jikan.moe/v4",
"kitsu": "https://kitsu.io/api/edge",
"mangadex": "https://api.mangadex.org",
"trace": "https://api.trace.moe",
"danbooru": "https://danbooru.donmai.us",
"shikimori": "https://shikimori.io",
"ann": "https://cdn.animenewsnetwork.com/encyclopedia",
"nekos": "https://nekos.best/api/v2",
"waifu": "https://api.waifu.im",
"ghibli": "https://ghibliapi.vercel.app",
"quote": "https://api.animechan.io/v1",
}
_BODY_PREVIEW_BYTES = 4096
# Default HTTP timeout when neither the caller nor the per-backend
# shim supplies one. 30 s is conservative for the slowest upstream
# (ANN's encyclopedia XML can take ~10 s on a cold cache); shorter
# values are exposed via the ``timeout_seconds`` kwarg.
_DEFAULT_TIMEOUT_SECONDS = 30.0
[docs]
def resolve_base_url(backend: str) -> str:
"""Return the canonical base URL for ``backend``.
:param backend: Backend identifier.
:type backend: str
:return: Base URL with no trailing slash.
:rtype: str
:raises KeyError: When the backend is not registered.
"""
return _BASE_URLS[backend]
def _join(base_url: str, path: str) -> str:
if path.startswith("http://") or path.startswith("https://"):
return path
if not path.startswith("/"):
path = "/" + path
return base_url.rstrip("/") + path
def _classification_path(path: str) -> str:
from urllib.parse import urlsplit
split = urlsplit(path)
out = split.path or "/"
if not out.startswith("/"):
out = "/" + out
return out
def _canonicalise_params(obj: Any) -> Any:
"""Recursively normalise a query-param structure for cache-key hashing.
Multi-value query parameters (e.g. MangaDex's
``includes[]=cover_art&includes[]=author``) are
semantically set-typed for every backend the project targets, so
``includes[]=author&includes[]=cover_art`` should produce the same
cache key. ``json.dumps(sort_keys=True)`` stabilises dict key
order but leaves list order alone; this helper sorts list values
too so the signature is order-invariant.
Per : applied **only** to ``params`` and **not** to
``json_body``. JSON request bodies often carry ordered semantics
(paginated cursors, mutation order, ordered relations), and
treating them as sets would risk cache poisoning - two distinct
requests sharing a cache key.
:param obj: The params structure (dict / list / scalar).
:return: A structure with every list sorted (using string-keyed
ordering for cross-type stability).
:rtype: Any
"""
import json as _json
if isinstance(obj, dict):
return {k: _canonicalise_params(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
# Sort by the JSON serialisation of each element so mixed-type
# lists (e.g. ``[1, "a"]``) sort deterministically across
# Python versions.
return sorted(
(_canonicalise_params(x) for x in obj),
key=lambda x: _json.dumps(x, sort_keys=True, ensure_ascii=False),
)
return obj
def _signature(method: str, url: str, params: Optional[dict], json_body: Optional[Any], body: Optional[bytes]) -> str:
"""Compose a stable cache key for this request shape.
The hash is order-invariant for query-param lists (see
:func:`_canonicalise_params`); JSON bodies and raw bodies are
hashed verbatim because their order may carry semantics.
"""
import hashlib
import json as _json
h = hashlib.sha256()
h.update(method.upper().encode("utf-8"))
h.update(b"|")
h.update(url.encode("utf-8"))
if params:
h.update(b"|p:")
h.update(_json.dumps(_canonicalise_params(params), sort_keys=True).encode("utf-8"))
if json_body is not None:
h.update(b"|j:")
h.update(_json.dumps(json_body, sort_keys=True).encode("utf-8"))
if body:
h.update(b"|b:")
h.update(body)
return h.hexdigest()
def _make_body_preview(json_body: Optional[Any], raw_body: Optional[bytes]) -> Optional[str]:
if json_body is not None:
import json as _json
text = _json.dumps(json_body, ensure_ascii=False)
elif raw_body is not None:
try:
text = raw_body.decode("utf-8", errors="replace")
except Exception:
return f"<{len(raw_body)} bytes binary>"
else:
return None
if len(text) > _BODY_PREVIEW_BYTES:
return text[:_BODY_PREVIEW_BYTES] + "...truncated"
return text
[docs]
def call(
*,
backend: str,
path: str,
method: str = "GET",
headers: Optional[Dict[str, str]] = None,
params: Optional[dict] = None,
json_body: Optional[Any] = None,
raw_body: Optional[bytes] = None,
follow_redirects: bool = True,
no_cache: bool = False,
cache_ttl: Optional[int] = None,
rate: str = "normal",
timeout_seconds: Optional[float] = None,
user_agent: Optional[str] = None,
base_url: Optional[str] = None,
session: Optional[requests.Session] = None,
cache: Optional[SqliteCache] = None,
rate_limit_registry: Optional[RateLimitRegistry] = None,
config: Optional["Config"] = None,
) -> RawResponse:
"""Issue one request and return its complete envelope.
:param backend: Backend identifier (e.g. ``"anilist"``); determines
base URL and rate-limit bucket.
:type backend: str
:param path: URL path (relative to base) or absolute URL.
:type path: str
:param method: HTTP method.
:type method: str
:param headers: Caller-supplied headers; override defaults.
:type headers: dict or None
:param params: Query parameters.
:type params: dict or None
:param json_body: JSON body for POST.
:type json_body: Any or None
:param raw_body: Raw byte body (mutually exclusive with json_body).
:type raw_body: bytes or None
:param follow_redirects: Whether to follow 3xx automatically.
:type follow_redirects: bool
:param no_cache: When ``True``, skip cache lookup and write.
:type no_cache: bool
:param cache_ttl: Override TTL in seconds; defaults to the metadata
category default.
:type cache_ttl: int or None
:param rate: ``"normal"`` or ``"slow"``.
:type rate: str
:param timeout_seconds: HTTP timeout in seconds. ``None`` falls
back to the project-default 30 s.
:type timeout_seconds: float or None
:param user_agent: Override for the project default UA.
:type user_agent: str or None
:param base_url: Override for the backend's canonical base URL.
:type base_url: str or None
:param session: Reuse a ``requests.Session``; one is created when
not given.
:type session: requests.Session or None
:param cache: SqliteCache instance; ``None`` skips cache regardless
of ``no_cache``.
:type cache: SqliteCache or None
:param rate_limit_registry: Rate-limit registry; defaults to
:func:`animedex.transport.ratelimit.default_registry`.
:type rate_limit_registry: RateLimitRegistry or None
:param config: Optional :class:`~animedex.config.Config` whose
``user_agent``, ``timeout_seconds``, and
``cache_ttl_seconds`` fields supply defaults when
the matching kwarg is not given (or is ``None``).
Explicit kwargs always win. Per
``plans/05 §4`` every public API function honours
this kwarg.
:type config: Config or None
:return: Full envelope.
:rtype: RawResponse
"""
t_start = time.monotonic()
# 0. Resolve config-supplied defaults. Explicit kwargs win; the
# config only fills in fields the caller left at the sentinel
# (``None``). Per / plans/05 §4.
if config is not None:
if user_agent is None:
user_agent = config.user_agent
if timeout_seconds is None:
timeout_seconds = config.timeout_seconds
if cache_ttl is None:
cache_ttl = config.cache_ttl_seconds
# 1. Resolve URL + headers; build the redacted request snapshot.
if base_url is None and backend in _BASE_URLS:
base_url = _BASE_URLS[backend]
method_up = method.upper()
if base_url is not None:
full_url = _join(base_url, path)
else:
full_url = path # only used when path is absolute or backend already unknown
out_headers = {"User-Agent": compose_user_agent(user_agent)}
if headers:
for key, value in headers.items():
if key.lower() == "via":
continue
out_headers[key] = value
request_snapshot = RawRequest(
method=method_up,
url=full_url,
headers=redact_headers(out_headers),
body_preview=_make_body_preview(json_body, raw_body),
)
# 2. Known backend validation. Method/path choices are passed
# through verbatim; callers own the upstream result.
if backend not in _BASE_URLS:
return _firewall_envelope(
backend=backend,
request_snapshot=request_snapshot,
reason="unknown-backend",
message=f"unknown backend: {backend!r}",
t_start=t_start,
)
# 3. Cache lookup. Raw passthrough methods are forwarded
# verbatim, so cache only requests the advisory classifier knows
# are reads. A mutating-looking request must reach the upstream
# every time instead of being replayed from a local row.
sig = _signature(method_up, full_url, params, json_body, raw_body)
cache_allowed = classify_read_only(backend, method_up, _classification_path(path)) is True
cache_lookup_skipped = no_cache or cache is None or not cache_allowed
if not cache_lookup_skipped:
hit = cache.get_with_meta(backend, sig)
if hit is not None:
payload, hdrs, fetched_at = hit
# Defence in depth: the cache-write path already redacts
# response headers (), but legacy rows written
# before that fix landed may still contain raw Set-Cookie
# / Authorization values. Redact at read time too so an
# un-redacted row never escapes into a RawResponse.
return _cache_hit_envelope(
backend=backend,
request_snapshot=request_snapshot,
signature=sig,
payload=payload,
response_headers=redact_headers(hdrs or {}),
fetched_at=fetched_at,
cache_ttl_seconds=cache_ttl if cache_ttl is not None else default_ttl_seconds("metadata"),
t_start=t_start,
)
# 4. Wait on the rate-limit bucket; capture wait time.
registry = rate_limit_registry if rate_limit_registry is not None else default_registry()
bucket = registry.get(backend).with_rate(rate)
t_pre_wait = time.monotonic()
bucket.acquire()
wait_ms = (time.monotonic() - t_pre_wait) * 1000.0
# 5. Issue the HTTP request.
sess = session if session is not None else requests.Session()
t_pre_req = time.monotonic()
response = sess.request(
method_up,
full_url,
headers=out_headers,
params=params,
json=json_body,
data=raw_body,
timeout=timeout_seconds if timeout_seconds is not None else _DEFAULT_TIMEOUT_SECONDS,
allow_redirects=follow_redirects,
)
request_ms = (time.monotonic() - t_pre_req) * 1000.0
# 6. Capture redirect hops from response.history. Redirect
# response headers can carry the same credentials as the final
# hop (Set-Cookie, Authorization), so redact them too.
redirects = []
for hop in response.history:
redirects.append(
RawRedirectHop(
status=hop.status_code,
headers=redact_headers(dict(hop.headers)),
from_url=hop.url,
to_url=hop.headers.get("Location", ""),
elapsed_ms=hop.elapsed.total_seconds() * 1000.0 if hop.elapsed else 0.0,
)
)
# 7. Decode body.
body_bytes = response.content
try:
body_text = body_bytes.decode("utf-8")
except UnicodeDecodeError:
body_text = None
# 8. Redact response headers once, then reuse the redacted dict
# for both the cache write and the live envelope. Cache rows are
# treated as sensitive as the debug output: an attacker reading
# the SQLite file otherwise sees raw Set-Cookie / Authorization.
redacted_response_headers = redact_headers(dict(response.headers))
# 9. Cache write (only on cacheable responses).
if not cache_lookup_skipped and 200 <= response.status_code < 300:
ttl = cache_ttl if cache_ttl is not None else default_ttl_seconds("metadata")
cache.set_with_meta(
backend,
sig,
body_bytes,
response_headers=redacted_response_headers,
ttl_seconds=ttl,
)
total_ms = (time.monotonic() - t_start) * 1000.0
return RawResponse(
backend=backend,
request=request_snapshot,
redirects=redirects,
status=response.status_code,
response_headers=redacted_response_headers,
body_bytes=body_bytes,
body_text=body_text,
timing=RawTiming(total_ms=total_ms, rate_limit_wait_ms=wait_ms, request_ms=request_ms),
cache=RawCacheInfo(hit=False, key=sig if not cache_lookup_skipped else None),
)
[docs]
def selftest_backend_shim(backend: str, call_fn: Any, *, extra_params: tuple = ()) -> bool:
"""Shared offline smoke check for every per-backend shim.
Per-backend ``selftest`` functions should prove that the backend's
own ``call`` was not renamed, removed, or narrowed. This helper
checks that ``call_fn`` is callable, targets a registered raw
backend, has a Pythonic keyword surface, and exposes the
cross-cutting kwargs every shim must thread (``no_cache``,
``cache_ttl``, ``rate``, ``timeout_seconds``, ``user_agent``,
``cache``). A rename that drops one of these is a regression.
:param backend: Backend identifier.
:type backend: str
:param call_fn: The shim's public ``call`` function.
:type call_fn: Callable
:param extra_params: Backend-specific kwargs that must also be
present (e.g. ``("query", "variables")``
for anilist, ``("path",)`` for the REST
shims). Generic kwargs are checked
unconditionally.
:type extra_params: tuple of str
:return: ``True`` on success; raises on failure so the runner
prints a useful traceback.
:rtype: bool
"""
import inspect
resolve_base_url(backend)
sig = inspect.signature(call_fn)
expected = {
"no_cache",
"cache_ttl",
"rate",
"method",
"timeout_seconds",
"user_agent",
"cache",
*extra_params,
}
missing = expected - set(sig.parameters)
assert not missing, f"{call_fn.__module__}.{call_fn.__name__} lost expected params: {sorted(missing)}"
return True
def _firewall_envelope(
*,
backend: str,
request_snapshot: RawRequest,
reason: str,
message: str,
t_start: float,
) -> RawResponse:
total_ms = (time.monotonic() - t_start) * 1000.0
return RawResponse(
backend=backend,
request=request_snapshot,
redirects=[],
status=0,
response_headers={},
body_bytes=b"",
body_text="",
timing=RawTiming(total_ms=total_ms, rate_limit_wait_ms=0.0, request_ms=0.0),
cache=RawCacheInfo(hit=False),
firewall_rejected={"reason": reason, "message": message},
)
def _cache_hit_envelope(
*,
backend: str,
request_snapshot: RawRequest,
signature: str,
payload: bytes,
response_headers: Dict[str, str],
fetched_at: Optional[datetime],
cache_ttl_seconds: int,
t_start: float,
) -> RawResponse:
total_ms = (time.monotonic() - t_start) * 1000.0
try:
body_text = payload.decode("utf-8")
except UnicodeDecodeError:
body_text = None
ttl_remaining = None
if fetched_at is not None:
# Use the cache module's clock so the same fake_clock fixture
# patches it - keeps the dispatcher in step with cache TTL math.
from animedex.cache.sqlite import _utcnow as _cache_now
elapsed = (_cache_now() - fetched_at).total_seconds()
remaining = cache_ttl_seconds - int(elapsed)
if remaining > 0:
ttl_remaining = remaining
return RawResponse(
backend=backend,
request=request_snapshot,
redirects=[],
status=200,
response_headers=response_headers,
body_bytes=payload,
body_text=body_text,
timing=RawTiming(total_ms=total_ms, rate_limit_wait_ms=0.0, request_ms=0.0),
cache=RawCacheInfo(
hit=True,
key=signature,
ttl_remaining_s=ttl_remaining,
fetched_at=fetched_at,
),
)
[docs]
def selftest() -> bool:
"""Smoke-test the dispatcher against an in-memory mock.
:return: ``True`` on success.
:rtype: bool
"""
# Import-only validation: the dispatcher's wiring is exercised
# by unit tests; the selftest just confirms the symbols load
# without touching the network.
from animedex.api._envelope import RawResponse as _RR
assert resolve_base_url("anilist") == "https://graphql.anilist.co"
assert _signature("DELETE", "https://graphql.anilist.co/", None, None, None)
assert _RR.__name__ == "RawResponse"
return True