"""
SQLite-backed TTL cache for backend responses.
The cache is the project's canonical storage for upstream responses;
it is consulted before any HTTP call and populated after one. The
shape is:

* Rows are keyed by ``(backend, signature)``.
* The body is stored as raw bytes; pydantic models are persisted as
  ``model.model_dump_json().encode("utf-8")`` and re-hydrated with
  ``Model.model_validate_json(raw.decode())``.
* Each row carries a TTL (seconds-since-epoch ``expires_at``); a row
  whose ``expires_at`` is in the past is treated as missing.

The defaults in :func:`default_ttl_seconds` come from
``plans/03 §10``: 72 h for metadata, 24 h for list pages, 1 h for
schedule / trending, 30 d for offline dumps. Anything else gets the
"metadata" default.
The clock primitive (:data:`_utcnow`) and the cache-dir resolver
(:data:`_user_cache_dir`) are pulled through indirection points so
unit tests can substitute deterministic fakes without monkeypatching
the standard library globally.
"""
from __future__ import annotations
import json
import os
import sqlite3
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple, Union
from platformdirs import user_cache_dir
# Indirection points: tests rebind these two module attributes to
# deterministic fakes instead of patching the standard library globally.
def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(timezone.utc)


def _user_cache_dir() -> str:
    """Return the per-user cache directory for the ``animedex`` application."""
    return user_cache_dir("animedex")
# Default row lifetime per request category, in seconds
# (values from ``plans/03 §10``).
_DEFAULT_TTL_SECONDS = {
    "metadata": 72 * 60 * 60,  # 72 hours
    "list": 24 * 60 * 60,  # 24 hours
    "schedule": 60 * 60,  # 1 hour
    "trending": 60 * 60,  # 1 hour
    "offline_dump": 30 * 24 * 60 * 60,  # 30 days
}
def default_ttl_seconds(category: str) -> int:
    """Return the project-default TTL for a request category.

    Per ``plans/03 §10``:

    * ``metadata``: 72 h
    * ``list``: 24 h
    * ``schedule`` / ``trending``: 1 h
    * ``offline_dump``: 30 d

    Unknown categories collapse to the metadata default; this keeps
    the cache useful for one-off entries without forcing every call
    site to declare a category.

    :param category: Request category.
    :type category: str
    :return: Default TTL in seconds.
    :rtype: int
    """
    # Fall back to the "metadata" lifetime for any category not in the table.
    return _DEFAULT_TTL_SECONDS.get(category, _DEFAULT_TTL_SECONDS["metadata"])
def default_cache_path() -> Path:
    """Resolve the platform-appropriate cache file path.

    Uses :func:`platformdirs.user_cache_dir` (via the
    :data:`_user_cache_dir` indirection) so the location matches the
    OS convention: ``~/.cache/animedex`` on Linux,
    ``~/Library/Caches/animedex`` on macOS, the appropriate
    ``LOCALAPPDATA`` subtree on Windows.

    The directory is not created here; this function only builds the path.

    :return: Path to ``cache.sqlite`` inside the cache dir.
    :rtype: pathlib.Path
    """
    return Path(_user_cache_dir()) / "cache.sqlite"
class SqliteCache:
    """A small SQLite-backed cache with per-row TTL.

    One instance owns one SQLite connection. Access to that connection
    is serialised with an in-process lock, and the database is opened
    in WAL mode.

    :param path: Filesystem path to the SQLite database file.
                 Defaults to :func:`default_cache_path`.
    :type path: pathlib.Path or str or None
    """

    # v1 layout of the row table; the v2 columns are added by
    # :meth:`_migrate_schema_locked`.
    _SCHEMA_V1 = """
    CREATE TABLE IF NOT EXISTS cache_rows (
        backend TEXT NOT NULL,
        signature TEXT NOT NULL,
        payload BLOB NOT NULL,
        expires_at INTEGER NOT NULL,
        PRIMARY KEY (backend, signature)
    ) WITHOUT ROWID;
    """

    # Key/value table; currently holds the ``schema_version`` entry.
    _CACHE_META_SCHEMA = """
    CREATE TABLE IF NOT EXISTS cache_meta (
        key TEXT PRIMARY KEY,
        value TEXT NOT NULL
    );
    """

    _CURRENT_SCHEMA_VERSION = 2

    def __init__(self, path: Optional[Union[Path, str]] = None) -> None:
        """Open (creating if necessary) the cache database and migrate it.

        The parent directory of ``path`` is created when missing. If
        schema setup fails, the connection is closed before the error
        propagates.

        :param path: Database file path; ``None`` selects
                     :func:`default_cache_path`.
        :type path: pathlib.Path or str or None
        :raises sqlite3.Error: When the database cannot be opened or
                               its schema cannot be created/migrated.
        """
        self.path = Path(path) if path is not None else default_cache_path()
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # check_same_thread=False so backend retry/backoff helpers running on
        # worker threads do not hit sqlite3.ProgrammingError. We pair it with
        # an in-process _lock to serialise access across threads sharing this
        # SqliteCache instance; SQLite itself does not serialise concurrent
        # `execute` calls on the same connection.
        self._lock = threading.Lock()
        self._conn = sqlite3.connect(str(self.path), check_same_thread=False)
        try:
            # journal_mode=WAL allows multiple animedex invocations on the
            # same machine (a CLI and `animedex mcp serve`, say) to coexist
            # instead of locking each other out via the default rollback
            # journal.
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA synchronous=NORMAL")
            with self._lock:
                self._conn.execute(self._SCHEMA_V1)
                self._conn.execute(self._CACHE_META_SCHEMA)
                self._migrate_schema_locked()
                self._conn.commit()
        except Exception:
            # Do not leak the file handle when initialisation fails.
            self._conn.close()
            raise

    def _read_schema_version_locked(self) -> int:
        """Return the persisted schema version, defaulting to 1.

        Must be called with the ``cache_meta`` table already created.

        :return: Schema version integer.
        :rtype: int
        """
        row = self._conn.execute("SELECT value FROM cache_meta WHERE key = 'schema_version'").fetchone()
        if row is None:
            # No version recorded: either a legacy v1 database or a
            # brand-new file. Both are reported as 1; running the v1 -> v2
            # migration on a fresh database is harmless because it only
            # adds nullable columns.
            return 1
        return int(row[0])

    def _migrate_schema_locked(self) -> None:
        """Bring the schema up to v2 and persist the version.

        v1 -> v2 adds two nullable columns (``response_headers``,
        ``fetched_at``) to ``cache_rows``. Rows written before the
        migration hold ``NULL`` in both.

        A failed ``ALTER TABLE`` is ignored only when the column is in
        fact already present (re-run resilience); any other failure is
        re-raised so a broken database is never recorded as migrated.

        :raises sqlite3.OperationalError: When a column could not be
                                          added and does not exist.
        """
        current = self._read_schema_version_locked()
        if current < 2:
            for column, stmt in (
                ("response_headers", "ALTER TABLE cache_rows ADD COLUMN response_headers BLOB"),
                ("fetched_at", "ALTER TABLE cache_rows ADD COLUMN fetched_at INTEGER"),
            ):
                try:
                    self._conn.execute(stmt)
                except sqlite3.OperationalError:
                    # PRAGMA table_info yields (cid, name, type, ...) rows.
                    present = {info[1] for info in self._conn.execute("PRAGMA table_info(cache_rows)")}
                    if column not in present:
                        raise
            current = 2
        # Persist the schema version.
        self._conn.execute(
            "INSERT OR REPLACE INTO cache_meta (key, value) VALUES ('schema_version', ?)",
            (str(current),),
        )

    def close(self) -> None:
        """Close the underlying SQLite connection.

        :return: ``None``.
        :rtype: None
        """
        with self._lock:
            self._conn.close()

    def __enter__(self) -> "SqliteCache":
        """Return ``self`` so the cache can be used in a ``with`` block."""
        return self

    def __exit__(self, *_excinfo: object) -> None:
        """Close the cache on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def _now_seconds(self) -> int:
        """Return the current time as whole seconds since the epoch."""
        return int(_utcnow().timestamp())

    def _expires_at_seconds(self, ttl_seconds: int) -> int:
        """Return the epoch second at which a row written now expires."""
        return self._now_seconds() + int(ttl_seconds)

    # NOTE(review): ``set_with_meta`` and ``get_with_meta`` are called below
    # but are not defined in the portion of this class visible here; confirm
    # they exist before relying on ``set`` / ``get``.

    def set(self, backend: str, signature: str, payload: bytes, *, ttl_seconds: int) -> None:
        """Store or overwrite a row (v1 wrapper).

        Exists for callers that don't need the v2 metadata. Delegates
        to :meth:`set_with_meta` with ``response_headers={}``.

        :param backend: Backend identifier (e.g. ``"anilist"``).
        :type backend: str
        :param signature: Caller-derived row signature; must be
                          stable across runs for the same logical
                          request.
        :type signature: str
        :param payload: Raw bytes to store.
        :type payload: bytes
        :param ttl_seconds: Lifetime in seconds.
        :type ttl_seconds: int
        :return: ``None``.
        :rtype: None
        """
        self.set_with_meta(backend, signature, payload, response_headers={}, ttl_seconds=ttl_seconds)

    def get(self, backend: str, signature: str) -> Optional[bytes]:
        """Look up a row and return only its payload.

        Delegates to :meth:`get_with_meta`; a ``None`` result is passed
        through, otherwise the payload element of its
        ``(payload, headers, fetched_at)`` result is returned.

        :param backend: Backend identifier.
        :type backend: str
        :param signature: Caller-derived row signature.
        :type signature: str
        :return: Cached payload, or ``None`` when the lookup yields no row.
        :rtype: bytes or None
        """
        out = self.get_with_meta(backend, signature)
        if out is None:
            return None
        payload, _hdrs, _fetched_at = out
        return payload

    def purge_expired(self) -> int:
        """Delete every row whose ``expires_at`` is at or before now.

        :return: Number of rows removed.
        :rtype: int
        """
        with self._lock:
            cur = self._conn.execute(
                "DELETE FROM cache_rows WHERE expires_at <= ?",
                (self._now_seconds(),),
            )
            self._conn.commit()
            return cur.rowcount
def selftest() -> bool:
    """Smoke-test the SQLite cache.

    Builds a temporary cache file under :data:`_user_cache_dir`,
    writes a row, reads it back, confirms that an unknown key yields
    ``None`` and that the ``metadata`` default TTL is 72 h. The
    temporary database (and any ``-wal`` / ``-shm`` sidecar files) is
    removed before and after the run.

    The checks raise explicitly rather than using ``assert`` so they
    still run under ``python -O``.

    :return: ``True`` on success.
    :rtype: bool
    :raises AssertionError: When any check fails.
    """
    path = Path(_user_cache_dir()) / "selftest.sqlite"
    path.parent.mkdir(parents=True, exist_ok=True)

    def _remove_db_files() -> None:
        # WAL mode can leave sidecar files next to the database; a stale
        # one must not be paired with a freshly created database file.
        for suffix in ("", "-wal", "-shm"):
            try:
                os.remove(str(path) + suffix)
            except FileNotFoundError:
                pass

    _remove_db_files()
    cache = SqliteCache(path=path)
    try:
        cache.set("_selftest", "key", b"hello", ttl_seconds=60)
        if cache.get("_selftest", "key") != b"hello":
            raise AssertionError("selftest: stored payload did not round-trip")
        if cache.get("_selftest", "missing") is not None:
            raise AssertionError("selftest: unknown key returned a payload")
        if default_ttl_seconds("metadata") != 72 * 3600:
            raise AssertionError("selftest: unexpected metadata default TTL")
    finally:
        cache.close()
        _remove_db_files()
    return True