"""
Per-backend rate limiting for animedex's outgoing HTTP traffic.
This module provides three pieces:
* :class:`TokenBucket` - a small, monotonic-clock-driven token bucket
with both non-blocking (:meth:`TokenBucket.try_acquire`) and
blocking (:meth:`TokenBucket.acquire`) variants. Backends use the
blocking form on the request hot path; :meth:`try_acquire` is for
transparent fast-fail in tests and back-pressure logic.
* :class:`RateLimitRegistry` - a name-keyed map from backend
identifier to its configured bucket. The substrate ships exactly
one default registry (:func:`default_registry`) so the per-backend
caps from ``plans/01`` and the P1 obligations from ``plans/02``
live in one place.
* :func:`default_registry` - the wired-up registry with the caps
every backend documented in ``plans/01-public-apis-anime-survey.md``
honours.
The ``Rate.slow`` override (CLI flag ``--rate slow``) halves the
refill rate; we never expose a faster-than-default mode because the
upstream contract is a P1 ceiling, not a preference.
The module pulls its clock primitives through two indirection points,
:data:`_monotonic` and :data:`_sleep`, so unit tests can substitute a
deterministic fake without monkeypatching the standard library
globally.
"""
from __future__ import annotations
import threading
import time
from typing import Dict
from animedex.models.common import ApiError
# Clock indirection points. The bucket code in this module reads the time and
# sleeps through these two module-level names instead of calling ``time``
# directly, so a test can rebind them to a deterministic fake clock without
# monkeypatching the standard library globally.
_monotonic = time.monotonic
_sleep = time.sleep
[docs]
class TokenBucket:
    """A monotonic-clock token bucket.

    The bucket holds at most ``capacity`` tokens and accumulates them
    at ``refill_per_second`` per second. Each acquire consumes one
    token. All token accounting happens under an internal lock, and
    the clock is read through :data:`_monotonic` / :data:`_sleep`
    rather than direct ``time`` calls so tests can swap in a fake
    clock.

    :param capacity: Maximum number of tokens the bucket can hold.
                     Must be positive.
    :type capacity: int
    :param refill_per_second: Steady-state refill rate, in tokens per
                              second. Must be positive.
    :type refill_per_second: float
    :raises ValueError: When ``capacity`` or ``refill_per_second`` is
                        not strictly positive (NaN included).
    """

    def __init__(self, capacity: int, refill_per_second: float) -> None:
        # ``not x > 0`` rather than ``x <= 0``: every ordered comparison
        # against NaN is False, so the ``<=`` spelling would accept NaN and
        # the failure would only surface later as ``_sleep(nan)`` on the
        # request path.
        if not capacity > 0:
            raise ValueError("capacity must be positive")
        if not refill_per_second > 0:
            raise ValueError("refill_per_second must be positive")
        self.capacity = capacity
        self.refill_per_second = float(refill_per_second)
        # A new bucket starts full, so the first ``capacity`` acquires never wait.
        self._tokens = float(capacity)
        self._last = _monotonic()
        self._lock = threading.Lock()

    def _refill_locked(self) -> None:
        """Credit the tokens accrued since the last refill.

        Must be called with ``self._lock`` held; every caller in this
        class does so.

        :return: ``None``.
        :rtype: None
        """
        now = _monotonic()
        elapsed = now - self._last
        if elapsed > 0:
            # Clamp at capacity so an idle bucket cannot bank an unbounded burst.
            self._tokens = min(self.capacity, self._tokens + elapsed * self.refill_per_second)
            # Advance the reference point only when time moved forward, so a
            # substituted clock that steps backwards can never cause the same
            # interval to be credited twice.
            self._last = now

    def try_acquire(self) -> bool:
        """Consume a token without blocking.

        :return: ``True`` if a token was available and consumed,
                 ``False`` otherwise.
        :rtype: bool
        """
        with self._lock:
            self._refill_locked()
            if self._tokens >= 1.0:
                self._tokens -= 1.0
                return True
            return False

    def acquire(self) -> None:
        """Consume a token, blocking via :data:`_sleep` until one is
        available.

        The sleep duration is computed from the current deficit and the
        refill rate rather than from a fixed polling interval. The
        check is repeated after every sleep because another thread may
        have taken the token that arrived in the meantime.

        :return: ``None``.
        :rtype: None
        """
        while True:
            with self._lock:
                self._refill_locked()
                if self._tokens >= 1.0:
                    self._tokens -= 1.0
                    return
                deficit = 1.0 - self._tokens
                wait = deficit / self.refill_per_second
            # Sleep with the lock released so other threads can still refill
            # and call try_acquire() while this one waits.
            _sleep(wait)

    def with_rate(self, mode: str) -> "TokenBucket":
        """Return a bucket with the requested rate-mode applied.

        ``"normal"`` returns ``self`` unchanged. ``"slow"`` returns a
        new bucket with the same capacity and half the refill rate.
        That new bucket starts full and shares no token state with
        ``self``, so callers should derive it once and reuse it rather
        than calling this per request. No faster mode is offered.

        :param mode: ``"normal"`` or ``"slow"``.
        :type mode: str
        :return: A bucket honouring the requested mode.
        :rtype: TokenBucket
        :raises ValueError: When ``mode`` is unrecognised.
        """
        if mode == "normal":
            return self
        if mode == "slow":
            return TokenBucket(self.capacity, self.refill_per_second / 2.0)
        raise ValueError(f"unknown rate mode: {mode!r}")
[docs]
class RateLimitRegistry:
    """Name-keyed collection of per-backend token buckets.

    Each backend identifier (a short string) is associated with one
    :class:`TokenBucket`. Calling :meth:`register` again for a name
    that is already present replaces its bucket with a newly built
    one. :meth:`get` raises :class:`ApiError` for a name that was
    never registered, so a typo at the call site fails loudly instead
    of running unthrottled.

    :ivar _buckets: Internal mapping from backend identifier to
                    bucket.
    :vartype _buckets: dict
    """

    def __init__(self) -> None:
        # Starts empty; buckets are added through register().
        self._buckets: Dict[str, TokenBucket] = {}

    def register(self, name: str, *, capacity: int, refill_per_second: float) -> TokenBucket:
        """Create a bucket for ``name`` and store it, replacing any
        bucket previously stored under that name.

        :param name: Backend identifier.
        :type name: str
        :param capacity: Bucket capacity.
        :type capacity: int
        :param refill_per_second: Refill rate.
        :type refill_per_second: float
        :return: The bucket that is now registered for ``name``.
        :rtype: TokenBucket
        """
        created = self._buckets[name] = TokenBucket(
            capacity=capacity,
            refill_per_second=refill_per_second,
        )
        return created

    def get(self, name: str) -> TokenBucket:
        """Return the bucket registered for ``name``.

        :param name: Backend identifier.
        :type name: str
        :return: The registered bucket.
        :rtype: TokenBucket
        :raises ApiError: When ``name`` is not registered.
        """
        if name in self._buckets:
            return self._buckets[name]
        raise ApiError(
            f"unknown backend: {name!r}",
            backend=name,
            reason="unknown-backend",
        )
[docs]
def default_registry() -> RateLimitRegistry:
    """Build the project-wide default rate-limit registry.

    Every call constructs a new :class:`RateLimitRegistry` and
    registers the backends in the table below, in order. The caps
    reflect what each upstream actually enforces, not what we wish
    for; see ``plans/01`` per-source notes. Anything not listed here
    either has no documented cap or ships its own persistent
    scheduler (AniDB).

    :return: A registry pre-populated with every public backend.
    :rtype: RateLimitRegistry
    """
    # One row per backend: (identifier, capacity, refill_per_second).
    backend_caps = (
        ("anilist", 30, 0.5),
        ("jikan", 3, 1.0),
        ("kitsu", 10, 10.0),
        ("mangadex", 5, 5.0),
        ("danbooru", 10, 10.0),
        ("shikimori", 5, 5.0),
        ("ann", 5, 1.0),
        ("trace", 1, 0.5),
        ("nekos", 10, 3.0),
        ("waifu", 10, 10.0),
        ("ghibli", 5, 1.0),
        ("quote", 5, 5.0 / 3600.0),  # five tokens per hour, expressed per second
    )
    registry = RateLimitRegistry()
    for backend, capacity, refill_per_second in backend_caps:
        registry.register(backend, capacity=capacity, refill_per_second=refill_per_second)
    return registry
[docs]
def selftest() -> bool:
    """Smoke-test the bucket and the default registry.

    Builds a small bucket, exercises both acquire variants
    (:meth:`TokenBucket.try_acquire` and :meth:`TokenBucket.acquire`),
    builds the default registry, and verifies every plan-01 backend
    resolves. Does not call real ``time.sleep``: the blocking acquire
    runs while the bucket still holds a token, so it returns without
    waiting.

    Failures are raised explicitly instead of through ``assert``
    statements, so the checks still run under ``python -O``.

    :return: ``True`` on success.
    :rtype: bool
    :raises AssertionError: When any check fails.
    """

    def _check(ok: bool, what: str) -> None:
        # Same exception type a failing ``assert`` would raise.
        if not ok:
            raise AssertionError(what)

    # Capacity 3: one non-blocking acquire, one blocking acquire, one more
    # non-blocking acquire, and then the bucket must be empty.
    bucket = TokenBucket(capacity=3, refill_per_second=1.0)
    _check(bucket.try_acquire() is True, "try_acquire failed on a full bucket")
    bucket.acquire()  # tokens remain, so this returns without sleeping
    _check(bucket.try_acquire() is True, "try_acquire failed with one token left")
    _check(bucket.try_acquire() is False, "try_acquire succeeded on an empty bucket")

    registry = default_registry()
    for name in (
        "anilist",
        "jikan",
        "kitsu",
        "mangadex",
        "danbooru",
        "shikimori",
        "ann",
        "trace",
        "nekos",
        "waifu",
        "ghibli",
        "quote",
    ):
        _check(registry.get(name).capacity > 0, f"backend {name!r} has a non-positive capacity")
    return True