This repository has been archived on 2026-05-20. You can view files and clone it, but cannot push or open issues or pull requests.
media-acquisition/catalog/catalog.py
obsidian-ai d300d83ce1 init: media-acquisition pipeline scaffold
Self-hosted BitTorrent + arr-stack + catalog-update pipeline targeting
nullstone (Debian 13). Replaces the legacy onyx -> rsync -> import
round-trip.

Contents:
- README.md          headline + ASCII architecture diagram + quickstart
- CLAUDE.md          project rules (mirrors beta-flix style)
- .gitignore         secrets dirs (.env, gluetun, qbt config, ssh keys)
- .gitleaksignore    allowlist nullstone LAN addr + Tailscale CGNAT
- docs/architecture.md   the plan in detail (gluetun + qbt + arr + catalog)
- docs/migration.md  onyx-qbt -> nullstone-qbt runbook (3 phases)
- docs/trackers.md   tracker schema + IP-pinning + ratio notes (user-curated)
- compose/docker-compose.yml  gluetun v3.40 + qbt 5.0.5 (netns=gluetun) +
                              sonarr/radarr/prowlarr (hotio) + betaflix-catalog
- compose/.env.example       documented env-var template (no secrets)
- compose/traefik/arr.yml    file-provider for qbt/sonarr/radarr/prowlarr
                             .s8n.ru subdomains, LAN+TS only via
                             trusted-only@file + authentik-forwardauth@file
- catalog/catalog.py         Flask service, ~340 LoC, /sonarr + /radarr +
                             /healthz; pulls beta-flix, inserts alphabetic
                             row into MEDIA-LIST.md, writes run log, commits
                             + pushes as obsidian-ai. Idempotent via
                             payload-hash cache.
- catalog/Dockerfile         python:3.12-slim + git + tini
- catalog/requirements.txt   flask + jinja2 + requests + gitpython + pyyaml (pinned)
- catalog/templates/*.j2     run log + catalog row Jinja templates
- catalog/README.md          service docs
- scripts/migrate-onyx.sh    phase-2 helper (rsync + .torrent ship, dry-run by default)
- scripts/add-tracker.sh     Prowlarr API helper
- scripts/killswitch-test.sh gluetun kill-switch verification (3 steps)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 01:15:43 +01:00

366 lines
12 KiB
Python

"""betaflix-catalog — Sonarr/Radarr OnImport webhook receiver.
Listens for OnImport events from Sonarr and Radarr, edits
`playbooks/import-media/MEDIA-LIST.md` in the beta-flix Forgejo repo, writes
a per-import run log, and commits + pushes as `obsidian-ai`.
Endpoints:
POST /sonarr   Sonarr Connect webhook target.
POST /radarr   Radarr Connect webhook target.
GET  /healthz  Liveness probe.
Idempotency: payload-hash cache at /state/seen-imports.json. Duplicates skipped.
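Environment:
FORGEJO_REMOTE      e.g. https://git.s8n.ru/s8n/beta-flix.git
FORGEJO_PUSH_TOKEN  PAT — embedded into the clone and push URLs.
GIT_AUTHOR_NAME     obsidian-ai
GIT_AUTHOR_EMAIL    obsidian-ai@s8n.ru
LISTEN_PORT         5055
REPO_PATH           /repo  (local checkout of FORGEJO_REMOTE)
STATE_DIR           /state (holds seen-imports.json)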
"""
from __future__ import annotations

import hashlib
import json
import logging
import os
import re
import subprocess
import sys
import threading
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from flask import Flask, jsonify, request
from jinja2 import Environment, FileSystemLoader, select_autoescape
# --- Config -----------------------------------------------------------------
# Filesystem locations (container mount points by default).
REPO_PATH = Path(os.environ.get("REPO_PATH", "/repo"))
STATE_DIR = Path(os.environ.get("STATE_DIR", "/state"))
TEMPLATES_DIR = Path(__file__).parent / "templates"

# Forgejo remote and the PAT spliced into clone/push URLs.
FORGEJO_REMOTE = os.environ.get(
    "FORGEJO_REMOTE",
    "https://git.s8n.ru/s8n/beta-flix.git",
)
FORGEJO_TOKEN = os.environ.get("FORGEJO_PUSH_TOKEN", "")

# Identity used for bot commits.
GIT_AUTHOR_NAME = os.environ.get("GIT_AUTHOR_NAME", "obsidian-ai")
GIT_AUTHOR_EMAIL = os.environ.get("GIT_AUTHOR_EMAIL", "obsidian-ai@s8n.ru")

LISTEN_PORT = int(os.environ.get("LISTEN_PORT", "5055"))

# Files inside the checkout / state dir that the service reads and writes.
MEDIA_LIST = REPO_PATH / "playbooks" / "import-media" / "MEDIA-LIST.md"
RUNS_DIR = REPO_PATH / "playbooks" / "import-media" / "runs"
SEEN_PATH = STATE_DIR / "seen-imports.json"

# Section headers in MEDIA-LIST.md the bot will edit.
MOVIES_SECTION = "## Movies"
TV_SECTION = "## TV"

logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("catalog")

app = Flask(__name__)
# Serialises work on the shared git checkout across Flask request threads.
_lock = threading.Lock()

# Renders templates/*.j2. Autoescape applies only to .html/.xml template
# names, so the markdown templates render unescaped.
_jinja = Environment(
    loader=FileSystemLoader(str(TEMPLATES_DIR)),
    autoescape=select_autoescape(["html", "xml"]),
    trim_blocks=True,
    lstrip_blocks=True,
)
# --- Idempotency ------------------------------------------------------------
def _load_seen() -> set[str]:
    """Return the set of payload hashes that were already processed.

    A missing state file means nothing has been seen yet. If the file exists
    but cannot be read or parsed, a warning is logged and an empty set is
    returned; the next save overwrites the bad file.
    """
    if SEEN_PATH.exists():
        try:
            return set(json.loads(SEEN_PATH.read_text()))
        except Exception:  # noqa: BLE001 — any read/parse failure means "start fresh"
            log.warning("seen-imports.json corrupt; resetting")
    return set()
def _save_seen(seen: set[str]) -> None:
    """Persist the processed-hash set to SEEN_PATH as a sorted JSON list.

    The data is written to a sibling ``.tmp`` file first and then renamed over
    SEEN_PATH. A crash mid-write therefore cannot leave a truncated JSON file
    behind. _load_seen treats an unparsable file as empty, so a partial write
    would otherwise forget every recorded import.
    """
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    tmp = SEEN_PATH.with_name(SEEN_PATH.name + ".tmp")
    tmp.write_text(json.dumps(sorted(seen)), encoding="utf-8")
    # Same directory => same filesystem, so the rename replaces atomically on POSIX.
    tmp.replace(SEEN_PATH)
def _payload_hash(kind: str, payload: dict[str, Any]) -> str:
    """Return a stable 16-hex-char id for an import event.

    Hash seeds:
      sonarr -> "sonarr:<series id>:<SxE,...>" (episode keys sorted)
      radarr -> "radarr:<movie id>"
      other  -> "unknown:<canonical JSON of the payload>"

    Missing fields become "?". A ``series``/``movie`` sent as JSON null is
    treated like an absent object, which matches how the handlers read those
    fields. Hashes for well-formed payloads are unchanged by this.
    """
    if kind == "sonarr":
        series = payload.get("series") or {}
        sid = series.get("id", "?")
        eps = payload.get("episodes", []) or [{}]
        keys = sorted(
            f"{e.get('seasonNumber', '?')}x{e.get('episodeNumber', '?')}" for e in eps
        )
        seed = f"sonarr:{sid}:{','.join(keys)}"
    elif kind == "radarr":
        movie = payload.get("movie") or {}
        mid = movie.get("id", "?")
        seed = f"radarr:{mid}"
    else:
        seed = f"unknown:{json.dumps(payload, sort_keys=True)}"
    return hashlib.sha256(seed.encode()).hexdigest()[:16]
# --- Git helpers ------------------------------------------------------------
def _git(*args: str, cwd: Path = REPO_PATH) -> subprocess.CompletedProcess:
    """Run ``git *args`` in *cwd* and return the completed process.

    stdout/stderr are captured as text. The bot identity (GIT_AUTHOR_NAME /
    GIT_AUTHOR_EMAIL) is used as author and committer unless the process
    environment already sets those variables.

    Raises:
        subprocess.CalledProcessError: on a non-zero exit. Push URLs embed
            FORGEJO_TOKEN, and the exception's cmd/stderr end up in logged
            tracebacks and HTTP error bodies. The token is therefore replaced
            with "***" in cmd, stdout and stderr before the error propagates.
    """
    env = os.environ.copy()
    for var, value in (
        ("GIT_AUTHOR_NAME", GIT_AUTHOR_NAME),
        ("GIT_AUTHOR_EMAIL", GIT_AUTHOR_EMAIL),
        ("GIT_COMMITTER_NAME", GIT_AUTHOR_NAME),
        ("GIT_COMMITTER_EMAIL", GIT_AUTHOR_EMAIL),
    ):
        env.setdefault(var, value)
    cmd = ["git", *args]
    try:
        return subprocess.run(
            cmd,
            cwd=cwd,
            env=env,
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        if not FORGEJO_TOKEN:
            raise

        def scrub(value):
            return value.replace(FORGEJO_TOKEN, "***") if isinstance(value, str) else value

        # `from None` keeps the unscrubbed original out of printed tracebacks.
        raise subprocess.CalledProcessError(
            exc.returncode,
            [scrub(part) for part in exc.cmd],
            output=scrub(exc.output),
            stderr=scrub(exc.stderr),
        ) from None
def _ensure_repo() -> None:
    """Clone FORGEJO_REMOTE into REPO_PATH unless REPO_PATH/.git already exists.

    The clone uses _push_url(), so the token (if set) becomes part of the
    ``origin`` URL that git records; a later ``git fetch origin`` reuses it.
    git refuses to clone into a non-empty directory, so a populated
    REPO_PATH without .git makes this raise.

    Raises:
        subprocess.CalledProcessError: if the clone fails. FORGEJO_TOKEN is
            replaced with "***" in cmd/stdout/stderr, so logged tracebacks
            and error responses do not leak it.
    """
    if (REPO_PATH / ".git").is_dir():
        return
    REPO_PATH.mkdir(parents=True, exist_ok=True)
    cmd = ["git", "clone", _push_url(), str(REPO_PATH)]
    try:
        subprocess.run(
            cmd,
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        if not FORGEJO_TOKEN:
            raise

        def scrub(value):
            return value.replace(FORGEJO_TOKEN, "***") if isinstance(value, str) else value

        # `from None` keeps the unscrubbed original out of printed tracebacks.
        raise subprocess.CalledProcessError(
            exc.returncode,
            [scrub(part) for part in exc.cmd],
            output=scrub(exc.output),
            stderr=scrub(exc.stderr),
        ) from None
def _push_url() -> str:
    """Return FORGEJO_REMOTE with FORGEJO_TOKEN injected as URL userinfo.

    The URL is rewritten only for an https:// remote when a token is
    configured; in every other case FORGEJO_REMOTE is returned unchanged.
    """
    scheme = "https://"
    if not FORGEJO_TOKEN or not FORGEJO_REMOTE.startswith(scheme):
        return FORGEJO_REMOTE
    return f"{scheme}{FORGEJO_TOKEN}@{FORGEJO_REMOTE[len(scheme):]}"
def _pull_rebase() -> None:
    """Bring the local checkout up to date with origin/main.

    Local commits that never reached the remote (e.g. after a failed push)
    are replayed on top of origin/main.

    ``--autostash`` lets the rebase proceed when an earlier failed run left
    uncommitted edits in the worktree. A rebase that still fails is aborted,
    so the checkout is not left mid-rebase; otherwise every later webhook
    would fail on the same stuck state.

    Raises:
        subprocess.CalledProcessError: if the fetch or the rebase fails.
    """
    _git("fetch", "origin")
    try:
        _git("rebase", "--autostash", "origin/main")
    except subprocess.CalledProcessError:
        # Best effort: the failure may have happened before a rebase started.
        try:
            _git("rebase", "--abort")
        except subprocess.CalledProcessError:
            log.warning("git rebase --abort failed (no rebase in progress?)")
        raise
def _commit_and_push(title: str) -> str:
    """Stage catalog files, commit them as the bot and push HEAD to main.

    Only MEDIA-LIST.md and the runs/ directory are staged. Whether there is
    anything to commit is decided from the index alone (``git diff --cached``).
    Unrelated untracked or unstaged files in the checkout therefore cannot
    trigger a ``git commit`` that then fails with "nothing to commit".

    Returns:
        The new HEAD sha, or "" when nothing was staged.
    """
    _git("add", "playbooks/import-media/MEDIA-LIST.md", "playbooks/import-media/runs/")
    staged = _git("diff", "--cached", "--name-only")
    if not staged.stdout.strip():
        log.info("no changes to commit (%s)", title)
        return ""
    msg = f"catalog: add {title}"
    _git("commit", "-m", msg, f"--author={GIT_AUTHOR_NAME} <{GIT_AUTHOR_EMAIL}>")
    _git("push", _push_url(), "HEAD:main")
    return _git("rev-parse", "HEAD").stdout.strip()
# --- MEDIA-LIST.md editing --------------------------------------------------
def _normalise_title(title: str) -> str:
    """Trim *title* and collapse every internal whitespace run to one space."""
    words = title.split()
    return " ".join(words)
def _slugify(s: str) -> str:
    """Turn *s* into a lowercase ASCII filename slug of at most 80 chars.

    - Accented Latin letters are folded to their base letter (NFKD) rather
      than dropped ("Amélie" -> "amelie").
    - Every run of other characters becomes a single "-".
    - Truncation never leaves a trailing "-".
    - Empty input, or input with no letters/digits, gives "untitled".
    - If letters/digits with no ASCII form were dropped (CJK, Cyrillic, ß, ...),
      an 8-hex-char hash of *s* is appended. Distinct titles therefore do not
      share a slug and overwrite each other's run logs.
    """
    max_len = 80
    nfkd = unicodedata.normalize("NFKD", s)
    ascii_text = nfkd.encode("ascii", "ignore").decode("ascii")
    slug = re.sub(r"[^a-zA-Z0-9]+", "-", ascii_text.lower()).strip("-")
    # Combining marks left by NFKD are not alnum, so only truly lost
    # letters/digits trigger the hash suffix.
    lossy = any(ch.isalnum() and not ch.isascii() for ch in nfkd)
    if not lossy:
        return slug[:max_len].rstrip("-") or "untitled"
    digest = hashlib.sha256(s.encode("utf-8", "surrogatepass")).hexdigest()[:8]
    base = slug[: max_len - len(digest) - 1].rstrip("-") or "untitled"
    return f"{base}-{digest}"
def _row(kind: str, title: str, year: int | None, source: str) -> str:
year_s = f"({year})" if year else ""
if kind == "tv":
return f"| {title} {year_s} | TV | {source} | _todo_ | "
return f"| {title} {year_s} | Movie | {source} | _todo_ | "
def _insert_alphabetic(section_header: str, row: str, key: str) -> bool:
    """Insert *row* into the table under *section_header*, alphabetic by key.

    The table is the first run of ``|``-prefixed lines after the header. The
    search stops at the next heading of the same or higher level, so a
    section without a table never borrows the following section's table.
    The table's first two lines are taken as the header and separator rows.

    Data rows are ordered case-insensitively by their first cell. *key* is
    compared against that same cell (case-insensitive, whitespace-collapsed).
    A row therefore counts as "already present" only when its title cell
    equals the key. A row whose text merely contains the key does not count:
    "It (2017)" is not blocked by "Split (2017)".

    Returns:
        True if a new row was added. False if MEDIA-LIST.md, the section or
        its table is missing, or the key already exists (caller handles
        merge/dedup separately).
    """
    if not MEDIA_LIST.exists():
        log.warning("MEDIA-LIST.md missing at %s — skipping insert", MEDIA_LIST)
        return False
    lines = MEDIA_LIST.read_text(encoding="utf-8").splitlines()
    try:
        start = next(i for i, line in enumerate(lines) if line.strip() == section_header)
    except StopIteration:
        log.warning("section %r not found in MEDIA-LIST.md", section_header)
        return False

    def heading_level(line: str) -> int:
        """Number of leading '#' on the stripped line (0 for non-headings)."""
        stripped = line.lstrip()
        return len(stripped) - len(stripped.lstrip("#"))

    def is_table_line(line: str) -> bool:
        return line.lstrip().startswith("|")

    def first_cell(line: str) -> str:
        """Lower-cased, whitespace-collapsed first table cell of *line*."""
        parts = line.split("|")
        return " ".join(parts[1].split()).lower() if len(parts) > 1 else ""

    section_level = heading_level(section_header)

    # Find the table's header row without crossing into a sibling section.
    header_idx = start + 1
    while header_idx < len(lines) and not is_table_line(lines[header_idx]):
        if 0 < heading_level(lines[header_idx]) <= section_level:
            break
        header_idx += 1
    if header_idx >= len(lines) or not is_table_line(lines[header_idx]):
        log.warning("no table found under section %r", section_header)
        return False

    # The table ends at the first non-'|' line.
    table_end = header_idx
    while table_end < len(lines) and is_table_line(lines[table_end]):
        table_end += 1
    # Data rows follow header + separator; clamp so a short table is not overrun.
    rows_start = min(header_idx + 2, table_end)

    wanted = " ".join(key.split()).lower()
    insert_at = table_end
    for j in range(rows_start, table_end):
        cell = first_cell(lines[j])
        if cell == wanted:
            log.info("row already present for key=%r — skipping", key)
            return False
        # Remember the first row that sorts after the key, but keep scanning
        # the remaining rows for an exact duplicate.
        if insert_at == table_end and wanted < cell:
            insert_at = j

    lines.insert(insert_at, row)
    MEDIA_LIST.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return True
def _write_run_log(slug: str, ctx: dict[str, Any]) -> Path:
    """Render the ``run.md.j2`` template with *ctx* into RUNS_DIR/<slug>.md.

    RUNS_DIR is created if needed, and an existing log with the same slug is
    overwritten. The file is written as UTF-8 explicitly, so non-ASCII
    titles do not depend on the process locale.

    Returns:
        The path of the written log.
    """
    RUNS_DIR.mkdir(parents=True, exist_ok=True)
    out = RUNS_DIR / f"{slug}.md"
    rendered = _jinja.get_template("run.md.j2").render(**ctx)
    out.write_text(rendered, encoding="utf-8")
    return out
# --- Webhook handlers -------------------------------------------------------
def _handle_sonarr(payload: dict[str, Any]) -> tuple[str, bool]:
    """Catalog one Sonarr import under the TV section and push the result.

    The row key is "<title> (<year>)", or just the title when there is no
    year. The run-log slug adds the first episode's season/episode numbers.
    Missing or null fields fall back to "Unknown Series" / "?" instead of
    crashing. All repo work happens under _lock.

    Returns:
        (sha, added): the pushed commit sha ("" when nothing changed) and
        whether a new MEDIA-LIST.md row was inserted.
    """
    series = payload.get("series") or {}
    # `or` (not a .get default) also covers an explicit JSON null title.
    title = _normalise_title(str(series.get("title") or "Unknown Series"))
    year = series.get("year") or None
    eps = payload.get("episodes") or []
    # Accept either an `episodeFiles` list or a single `episodeFile` object.
    files = payload.get("episodeFiles") or payload.get("episodeFile") or []
    if isinstance(files, dict):
        files = [files]
    first_file = files[0] if files else {}
    source = first_file.get("sceneName") or first_file.get("relativePath") or "?"
    key = f"{title} ({year})" if year else title
    if eps:
        first_ep = eps[0]
        slug_seed = (
            f"{key}-S{first_ep.get('seasonNumber', '?')}"
            f"E{first_ep.get('episodeNumber', '?')}"
        )
    else:
        slug_seed = key
    with _lock:
        _ensure_repo()
        _pull_rebase()
        added = _insert_alphabetic(TV_SECTION, _row("tv", title, year, source), key)
        _write_run_log(_slugify(slug_seed), {
            "kind": "tv",
            "title": title,
            "year": year,
            "source": source,
            "episodes": eps,
            "ts": datetime.now(timezone.utc).isoformat(timespec="seconds"),
            "row_added": added,
        })
        sha = _commit_and_push(key)
    return sha, added
def _handle_radarr(payload: dict[str, Any]) -> tuple[str, bool]:
    """Catalog one Radarr import under the Movies section and push the result.

    The row key, and the run-log slug source, is "<title> (<year>)", or just
    the title when there is no year. Missing or null fields fall back to
    "Unknown Movie" / "?" instead of crashing. All repo work happens under
    _lock.

    Returns:
        (sha, added): the pushed commit sha ("" when nothing changed) and
        whether a new MEDIA-LIST.md row was inserted.
    """
    movie = payload.get("movie") or {}
    # `or` (not a .get default) also covers an explicit JSON null title.
    title = _normalise_title(str(movie.get("title") or "Unknown Movie"))
    year = movie.get("year") or None
    mfile = payload.get("movieFile") or {}
    source = mfile.get("sceneName") or mfile.get("relativePath") or "?"
    key = f"{title} ({year})" if year else title
    with _lock:
        _ensure_repo()
        _pull_rebase()
        added = _insert_alphabetic(MOVIES_SECTION, _row("movie", title, year, source), key)
        _write_run_log(_slugify(key), {
            "kind": "movie",
            "title": title,
            "year": year,
            "source": source,
            "ts": datetime.now(timezone.utc).isoformat(timespec="seconds"),
            "row_added": added,
        })
        sha = _commit_and_push(key)
    return sha, added
# --- Flask routes -----------------------------------------------------------
@app.get("/healthz")
def healthz():
    """Liveness probe: always answers 200 with {"ok": true}."""
    body = jsonify(ok=True)
    return body, 200
@app.post("/sonarr")
def sonarr():
    """Sonarr Connect webhook target.

    - "Test"/"ApplicationUpdate" probes and any event other than
      "Import"/"Download" are acknowledged with 200 and ignored.
    - Events whose payload hash is already in the seen-imports cache are
      skipped.
    - Other events go to _handle_sonarr and, on success, their hash is
      recorded.
    - Failures return 500 with the error text.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Missing/invalid JSON, or a JSON array/scalar: treat as an empty event.
        payload = {}
    event = payload.get("eventType", "")
    if event in ("Test", "ApplicationUpdate"):
        log.info("sonarr probe event=%s — ack", event)
        return jsonify(ok=True, ignored=event), 200
    if event not in ("Import", "Download"):
        log.info("sonarr event=%s — ignored", event)
        return jsonify(ok=True, ignored=event), 200
    h = _payload_hash("sonarr", payload)
    # Read under _lock so we never see a cache file mid-write.
    with _lock:
        duplicate = h in _load_seen()
    if duplicate:
        log.info("sonarr duplicate event hash=%s — skipping", h)
        return jsonify(ok=True, duplicate=h), 200
    try:
        sha, added = _handle_sonarr(payload)
    except subprocess.CalledProcessError as e:
        log.exception("git failed: %s", e.stderr)
        return jsonify(ok=False, error=e.stderr), 500
    except Exception as e:  # noqa: BLE001
        log.exception("sonarr handler crashed")
        return jsonify(ok=False, error=str(e)), 500
    # Re-load under _lock: other request threads may have recorded their own
    # hashes since the check above, and saving a stale copy would drop them.
    with _lock:
        seen = _load_seen()
        seen.add(h)
        _save_seen(seen)
    return jsonify(ok=True, sha=sha, row_added=added), 200
@app.post("/radarr")
def radarr():
    """Radarr Connect webhook target.

    - "Test"/"ApplicationUpdate" probes and any event other than
      "Import"/"Download" are acknowledged with 200 and ignored.
    - Events whose payload hash is already in the seen-imports cache are
      skipped.
    - Other events go to _handle_radarr and, on success, their hash is
      recorded.
    - Failures return 500 with the error text.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Missing/invalid JSON, or a JSON array/scalar: treat as an empty event.
        payload = {}
    event = payload.get("eventType", "")
    if event in ("Test", "ApplicationUpdate"):
        log.info("radarr probe event=%s — ack", event)
        return jsonify(ok=True, ignored=event), 200
    if event not in ("Import", "Download"):
        log.info("radarr event=%s — ignored", event)
        return jsonify(ok=True, ignored=event), 200
    h = _payload_hash("radarr", payload)
    # Read under _lock so we never see a cache file mid-write.
    with _lock:
        duplicate = h in _load_seen()
    if duplicate:
        log.info("radarr duplicate event hash=%s — skipping", h)
        return jsonify(ok=True, duplicate=h), 200
    try:
        sha, added = _handle_radarr(payload)
    except subprocess.CalledProcessError as e:
        log.exception("git failed: %s", e.stderr)
        return jsonify(ok=False, error=e.stderr), 500
    except Exception as e:  # noqa: BLE001
        log.exception("radarr handler crashed")
        return jsonify(ok=False, error=str(e)), 500
    # Re-load under _lock: other request threads may have recorded their own
    # hashes since the check above, and saving a stale copy would drop them.
    with _lock:
        seen = _load_seen()
        seen.add(h)
        _save_seen(seen)
    return jsonify(ok=True, sha=sha, row_added=added), 200
if __name__ == "__main__":
    # Flask's built-in server, run threaded (Flask's default, spelled out here)
    # so concurrent webhooks are served in parallel; repo work is serialised
    # by _lock inside the handlers.
    log.info("betaflix-catalog listening on 0.0.0.0:%d", LISTEN_PORT)
    app.run(host="0.0.0.0", port=LISTEN_PORT, threaded=True)