Adds lib/audit-coverage.py: queries Jellyfin live for every series, every episode, and every movie; classifies each by whether the English subtitle comes from a sidecar, embedded stream, or doesn't exist; renders a Markdown report with one-char-per-episode bars for visual scanning. Output file is processes/subtitles/COVERAGE.md, regenerated on demand. v2 sub-rest-fetch.py and v3 sub-a7d-fetch.py now invoke the audit at end of a successful run, so the committed coverage file stays in sync with library state without manual intervention. v3.5 yt-fetch path skips the auto-call since it doesn't speak to Jellyfin directly; run audit manually after copying YT sidecars to nullstone. README.md surfaces the audit at the top so anyone landing in the recipe folder sees current state before starting a run.
292 lines
9.9 KiB
Python
Executable file
292 lines
9.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""Subtitle fetcher v2 — direct OpenSubtitles REST API.
|
|
|
|
Bypasses the Jellyfin OpenSubtitles plugin to dodge season/episode numbering
|
|
mismatches. Looks each library episode up by its per-episode IMDB id, picks
|
|
the best English match, downloads via the REST endpoint, and writes the
|
|
sidecar straight onto nullstone next to the media file (via SSH).
|
|
|
|
Why v2 exists: see ../CHANGELOG.md "Known break" — American Dad library
|
|
uses Hulu season numbering, OS catalogues by Fox airing order; the plugin
|
|
queries by (parent_imdb_id, season, episode) so library S02E01 → OS S01E08
|
|
returned 0 hits even though the per-episode IMDB id (tt0511631) is real.
|
|
|
|
Picker: highest download_count among non-HI, non-MT, non-AI, non-Forced
|
|
candidates; 23.976fps preferred. Falls back to all candidates if every match
|
|
is HI/MT/AI/Forced.
|
|
|
|
Usage:
|
|
sub-rest-fetch.py <series-id> --season <N> [--start <ep>] [--end <ep>]
|
|
sub-rest-fetch.py <series-id> --all
|
|
|
|
Env (required):
|
|
JELLYFIN_TOKEN X-Emby-Token for nullstone Jellyfin
|
|
OPENSUBTITLES_API_KEY Path to file holding the API key
|
|
OPENSUBTITLES_USER OS account username
|
|
OPENSUBTITLES_PASS OS account password
|
|
|
|
Env (optional):
|
|
NULLSTONE SSH target, default user@192.168.0.100
|
|
DRY_RUN=1 search + pick only, no download
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.parse
|
|
|
|
# Root of the OpenSubtitles REST API; endpoint paths are appended to it.
OS_BASE = "https://api.opensubtitles.com/api/v1"
# User-Agent header value passed on the OpenSubtitles calls and downloads.
USER_AGENT = "arrflix v1.0.0"
# Jellyfin base URL as seen by the curl that jellyfin() runs via `docker exec`.
JF_BASE = "http://localhost:8096"
# SSH target for the media host; the NULLSTONE env var overrides the default.
NULLSTONE = os.getenv("NULLSTONE", "user@192.168.0.100")
|
|
|
|
|
|
def die(msg: str, code: int = 1) -> None:
    """Report a fatal error on stderr and terminate the process.

    Writes ``ERROR: <msg>`` plus a newline to stderr, then raises
    ``SystemExit`` carrying *code* (default 1). Never returns normally.
    """
    sys.stderr.write(f"ERROR: {msg}\n")
    raise SystemExit(code)
|
|
|
|
|
|
def env_or_die(name: str) -> str:
    """Return the value of environment variable *name*.

    An unset or empty variable is fatal: ``die()`` is called with the
    message ``"<name> not set"``.
    """
    value = os.getenv(name)
    if value is None or value == "":
        die(f"{name} not set")
    return value
|
|
|
|
|
|
def load_api_key() -> str:
    """Read the OpenSubtitles API key from disk.

    ``OPENSUBTITLES_API_KEY`` names the file that holds the key (it is
    fatal for the variable to be unset or empty). The file's contents are
    returned with surrounding whitespace removed.
    """
    key_file = env_or_die("OPENSUBTITLES_API_KEY")
    with open(key_file) as handle:
        contents = handle.read()
    return contents.strip()
|
|
|
|
|
|
def _curl(url: str, method: str = "GET", headers: dict | None = None,
          body: dict | None = None, binary: bool = False) -> bytes:
    """Run curl against *url* and return the raw response body.

    OpenSubtitles' frontend rejects urllib (consistent 503 on /download).
    curl works against the same endpoint and headers. Use curl uniformly.

    Args:
        url: Request URL.
        method: HTTP method handed to ``curl -X``.
        headers: Header name -> value pairs, each sent with ``-H``.
        body: When not None, JSON-encoded and sent as the request body.
        binary: Accepted for call-site compatibility only; the response is
            always returned as undecoded bytes, so the flag has no effect.

    Returns:
        Whatever curl wrote to stdout.

    Raises:
        subprocess.CalledProcessError: curl exited non-zero (``-f`` makes
            HTTP error statuses count as failures).
    """
    cmd = ["curl", "-sSf", "-X", method, url]
    for name, value in (headers or {}).items():
        cmd += ["-H", f"{name}: {value}"]
    if body is None:
        return subprocess.check_output(cmd)
    # Feed the payload over stdin instead of argv: the /login body carries
    # the account password, and argv is readable by other local users via
    # the process list. --data-binary sends the bytes exactly as given.
    # NOTE(review): header values (Api-Key, bearer token) still travel in
    # argv; moving them would need a curl config file or `-H @file`.
    cmd += ["--data-binary", "@-"]
    return subprocess.check_output(cmd, input=json.dumps(body).encode())
|
|
|
|
|
|
def http_json(url: str, method: str = "GET", headers: dict | None = None,
              body: dict | None = None) -> dict:
    """Perform an HTTP request through ``_curl`` and parse the JSON reply.

    The arguments are forwarded to ``_curl`` unchanged; the response bytes
    are decoded with the default codec and handed to ``json.loads``.
    """
    payload = _curl(url, method, headers, body)
    text = payload.decode()
    return json.loads(text)
|
|
|
|
|
|
def http_get_bytes(url: str) -> bytes:
    """GET *url* through ``_curl`` and return the undecoded response body.

    Only the script's User-Agent header is sent.
    """
    ua_header = {"User-Agent": USER_AGENT}
    return _curl(url, "GET", headers=ua_header)
|
|
|
|
|
|
def jellyfin(path: str, params: dict | None = None) -> dict:
    """Run Jellyfin API call inside the container on nullstone via SSH.

    Builds ``JF_BASE + path`` (plus an URL-encoded query string when
    *params* is non-empty, leaving commas unescaped), then runs
    ``docker exec jellyfin curl`` on the NULLSTONE host over ssh with the
    ``JELLYFIN_TOKEN`` value as the ``X-Emby-Token`` header.

    Args:
        path: API path, appended verbatim to ``JF_BASE``.
        params: Optional query parameters.

    Returns:
        The decoded JSON response.

    Raises:
        subprocess.CalledProcessError: the ssh command exited non-zero.
        json.JSONDecodeError: the output was not valid JSON.
    """
    tok = env_or_die("JELLYFIN_TOKEN")
    url = JF_BASE + path
    if params:
        url += "?" + urllib.parse.urlencode(params, safe=",")
    remote_argv = ["docker", "exec", "jellyfin", "curl", "-s",
                   "-H", f"X-Emby-Token: {tok}", url]
    # ssh hands the command to the remote shell as a single string, so each
    # word is quoted. Previously the token sat inside hand-written single
    # quotes, which a token containing "'" would break out of.
    remote_cmd = " ".join(shlex.quote(word) for word in remote_argv)
    out = subprocess.check_output(["ssh", NULLSTONE, remote_cmd], text=True)
    return json.loads(out)
|
|
|
|
|
|
def list_episodes(series_id: str) -> list[dict]:
    """Return the ``Items`` list of every episode under *series_id*.

    Queries Jellyfin's ``/Items`` recursively for ``Episode`` items, asking
    for the path, season/episode numbers and provider ids, sorted by season
    number and then episode number.
    """
    query = {
        "ParentId": series_id,
        "IncludeItemTypes": "Episode",
        "Recursive": "true",
        "Fields": "Path,ParentIndexNumber,IndexNumber,ProviderIds",
        "SortBy": "ParentIndexNumber,IndexNumber",
    }
    response = jellyfin("/Items", query)
    return response["Items"]
|
|
|
|
|
|
def os_login(api_key: str, user: str, password: str) -> str:
    """Log in to OpenSubtitles and return the ``token`` from the response.

    POSTs the username and password as JSON to ``/login`` with the API key
    and User-Agent headers.
    """
    request_headers = {
        "Api-Key": api_key,
        "Content-Type": "application/json",
        "User-Agent": USER_AGENT,
    }
    credentials = {"username": user, "password": password}
    reply = http_json(f"{OS_BASE}/login", "POST",
                      headers=request_headers, body=credentials)
    return reply["token"]
|
|
|
|
|
|
def os_user_info(api_key: str, bearer: str) -> dict:
    """Return the ``data`` object of the ``/infos/user`` response.

    The request is authenticated with the API key and the bearer token
    obtained from ``os_login``.
    """
    request_headers = {
        "Api-Key": api_key,
        "Authorization": f"Bearer {bearer}",
        "User-Agent": USER_AGENT,
    }
    reply = http_json(f"{OS_BASE}/infos/user", headers=request_headers)
    return reply["data"]
|
|
|
|
|
|
def os_search(api_key: str, imdb_id: str) -> list[dict]:
    """Search OpenSubtitles for English subtitles by IMDB id.

    *imdb_id* is passed without the 'tt' prefix per OS convention. Returns
    the response's ``data`` list, or an empty list when that key is absent.
    """
    query = f"imdb_id={imdb_id}&languages=en"
    request_headers = {"Api-Key": api_key, "User-Agent": USER_AGENT}
    reply = http_json(f"{OS_BASE}/subtitles?{query}", headers=request_headers)
    return reply.get("data", [])
|
|
|
|
|
|
def pick_best(hits: list[dict]) -> dict | None:
    """Choose the preferred subtitle candidate from *hits*.

    Filter HI/MT/AI/Forced, prefer 23.976fps, then take the highest
    download_count. If every candidate is HI/MT/AI/Forced the filter is
    dropped and all of *hits* compete. Ties on download_count go to the
    candidate that appears first in the input.

    Args:
        hits: Search results; each must carry an ``attributes`` dict.

    Returns:
        The winning hit (the same object that was passed in), or None when
        *hits* is empty. *hits* itself is never reordered.
    """
    def attr(hit, key):
        return hit["attributes"].get(key)

    unwanted = ("hearing_impaired", "machine_translated",
                "ai_translated", "foreign_parts_only")
    clean = [h for h in hits if not any(attr(h, flag) for flag in unwanted)]
    pool = clean or hits
    fps2398 = [h for h in pool if abs((attr(h, "fps") or 0) - 23.976) < 0.01]
    pool = fps2398 or pool
    if not pool:
        return None
    # max() instead of an in-place sort: when the fallback makes `pool`
    # alias `hits`, sorting would silently reorder the caller's list.
    # max() returns the first maximal element, matching the old stable sort.
    return max(pool, key=lambda h: attr(h, "download_count") or 0)
|
|
|
|
|
|
def os_download(api_key: str, bearer: str, file_id: int) -> dict:
    """Request a download for *file_id* and return the decoded JSON reply.

    POSTs ``{"file_id": ...}`` to ``/download`` with the API key and bearer
    token; the caller reads the link and remaining quota from the result.
    """
    request_headers = {
        "Api-Key": api_key,
        "Authorization": f"Bearer {bearer}",
        "Content-Type": "application/json",
        "User-Agent": USER_AGENT,
    }
    payload = {"file_id": file_id}
    return http_json(f"{OS_BASE}/download", "POST",
                     headers=request_headers, body=payload)
|
|
|
|
|
|
def write_sidecar_remote(content: bytes, remote_path: str) -> None:
    """Write *content* to *remote_path* on nullstone.

    Pipes the bytes into ``cat > <quoted path>`` over ssh. A non-zero exit
    status is fatal via ``die()``.
    """
    remote_cmd = "cat > " + shlex.quote(remote_path)
    result = subprocess.run(["ssh", NULLSTONE, remote_cmd], input=content)
    if result.returncode != 0:
        die(f"failed writing {remote_path}")
|
|
|
|
|
|
def imdb_strip(s: str | None) -> str | None:
    """Drop a leading ``tt`` from an IMDB id.

    Returns None for None or an empty string; an id without the prefix is
    returned unchanged.
    """
    if not s:
        return None
    prefix = "tt"
    if s.startswith(prefix):
        return s[len(prefix):]
    return s
|
|
|
|
|
|
def episode_to_paths(ep: dict) -> tuple[str, str]:
    """Return (remote_dir, base_filename) for sidecar placement.

    Args:
        ep: Jellyfin episode item; its ``Path`` is the media file as seen
            inside the container, e.g.
            ``/media/tv/Show/Season XX/Show - SxxExx - Title.mkv``.

    Returns:
        The host-side directory of the media file and the file name with
        its extension removed.
    """
    container_path = ep["Path"]
    # Translate the container mount to the host path. Only the first
    # "/media/" is rewritten: an unbounded replace would also mangle a
    # nested directory that happens to be called "media".
    host_path = container_path.replace("/media/", "/home/user/media/", 1)
    remote_dir = os.path.dirname(host_path)
    base = os.path.splitext(os.path.basename(host_path))[0]
    return remote_dir, base
|
|
|
|
|
|
def _select_episodes(eps: list[dict], args: argparse.Namespace) -> list[dict]:
    """Return the episodes matching --season/--all and the --start/--end range."""
    selected = []
    for ep in eps:
        s = ep.get("ParentIndexNumber")
        n = ep.get("IndexNumber")
        if s is None or n is None:
            # Items without a season or episode number cannot be matched
            # against the requested range; skip them rather than crash.
            print(f"[skip] {ep.get('Name', '?')} — no season/episode number",
                  file=sys.stderr)
            continue
        if not args.all and s != args.season:
            continue
        if not (args.start <= n <= args.end):
            continue
        selected.append(ep)
    return selected


def _refresh_coverage() -> None:
    """Best-effort run of the sibling audit-coverage.py; failures only warn."""
    script = os.path.join(os.path.dirname(__file__), "audit-coverage.py")
    try:
        subprocess.run([script], check=False)
    except (OSError, subprocess.SubprocessError) as e:
        print(f"[warn] coverage refresh skipped: {e}", file=sys.stderr)


def main() -> int:
    """Fetch English sidecar subtitles for the selected episodes.

    Parses the command line, logs in to OpenSubtitles, lists the series'
    episodes from Jellyfin, and for each selected episode searches by IMDB
    id, picks the best hit, downloads it and writes ``<base>.eng.srt`` next
    to the media file on nullstone. With ``DRY_RUN=1`` it stops after the
    pick. Progress and the final summary go to stderr.

    Returns:
        0 if at least one episode succeeded (or was picked in a dry run),
        otherwise 2.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("series_id")
    ap.add_argument("--season", type=int, default=None)
    ap.add_argument("--start", type=int, default=1)
    ap.add_argument("--end", type=int, default=10**6)
    ap.add_argument("--all", action="store_true")
    args = ap.parse_args()

    if args.season is None and not args.all:
        die("pass --season N or --all")

    api_key = load_api_key()
    user = env_or_die("OPENSUBTITLES_USER")
    pw = env_or_die("OPENSUBTITLES_PASS")
    dry = os.environ.get("DRY_RUN") == "1"

    bearer = os_login(api_key, user, pw)
    info = os_user_info(api_key, bearer)
    print(f"[quota] remaining={info['remaining_downloads']}/{info['allowed_downloads']}, "
          f"resets in {info['reset_time']}", file=sys.stderr)

    work = _select_episodes(list_episodes(args.series_id), args)
    if not work:
        die("no episodes selected")

    print(f"[plan] {len(work)} episodes selected", file=sys.stderr)
    if not dry and len(work) > info["remaining_downloads"]:
        print(f"[warn] {len(work)} > quota {info['remaining_downloads']}; "
              f"will halt mid-run", file=sys.stderr)

    ok = 0
    fail = []
    for ep in work:
        s, n = ep["ParentIndexNumber"], ep["IndexNumber"]
        label = f"S{s:02}E{n:02} {ep['Name']}"
        imdb = imdb_strip((ep.get("ProviderIds") or {}).get("Imdb"))
        if not imdb:
            print(f"[skip] {label} — no IMDB id", file=sys.stderr)
            fail.append((label, "no-imdb"))
            continue

        try:
            hits = os_search(api_key, imdb)
        except subprocess.CalledProcessError as e:
            # Stop instead of crashing so the summary below is still printed.
            print(f"[fail] {label} search (curl exit {e.returncode})", file=sys.stderr)
            fail.append((label, f"search-curl-{e.returncode}"))
            break

        pick = pick_best(hits)
        if not pick:
            print(f"[skip] {label} — 0 hits for imdb={imdb}", file=sys.stderr)
            fail.append((label, "no-hits"))
            continue

        a = pick["attributes"]
        files = a.get("files") or []
        if not files:
            print(f"[skip] {label} — pick has no files", file=sys.stderr)
            fail.append((label, "no-files"))
            continue
        f = files[0]
        print(f"[pick] {label} imdb={imdb} fid={f['file_id']} dl={a.get('download_count')} "
              f"fps={a.get('fps')} fname={f.get('file_name')}", file=sys.stderr)

        if dry:
            ok += 1
            continue

        try:
            dl = os_download(api_key, bearer, f["file_id"])
        except subprocess.CalledProcessError as e:
            print(f"[fail] {label} download (curl exit {e.returncode})", file=sys.stderr)
            fail.append((label, f"dl-curl-{e.returncode}"))
            break  # may be quota; stop run

        link = dl.get("link")
        if not link:
            print(f"[fail] {label} no download link in response: {dl}", file=sys.stderr)
            fail.append((label, "no-link"))
            break

        try:
            content = http_get_bytes(link)
        except subprocess.CalledProcessError as e:
            # A failed body fetch affects only this episode; keep going.
            print(f"[fail] {label} fetch (curl exit {e.returncode})", file=sys.stderr)
            fail.append((label, f"fetch-curl-{e.returncode}"))
            continue

        remote_dir, base = episode_to_paths(ep)
        dest = f"{remote_dir}/{base}.eng.srt"
        write_sidecar_remote(content, dest)
        print(f"[ok] {label} -> {dest} (remaining={dl.get('remaining')})",
              file=sys.stderr)
        ok += 1
        time.sleep(0.5)  # be polite

    print(f"\n[done] ok={ok}/{len(work)} failures={len(fail)}", file=sys.stderr)
    for lab, why in fail:
        print(f" - {lab}: {why}", file=sys.stderr)
    # A dry run writes nothing to the library, so the coverage report
    # cannot have changed; skip regenerating it.
    if ok and not dry:
        _refresh_coverage()
    return 0 if ok else 2
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|