Page 1 of 1

Sabnzbd & Sickgear running in Docker

Posted: March 14th, 2026, 2:10 pm
by Eejit
I've been using Sabnzbd since 2008 on a PC. I've recently installed both Sabnzbd and Sickgear on a NAS, running in docker.
The normal autoProcessTV scripts didn't seem to work when running in docker containers but I noticed that Sickgear has an API that can be used to carry out post processing.

I'm not a coder so took advantage of AI. This code does the following:


1.Collects SABnzbd job information
- Reads all SAB_ environment variables.
- Extracts the NZB name and the completed download directory.
- Uses this as the raw input for all later processing.

๐Ÿง  Canonical naming and metadata extraction
2. Rebuilds the folder/file base name into a canonical format
- Splits the NZB name into tokens.
- Identifies the first valid year (1900โ€“2030).
- Identifies the season/episode token (SxxEyy).
- Treats everything before the year or SxxEyy as the show name.
- Reconstructs the name in this exact order:
- Show name
- .(Year) (always with a period before the parentheses)
- .SxxEyy
- All remaining release tags (quality, source, codec, audio, group)
- Suffix like .1 or .2 if present
- Produces a clean, predictable base name used for folder + files.

๐Ÿ“ Folder handling
3. Renames the download folder safely
- Attempts to rename the folder to the canonical name.
- If the target folder already exists:
- Keeps the original folder name (preserves .1, .2, etc.).
- Avoids collisions and accidental merges.
- Ensures SickGear receives a clean, canonical folder path.

๐ŸŽฌ Media file renaming
4. Renames the main video file
- Detects the primary video file by extension.
- Renames it to match the canonical base name exactly.
5. Renames subtitle files
- Normalises language codes (eng โ†’ en, spanish โ†’ es, etc.).
- Preserves extra suffixes (forced, HI, SDH, etc.).
- Renames subtitles to match the canonical base name.

๐Ÿ”„ SickGear integration
6. Sends the cleaned folder to SickGear
- Calls SickGearโ€™s sg.postprocess API.
- Includes the canonical folder path.
- Retries up to 5 times with exponential backoff.
- Parses SickGearโ€™s JSON response and detects:
- Success
- No matching show
- 0 episodes processed
- API errors

๐Ÿงน Cleanup
7. Deletes the processed folder (optional)
- If enabled, removes the entire folder after SickGear finishes.
- Ensures no leftover clutter in the temp directory.

๐Ÿ–ฅ๏ธ Output control (new debug system)
8. Prints output based on DEBUG_LEVEL
- 0 โ€” Silent mode
- Prints only fatal errors and final status.
- 1 โ€” Errors only
- Prints all errors + final status.
- 2 โ€” Full debug
- Prints everything (debug, info, errors, API calls, rename actions).

Code: Select all

#!/usr/bin/env python3
import os
import re
import json
import shutil
import requests
import time

# --- USER CONFIG ---
SICKGEAR_API_KEY = "<Your API Key"
SICKGEAR_URL = "<Your Sickgear URL"
DELETE_AFTER_PROCESSING = True

# DEBUG LEVEL:
# 0 = silent except fatal errors + final status
# 1 = errors only
# 2 = full debug output
DEBUG_LEVEL = 2
# -------------------

VIDEO_EXTENSIONS = [".mkv", ".mp4", ".avi", ".mov", ".m4v"]
SUBTITLE_EXTENSIONS = [".srt", ".ass", ".sub", ".vtt"]

LANG_MAP = {
    "english": "en", "eng": "en", "en": "en",
    "french": "fr", "fra": "fr", "fre": "fr", "fr": "fr",
    "spanish": "es", "spa": "es", "es": "es",
    "german": "de", "deu": "de", "ger": "de", "de": "de",
    "italian": "it", "ita": "it", "it": "it",
    "portuguese": "pt", "por": "pt", "pt": "pt",
    "dutch": "nl", "nld": "nl", "dut": "nl", "nl": "nl",
}

# ---------------------------------------------------------
# LOGGING
# ---------------------------------------------------------

def log_debug(msg):
    if DEBUG_LEVEL >= 2:
        print(msg)

def log_error(msg):
    if DEBUG_LEVEL >= 1:
        print(msg)

def debug_print(title, data, is_error=False):
    if is_error:
        if DEBUG_LEVEL >= 1:
            print(f"\n=== {title} ===")
            print(data)
    else:
        if DEBUG_LEVEL >= 2:
            print(f"\n=== {title} ===")
            if isinstance(data, dict):
                print(json.dumps(data, indent=4))
            else:
                print(data)

# ---------------------------------------------------------
# SAB PARAM COLLECTION
# ---------------------------------------------------------

def collect_sab_params():
    return {k: v for k, v in os.environ.items() if k.startswith("SAB_")}

# ---------------------------------------------------------
# CANONICAL NAME BUILDER
# ---------------------------------------------------------

def canonicalise_name(raw_name):
    # Extract suffix like .1
    suffix = ""
    m = re.search(r"\.(\d+)$", raw_name)
    if m:
        suffix = "." + m.group(1)
        raw_name = raw_name[: -len(suffix)]

    parts = raw_name.split(".")

    # Detect first year
    year = None
    for p in parts:
        if re.fullmatch(r"(19|20)\d{2}", p):
            year = p
            break

    # Detect SxxEyy
    season_ep = None
    for p in parts:
        if re.fullmatch(r"[Ss]\d{2}[Ee]\d{2}", p):
            season_ep = p.upper()
            break

    # Determine show name (everything before year or SxxEyy)
    show_parts = []
    found_key = False
    for p in parts:
        if p == year or p.upper() == season_ep:
            found_key = True
        if not found_key:
            show_parts.append(p)

    show = ".".join(show_parts)

    # Build canonical name
    new_name = show

    # Insert year with a period before parentheses
    if year:
        new_name += f".({year})"

    # Insert SxxEyy
    if season_ep:
        new_name += f".{season_ep}"

    # Add remaining tags
    for p in parts:
        if p not in show_parts and p != year and p.upper() != season_ep:
            new_name += f".{p}"

    # Add suffix like .1
    new_name += suffix

    return new_name

# ---------------------------------------------------------
# UPDATED FOLDER RENAME LOGIC
# ---------------------------------------------------------

def rename_download_folder(old_path, new_name):
    if not old_path or not os.path.exists(old_path):
        debug_print("Rename skipped", f"Path does not exist: {old_path}")
        return old_path

    parent = os.path.dirname(old_path)
    desired_path = os.path.join(parent, new_name)

    # If the desired folder already exists โ†’ KEEP the original folder name
    if os.path.exists(desired_path):
        debug_print("Folder rename skipped",
                    f"Target exists: {desired_path}. Keeping: {old_path}")
        return old_path

    try:
        os.rename(old_path, desired_path)
        debug_print("Folder renamed", f"{old_path} โ†’ {desired_path}")
        return desired_path
    except Exception as e:
        debug_print("Folder rename failed", str(e), is_error=True)
        return old_path

# ---------------------------------------------------------
# NORMALISE SUBTITLE LANGUAGE
# ---------------------------------------------------------

def normalise_subtitle_suffix(filename):
    parts = filename.split(".")
    base = parts[0]
    suffix_parts = parts[1:]

    if len(suffix_parts) < 2:
        return filename

    lang_raw = suffix_parts[0].lower()
    lang = LANG_MAP.get(lang_raw, lang_raw)

    new_suffix = ".".join([lang] + suffix_parts[1:])
    return base + "." + new_suffix

# ---------------------------------------------------------
# RENAME VIDEO + SUBTITLE FILES
# ---------------------------------------------------------

def rename_media_files(folder_path, new_base_name):
    if not os.path.exists(folder_path):
        debug_print("Media rename skipped", f"Folder does not exist: {folder_path}")
        return

    video_file = None
    video_ext = None

    # Find main video file
    for filename in os.listdir(folder_path):
        lower = filename.lower()
        if any(lower.endswith(ext) for ext in VIDEO_EXTENSIONS):
            video_file = filename
            video_ext = os.path.splitext(filename)[1]
            break

    if not video_file:
        debug_print("Video rename skipped", "No video file found in folder")
        return

    # Rename video file
    old_video_path = os.path.join(folder_path, video_file)
    new_video_path = os.path.join(folder_path, new_base_name + video_ext)

    try:
        os.rename(old_video_path, new_video_path)
        debug_print("Video file renamed", f"{video_file} โ†’ {new_base_name + video_ext}")
    except Exception as e:
        debug_print("Video rename failed", str(e), is_error=True)

    # Rename subtitles
    for filename in os.listdir(folder_path):
        lower = filename.lower()
        if any(lower.endswith(ext) for ext in SUBTITLE_EXTENSIONS):
            old_sub_path = os.path.join(folder_path, filename)

            normalised = normalise_subtitle_suffix(filename)
            suffix = "." + ".".join(normalised.split(".")[1:])
            new_sub_name = new_base_name + suffix
            new_sub_path = os.path.join(folder_path, new_sub_name)

            try:
                os.rename(old_sub_path, new_sub_path)
                debug_print("Subtitle file renamed", f"{filename} โ†’ {new_sub_name}")
            except Exception as e:
                debug_print("Subtitle rename failed", str(e), is_error=True)

# ---------------------------------------------------------
# SICKGEAR JSON EVALUATION
# ---------------------------------------------------------

def evaluate_sickgear_json(json_data):
    result = json_data.get("result", "").lower()
    message = json_data.get("message", "").lower()
    data = json_data.get("data", {})

    if result in ("failure", "error"):
        return False, f"SickGear reported failure: {json_data.get('message', '')}"

    failure_terms = [
        "no show", "unable to match", "unable to parse",
        "0 episodes", "nothing to process", "did not find",
        "no valid", "no episode", "no match"
    ]

    if any(term in message for term in failure_terms):
        return False, f"SickGear reported no matching show: {json_data.get('message', '')}"

    if isinstance(data, dict) and data.get("processed", None) == 0:
        return False, f"SickGear processed 0 episodes: {json_data.get('message', '')}"

    return True, f"SickGear success: {json_data.get('message', '')}"

# ---------------------------------------------------------
# SICKGEAR API WITH RETRIES
# ---------------------------------------------------------

def send_to_sickgear(sab_params):
    complete_dir = sab_params.get("SAB_COMPLETE_DIR")

    payload = {
        "cmd": "sg.postprocess",
        "path": complete_dir,
        "return_data": True,
        "process_method": "move",
    }

    api_url = f"{SICKGEAR_URL}/api/{SICKGEAR_API_KEY}/"

    debug_print("SickGear API URL", api_url)
    debug_print("Payload sent to SickGear", payload)

    max_retries = 5
    delay = 5

    for attempt in range(1, max_retries + 1):
        debug_print(f"SickGear Attempt {attempt} of {max_retries}", "")

        try:
            response = requests.get(api_url, params=payload, timeout=30)
            debug_print("Full Request URL", response.url)
            debug_print("SickGear Response Code", response.status_code)
            debug_print("SickGear Response Body", response.text)

            try:
                json_data = response.json()
                debug_print("Parsed JSON", json_data)
                return evaluate_sickgear_json(json_data)
            except:
                debug_print("Invalid JSON, retrying...", "")

        except Exception as e:
            debug_print("Error contacting SickGear", str(e), is_error=True)

        time.sleep(delay)
        delay *= 2

    return False, "Failed to communicate with SickGear after 5 attempts."

# ---------------------------------------------------------
# DELETE PROCESSED FILES
# ---------------------------------------------------------

def delete_processed_files(path):
    if not path or not os.path.exists(path):
        debug_print("Delete skipped", f"Path does not exist: {path}")
        return True

    try:
        shutil.rmtree(path)
        debug_print("Delete successful", f"Removed: {path}")
        return True
    except Exception as e:
        debug_print("Delete failed", str(e), is_error=True)
        return False

# ---------------------------------------------------------
# MAIN SCRIPT
# ---------------------------------------------------------

def main():
    if DEBUG_LEVEL >= 2:
        print("=== SABnzbd โ†’ SickGear Unified Script ===")

    final_status = "OK"

    sab = collect_sab_params()
    debug_print("SABnzbd Parameters Received", sab)

    nzb_name = sab.get("SAB_FINAL_NAME")
    old_dir = sab.get("SAB_COMPLETE_DIR")

    # Canonicalise NZB name
    canonical_name = canonicalise_name(nzb_name)

    # Rename folder (or keep .1 if exists)
    new_dir = rename_download_folder(old_dir, canonical_name)
    sab["SAB_COMPLETE_DIR"] = new_dir

    # Canonicalise folder base name too
    folder_base = os.path.basename(new_dir)
    canonical_base = canonicalise_name(folder_base)

    # Rename video + subtitle files
    rename_media_files(new_dir, canonical_base)

    # Send to SickGear
    success, message = send_to_sickgear(sab)
    if not success:
        final_status = message

    # Delete processed files
    if DELETE_AFTER_PROCESSING:
        if not delete_processed_files(new_dir):
            final_status = "File deletion failed"

    # Final status always prints in DEBUG_LEVEL 0/1/2
    print(f"\n=== Script Finished: {final_status} ===")

if __name__ == "__main__":
    main()