
Pipeline API

This page documents the Orchestrator, the core processing coordinator.

Orchestrator

The Orchestrator is the heart of VBC, coordinating all compression jobs.

orchestrator

Pipeline orchestrator for video compression job lifecycle management.

Coordinates file discovery, queue management, metadata extraction, compression jobs, and dynamic thread control. Uses event-driven architecture with EventBus for loose coupling between pipeline and UI layers.

Key responsibilities:

  - Discover video files matching extensions and size filters
  - Extract and cache video metadata (codec, FPS, camera model, etc.)
  - Manage queue of pending jobs with configurable sort order
  - Submit jobs to thread pool respecting prefetch_factor (submit-on-demand pattern)
  - Handle job lifecycle: discovery → queuing → processing → completion/failure
  - Support dynamic thread count adjustment and graceful shutdown
  - Emit events for UI updates (JobStarted, JobCompleted, JobFailed, etc.)
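The EventBus mentioned above keeps the pipeline and UI layers decoupled: the Orchestrator only publishes events, and any number of subscribers react to them. As a minimal sketch of that publish/subscribe shape (this EventBus and the JobStarted payload here are simplified stand-ins, not VBC's actual classes):

```python
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, DefaultDict, List


class EventBus:
    """Minimal publish/subscribe bus: handlers are keyed by event type."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[type, List[Callable]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: object) -> None:
        # The pipeline never calls UI code directly; it only publishes events.
        for handler in self._handlers[type(event)]:
            handler(event)


@dataclass
class JobStarted:
    path: Path


bus = EventBus()
started: List[Path] = []
bus.subscribe(JobStarted, lambda e: started.append(e.path))
bus.publish(JobStarted(path=Path("clip.mp4")))
```

Because subscribers register by event type, the UI can listen for JobStarted/JobCompleted without the pipeline ever importing UI modules.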

Orchestrator

Orchestrator(config: AppConfig, event_bus: EventBus, file_scanner: FileScanner, exif_adapter: ExifToolAdapter, ffprobe_adapter: FFprobeAdapter, ffmpeg_adapter: FFmpegAdapter, output_dir_map: Optional[Dict[Path, Path]] = None, local_config_registry: Optional[LocalConfigRegistry] = None, cli_overrides: Optional[CliConfigOverrides] = None)

Video compression pipeline orchestrator.

Manages the full job lifecycle: discovery → queuing → compression. Coordinates with infrastructure adapters (FFmpeg, ExifTool, FFprobe) and publishes events to the UI via EventBus.

Uses thread-safe state management with Condition variables for:

  - Dynamic thread pool sizing (Ctrl+< and Ctrl+>)
  - Graceful shutdown coordination (Ctrl+S)
  - Queue refresh signaling (Ctrl+R)

Implements "submit-on-demand" pattern: submits only prefetch_factor*threads jobs to thread pool; submits new jobs as workers complete (avoids queueing thousands of futures for large directories).

Parameters:

    config (AppConfig, required)
        AppConfig with general, GPU, UI, autorotate, and input/output settings.

    event_bus (EventBus, required)
        EventBus for publishing job lifecycle events.

    file_scanner (FileScanner, required)
        FileScanner for discovering video files.

    exif_adapter (ExifToolAdapter, required)
        ExifToolAdapter for metadata extraction (camera, GPS, etc.).

    ffprobe_adapter (FFprobeAdapter, required)
        FFprobeAdapter for codec, FPS, duration probing.

    ffmpeg_adapter (FFmpegAdapter, required)
        FFmpegAdapter for AV1 compression execution.

    output_dir_map (Optional[Dict[Path, Path]], default None)
        Optional override mapping input_dir → output_dir (else uses suffix).

    local_config_registry (Optional[LocalConfigRegistry], default None)
        Optional registry used for per-job local config resolution.

    cli_overrides (Optional[CliConfigOverrides], default None)
        Optional CLI overrides applied during per-job config resolution.
Source code in vbc/pipeline/orchestrator.py
def __init__(
    self,
    config: AppConfig,
    event_bus: EventBus,
    file_scanner: FileScanner,
    exif_adapter: ExifToolAdapter,
    ffprobe_adapter: FFprobeAdapter,
    ffmpeg_adapter: FFmpegAdapter,
    output_dir_map: Optional[Dict[Path, Path]] = None,
    local_config_registry: Optional["LocalConfigRegistry"] = None,
    cli_overrides: Optional["CliConfigOverrides"] = None,
):
    self.config = config
    self.event_bus = event_bus
    self.file_scanner = file_scanner
    self.exif_adapter = exif_adapter
    self.ffprobe_adapter = ffprobe_adapter
    self.ffmpeg_adapter = ffmpeg_adapter
    self.logger = logging.getLogger(__name__)

    # Local config registry and CLI overrides for per-job config resolution
    self.local_registry = local_config_registry
    self.cli_overrides = cli_overrides

    # Metadata cache (thread-safe)
    self._metadata_cache = {}  # Path -> VideoMetadata
    self._metadata_lock = threading.Lock()
    self._metadata_failure_counts: Dict[Path, int] = {}
    self._metadata_failure_limit = 1
    self._metadata_failure_reasons: Dict[Path, str] = {}
    self._metadata_failed_paths: set[Path] = set()
    self._metadata_failed_reported: set[Path] = set()

    # Dynamic control state
    self._shutdown_requested = False
    self._current_max_threads = config.general.threads
    self._active_threads = 0
    self._thread_lock = threading.Condition()
    self._refresh_requested = False
    self._refresh_lock = threading.Lock()
    self._shutdown_event = threading.Event()  # Signal workers to stop
    self._wait_event = threading.Event()       # Signals wait loop to unblock
    self._restart_after_wait = False           # True = R pressed; False = S/Ctrl+C
    self._pause_requested = False
    self._pause_message: Optional[str] = None
    self._verification_abort_message: Optional[str] = None

    # Stats
    self.skipped_vbc_count = 0
    self._stats_lock = threading.Lock()

    # Folder mapping (input_dir -> output_dir)
    self._folder_mapping: Dict[Path, Path] = {}
    self._output_dir_map_override: Dict[Path, Path] = output_dir_map or {}
    self._use_output_dir_map_override = output_dir_map is not None

    # Dynamic input dirs (updated via InputDirsChanged event)
    self._pending_input_dirs: Optional[List[Path]] = None

    self._setup_subscriptions()

Demo Orchestrator

Simulation-mode pipeline used by --demo.

demo_orchestrator

DemoOrchestrator

DemoOrchestrator(config: AppConfig, demo_config: DemoConfig, event_bus: EventBus)
Source code in vbc/pipeline/demo_orchestrator.py
def __init__(self, config: AppConfig, demo_config: DemoConfig, event_bus: EventBus):
    self.config = config
    self.demo_config = demo_config
    self.event_bus = event_bus
    self.logger = logging.getLogger(__name__)
    self._rng = random.Random(demo_config.seed)

    self._shutdown_requested = False
    self._current_max_threads = config.general.threads
    self._active_threads = 0
    self._thread_lock = threading.Condition()
    self._refresh_requested = False
    self._refresh_lock = threading.Lock()
    self._shutdown_event = threading.Event()

    self._job_plans: Dict[Path, DemoJobPlan] = {}

    self._setup_subscriptions()
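As the __init__ above shows, the demo pipeline seeds its RNG from demo_config.seed, so a simulated run is fully reproducible. A minimal sketch of that idea, where FakePlan and make_plans are illustrative stand-ins (the real DemoJobPlan fields are not shown here):

```python
import random
from dataclasses import dataclass
from typing import List


@dataclass
class FakePlan:
    """Hypothetical stand-in for DemoJobPlan."""
    duration_s: float
    ratio: float


def make_plans(seed: int, n: int) -> List[FakePlan]:
    # Same seed -> identical sequence of simulated jobs on every run.
    rng = random.Random(seed)
    return [FakePlan(rng.uniform(2, 30), rng.uniform(0.2, 0.8)) for _ in range(n)]
```

With a fixed seed, every --demo run produces the same simulated job durations and compression ratios, which makes UI behavior easy to demonstrate and debug.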

Error File Mover

Helpers for moving files that failed verification or compression.

error_file_mover

move_failed_files

move_failed_files(input_dirs: Iterable[Path], output_dir_map: Dict[Path, Path], errors_dir_map: Dict[Path, Path], extensions: List[str], logger: Optional[Logger] = None, error_entries: Optional[List[Tuple[Path, Path, Path, Path]]] = None) -> List[Path]

Moves failed source files and their error markers to the errors directory. Returns a list of paths to the moved video files in the destination (errors) directory.

Source code in vbc/pipeline/error_file_mover.py
def move_failed_files(
    input_dirs: Iterable[Path],
    output_dir_map: Dict[Path, Path],
    errors_dir_map: Dict[Path, Path],
    extensions: List[str],
    logger: Optional[logging.Logger] = None,
    error_entries: Optional[List[Tuple[Path, Path, Path, Path]]] = None,
) -> List[Path]:
    """
    Moves failed source files and their error markers to the errors directory.
    Returns a list of paths to the moved video files in the destination (errors) directory.
    """
    error_entries = error_entries or collect_error_entries(
        input_dirs, output_dir_map, errors_dir_map
    )

    total = len(error_entries)
    moved_video_files: List[Path] = []

    if total == 0:
        if logger:
            logger.info("No .err files found for failed file relocation.")
        return []

    if logger:
        logger.info(f"Relocating {total} failed files to errors directories.")

    with Progress(
        SpinnerColumn(),
        TextColumn("{task.description}"),
        BarColumn(),
        TextColumn("{task.completed}/{task.total}"),
        TimeRemainingColumn(),
    ) as progress:
        task = progress.add_task("Moving failed files", total=total)
        for input_dir, output_dir, errors_dir, err_file in error_entries:
            rel_err = err_file.relative_to(output_dir)
            dest_err = errors_dir / rel_err
            dest_err.parent.mkdir(parents=True, exist_ok=True)
            if err_file.exists() and err_file != dest_err:
                shutil.move(str(err_file), str(dest_err))
                if logger:
                    logger.info(f"Moved error marker: {err_file} -> {dest_err}")

            source_paths = _find_sources_for_error(input_dir, rel_err)
            if source_paths:
                for source_path in source_paths:
                    if not source_path.exists():
                        continue
                    rel_source = source_path.relative_to(input_dir)
                    dest_source = errors_dir / rel_source
                    dest_source.parent.mkdir(parents=True, exist_ok=True)
                    if source_path != dest_source:
                        shutil.move(str(source_path), str(dest_source))
                        moved_video_files.append(dest_source)
                        if logger:
                            logger.info(f"Moved source file: {source_path} -> {dest_source}")
            else:
                if logger:
                    logger.warning(f"Failed source file not found for {rel_err}")

            progress.advance(task)

    if logger:
        logger.info("Failed file relocation finished.")

    # Deduplicate in case multiple source variants were moved for one error
    return sorted(list(set(moved_video_files)))

Queue Sorting

Queue ordering strategies used before submit-on-demand scheduling.

queue_sorting
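The queue_sorting module's API is not reproduced here, but the general shape of a pluggable ordering strategy can be sketched as follows (SORT_KEYS and sort_queue are illustrative names, not the module's actual ones):

```python
from pathlib import Path
from typing import Callable, Dict, List

# Hypothetical strategies keyed by the configured sort-order name.
SORT_KEYS: Dict[str, Callable[[Path], object]] = {
    "name": lambda p: p.name.lower(),
    "size_asc": lambda p: p.stat().st_size,
    "size_desc": lambda p: -p.stat().st_size,
}


def sort_queue(files: List[Path], order: str) -> List[Path]:
    """Return a new list ordered by the configured strategy.

    Unknown order names fall back to the original (discovery) order.
    """
    key = SORT_KEYS.get(order)
    return sorted(files, key=key) if key else list(files)
```

Ordering happens once, before submit-on-demand scheduling takes over; workers then simply pop from the front of the sorted queue.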

Repair

Repair loop helpers for corrupted FLV inputs and failed outputs.

repair

process_repairs

process_repairs(input_dirs: List[Path], errors_dir_map: Dict[Path, Path], extensions: List[str], logger: Optional[Logger] = None, target_files: Optional[List[Path]] = None) -> int

Scans error directories for corrupted files and attempts to repair them.

  - Strategy 1: Text prefix removal (for FLV/MP4 stream dumps) as a pre-clean step.
  - Strategy 2: Fast re-encode to MKV (final output for all repairs).

Parameters:

    input_dirs (List[Path], required)
        List of source input directories.

    errors_dir_map (Dict[Path, Path], required)
        Mapping from input_dir to errors_dir.

    extensions (List[str], required)
        List of video extensions to scan for.

    logger (Optional[Logger], default None)
        Logger instance.

    target_files (Optional[List[Path]], default None)
        Optional list of specific files to repair.

Returns:

    int
        Number of successfully repaired files.

Source code in vbc/pipeline/repair.py
def process_repairs(
    input_dirs: List[Path],
    errors_dir_map: Dict[Path, Path],
    extensions: List[str],
    logger: Optional[logging.Logger] = None,
    target_files: Optional[List[Path]] = None,
) -> int:
    """
    Scans error directories for corrupted files and attempts to repair them.
    Strategy 1: Text prefix removal (for FLV/MP4 stream dumps) as a pre-clean step.
    Strategy 2: Fast re-encode to MKV (final output for all repairs).

    Args:
        input_dirs: List of source input directories.
        errors_dir_map: Mapping from input_dir to errors_dir.
        extensions: List of video extensions to scan for.
        logger: Logger instance.
        target_files: Optional list of specific files to repair.

    Returns:
        Number of successfully repaired files.
    """
    console = Console()
    total_repaired = 0
    candidates_to_repair = []

    # 1. Scan for candidates first
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        transient=True,
        console=console
    ) as scan_progress:
        scan_progress.add_task("Scanning for repairable files...", total=None)

        for input_dir in input_dirs:
            errors_dir = errors_dir_map.get(input_dir)
            if not errors_dir or not errors_dir.exists():
                continue

            # If target_files is provided, use it. Otherwise, scan all extensions.
            files_to_check = []
            if target_files is not None:
                for t in target_files:
                    try:
                        if errors_dir in t.parents or t.parent == errors_dir:
                            files_to_check.append(t)
                    except Exception:
                        pass
            else:
                for ext in extensions:
                    if not ext.startswith("."):
                        ext = f".{ext}"
                    files_to_check.extend(errors_dir.rglob(f"*{ext}"))

            for candidate in files_to_check:
                if not candidate.exists():
                    continue

                # Check if already repaired
                repaired_marker = candidate.with_suffix(candidate.suffix + ".repaired")
                if repaired_marker.exists():
                    continue

                # Check error file content to decide strategy
                err_file = candidate.with_suffix(".err")
                error_code = ""
                is_hw_cap = False
                if err_file.exists():
                    try:
                        err_content = err_file.read_text()
                        if _is_metadata_verification_failure(err_content):
                            if logger:
                                logger.info(
                                    f"Skipping repair for {candidate.name} - missing VBC tags is metadata-only."
                                )
                            continue
                        if "Hardware is lacking required capabilities" in err_content:
                            is_hw_cap = True
                        elif "code 234" in err_content or "Invalid argument" in err_content:
                            error_code = "234"
                    except Exception:
                        pass

                if is_hw_cap:
                    if logger:
                        logger.debug(f"Skipping repair for {candidate.name} - hardware limit, not corruption.")
                    continue

                try:
                    rel_path = candidate.relative_to(errors_dir)
                except ValueError:
                    rel_path = Path(candidate.name)

                dest_path = input_dir / rel_path
                dest_mkv = dest_path.with_suffix(".mkv")
                try:
                    candidate_size = max(candidate.stat().st_size, 1)
                except OSError:
                    candidate_size = 1
                candidates_to_repair.append((candidate, dest_path, dest_mkv, repaired_marker, error_code, candidate_size))

    # Deduplicate
    seen_candidates = set()
    unique_candidates = []
    for c, dp, dmkv, rm, ec, size in candidates_to_repair:
        if c not in seen_candidates:
            unique_candidates.append((c, dp, dmkv, rm, ec, size))
            seen_candidates.add(c)
    candidates_to_repair = unique_candidates

    if not candidates_to_repair:
        return 0

    if logger:
        logger.info(f"Found {len(candidates_to_repair)} files eligible for repair.")

    if target_files is not None:
        console.print(f"[bold cyan]Attempting to repair {len(candidates_to_repair)} failed files from this session.[/bold cyan]")
    else:
        console.print(f"[bold cyan]Found {len(candidates_to_repair)} failed files in error directories.[/bold cyan]")

    # 2. Process repairs
    total_bytes = sum(candidate_size for *_rest, candidate_size in candidates_to_repair)
    processed_bytes = 0
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        DownloadColumn(),
        TimeRemainingColumn(),
        console=console
    ) as progress:
        task = progress.add_task("Repairing corrupted files", total=total_bytes)

        for index, (candidate, dest_path, dest_mkv, repaired_marker, error_code, candidate_size) in enumerate(candidates_to_repair, start=1):
            progress.update(
                task,
                description=f"Repairing [yellow]{candidate.name}[/yellow] ({index}/{len(candidates_to_repair)})",
                completed=processed_bytes,
            )

            success = False
            repaired_file_path = None
            temp_flv = None

            if dest_mkv.exists():
                if logger:
                    logger.warning(
                        f"Skipping repair for {candidate.name} - MKV already exists in source: {dest_mkv}"
                    )
                processed_bytes += candidate_size
                progress.update(task, completed=processed_bytes)
                continue

            # STRATEGY 1: FLV Prefix Cut (Fast)
            # Try this if no specific error code OR if it looks like it might be an FLV dump
            reencode_input = candidate
            if not error_code:
                temp_flv = candidate.with_suffix(".repaired_temp.flv")
                try:
                    if repair_flv_file(candidate, temp_flv):
                        reencode_input = temp_flv
                except Exception:
                    temp_flv = None

            # STRATEGY 2: Re-encode to MKV (Final output)
            temp_mkv = candidate.with_suffix(".repaired_temp.mkv")
            try:
                def update_reencode_progress(output_size: int) -> None:
                    current_file_bytes = min(max(output_size, 0), candidate_size)
                    progress.update(task, completed=min(total_bytes, processed_bytes + current_file_bytes))

                # Inform user this might take longer
                if logger:
                    logger.info(f"Attempting re-encode repair for {candidate.name}")
                if repair_via_reencode(reencode_input, temp_mkv, progress_callback=update_reencode_progress):
                    success = True
                    repaired_file_path = temp_mkv
                    dest_path = dest_mkv
            except Exception:
                pass

            if success and repaired_file_path:
                dest_path.parent.mkdir(parents=True, exist_ok=True)
                try:
                    import shutil
                    shutil.move(str(repaired_file_path), str(dest_path))
                    repaired_marker.touch()
                    if logger:
                        logger.info(f"Repaired and restored: {candidate.name} -> {dest_path}")
                    total_repaired += 1
                except Exception as e:
                    if logger:
                        logger.error(f"Failed to move repaired file: {e}")
                    if repaired_file_path.exists():
                        repaired_file_path.unlink()
            if temp_flv and temp_flv.exists():
                try:
                    temp_flv.unlink()
                except Exception:
                    pass

            processed_bytes += candidate_size
            progress.update(task, completed=processed_bytes)

    if total_repaired > 0:
        summary_msg = f"Repaired {total_repaired}/{len(candidates_to_repair)} files."
        console.print(f"[bold green]✔ {summary_msg}[/bold green]")
        console.print("\n[bold white]Please re-run VBC to compress the repaired files restored to source folders.[/bold white]")
        if logger:
            logger.info(summary_msg)
    elif target_files is not None:
        # If we targeted specific files but repaired none, user needs to know
        summary_msg = f"Repaired 0/{len(candidates_to_repair)} files."
        console.print(f"[yellow]⚠ {summary_msg} (Files unreadable or missing video stream)[/yellow]")
        if logger:
            logger.info(summary_msg)

    return total_repaired
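Strategy 1 above delegates to repair_flv_file, whose implementation is not shown. The idea behind a text-prefix cut can be sketched as follows, assuming the common heuristic of locating the real "FLV" signature past any leading junk (strip_text_prefix and the constants here are illustrative, not the actual helper):

```python
from pathlib import Path

FLV_SIGNATURE = b"FLV\x01"  # valid files start with "FLV" + version byte 0x01
SCAN_LIMIT = 1 << 20        # only look for the header in the first 1 MiB


def strip_text_prefix(src: Path, dst: Path) -> bool:
    """Illustrative Strategy-1 repair: drop junk bytes before the FLV header.

    Returns True if a misplaced signature was found and a trimmed copy was
    written to dst; False if the file is already clean or has no header.
    """
    data = src.read_bytes()
    offset = data.find(FLV_SIGNATURE, 0, SCAN_LIMIT)
    if offset <= 0:
        return False  # offset 0 means already clean; -1 means no header found
    dst.write_bytes(data[offset:])
    return True
```

If the cut succeeds, the trimmed copy is fed to the Strategy-2 re-encode; otherwise the original candidate is re-encoded as-is, matching the reencode_input fallback in the source above.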

Key Responsibilities

  1. Discovery: Scan input directory and filter files
  2. Metadata Management: Thread-safe caching of video metadata
  3. Decision Logic: Determine CQ and rotation per file
  4. Queue Management: Submit-on-demand pattern with prefetch
  5. Concurrency Control: Dynamic thread adjustment
  6. Event Emission: Publish events for UI updates
  7. Error Handling: Create .err markers, handle corrupted files
  8. Graceful Shutdown: Finish active jobs on user request

Usage Example

from pathlib import Path
from vbc.config.loader import load_config
from vbc.infrastructure.event_bus import EventBus
from vbc.infrastructure.file_scanner import FileScanner
from vbc.infrastructure.exif_tool import ExifToolAdapter
from vbc.infrastructure.ffprobe import FFprobeAdapter
from vbc.infrastructure.ffmpeg import FFmpegAdapter
from vbc.pipeline.orchestrator import Orchestrator

# Load configuration
config = load_config(Path("conf/vbc.yaml"))

# Create dependencies
bus = EventBus()
scanner = FileScanner(
    extensions=config.general.extensions,
    min_size_bytes=config.general.min_size_bytes
)
exif = ExifToolAdapter()
exif.et.run()  # Start ExifTool process

ffprobe = FFprobeAdapter()
ffmpeg = FFmpegAdapter(event_bus=bus)

# Create orchestrator
orchestrator = Orchestrator(
    config=config,
    event_bus=bus,
    file_scanner=scanner,
    exif_adapter=exif,
    ffprobe_adapter=ffprobe,
    ffmpeg_adapter=ffmpeg
)

# Run compression
try:
    orchestrator.run(Path("/videos"))
finally:
    # Cleanup ExifTool
    if exif.et.running:
        exif.et.terminate()

Processing Pipeline

Illustrative snippets

The snippets in this section show the pipeline shape and event flow. They are intentionally simplified and are not a verbatim copy of vbc.pipeline.orchestrator.

1. Discovery Phase

# Orchestrator scans directory
files_to_process, stats = orchestrator._perform_discovery(input_dir)

# Emits event
bus.publish(DiscoveryFinished(
    files_found=stats['files_found'],
    files_to_process=stats['files_to_process'],
    already_compressed=stats['already_compressed'],
    ignored_small=stats['ignored_small'],
    ignored_err=stats['ignored_err']
))

2. Job Processing

# For each file
def _process_file(video_file: VideoFile, input_dir: Path):
    # 1. Check for existing .err marker
    if err_path.exists() and not config.general.clean_errors:
        return  # Skip

    # 2. Get stream info (ffprobe)
    stream_info = ffprobe_adapter.get_stream_info(video_file.path)

    # 3. Check and fix color space if needed
    input_path, temp_fixed = _check_and_fix_color_space(
        video_file.path,
        output_path,
        stream_info
    )

    # 4. Get metadata (ExifTool, cached)
    video_file.metadata = _get_metadata(video_file, stream_info)

    # 5. Filter checks
    if config.general.skip_av1 and metadata.codec == "av1":
        return  # Skip

    if config.general.filter_cameras:
        if camera_model not in filter_cameras:
            return  # Skip

    # 6. Determine CQ and rotation
    target_cq = _determine_cq(video_file)
    rotation = _determine_rotation(video_file)

    # 7. Create job and compress
    job = CompressionJob(
        source_file=video_file,
        output_path=output_path,
        rotation_angle=rotation
    )
    bus.publish(JobStarted(job=job))

    # 8. Compress
    ffmpeg_adapter.compress(job, config, rotate=rotation)

    # 9. Post-processing
    if job.status == JobStatus.COMPLETED:
        # Copy metadata
        _copy_deep_metadata(source, output)

        # Check min ratio
        ratio = output_size / input_size
        if ratio > (1.0 - config.general.min_compression_ratio):
            shutil.copy2(source, output)  # Keep original

        bus.publish(JobCompleted(job=job))
    else:
        # Write .err file
        err_path.write_text(job.error_message)
        bus.publish(JobFailed(job=job))

Concurrency Control

ThreadController Pattern

# Block until thread slot available
with self._thread_lock:
    while self._active_threads >= self._current_max_threads:
        self._thread_lock.wait()

    if self._shutdown_requested:
        return  # Don't start new jobs

    self._active_threads += 1

# Process job...

# Release slot
with self._thread_lock:
    self._active_threads -= 1
    self._thread_lock.notify_all()  # Wake up waiting threads

Dynamic Adjustment

# User presses '>' key
def _on_thread_control(self, event: ThreadControlEvent):
    with self._thread_lock:
        old = self._current_max_threads
        new = old + event.change
        self._current_max_threads = max(1, min(8, new))
        self._thread_lock.notify_all()  # Wake up waiting threads

    bus.publish(ActionMessage(message=f"Threads: {old} → {new}"))

Submit-on-Demand Pattern

from collections import deque
from concurrent.futures import FIRST_COMPLETED, wait

pending = deque(files_to_process)
in_flight = {}  # future -> VideoFile

def submit_batch():
    """Submit files up to max_inflight limit"""
    max_inflight = config.general.prefetch_factor * current_max_threads
    while len(in_flight) < max_inflight and pending:
        vf = pending.popleft()
        future = executor.submit(_process_file, vf, input_dir)
        in_flight[future] = vf

    # Update UI with pending files
    bus.publish(QueueUpdated(pending_files=list(pending)))

# Submit initial batch
submit_batch()

# Process as they complete
while in_flight:
    done, _ = wait(in_flight, timeout=1.0, return_when=FIRST_COMPLETED)

    for future in done:
        future.result()
        del in_flight[future]

    # Replenish queue
    submit_batch()

Metadata Caching

# Thread-safe cache to avoid redundant ExifTool calls
_metadata_cache: Dict[Path, VideoMetadata] = {}
_metadata_lock = threading.Lock()

def _get_metadata(video_file: VideoFile) -> VideoMetadata:
    with self._metadata_lock:
        cached = self._metadata_cache.get(video_file.path)
        if cached:
            return cached

    # Extract metadata
    stream_info = ffprobe_adapter.get_stream_info(video_file.path)
    metadata = _build_metadata(video_file, stream_info)

    # Cache it
    with self._metadata_lock:
        self._metadata_cache[video_file.path] = metadata

    return metadata

Decision Logic

Dynamic Quality

def _determine_cq(self, file: VideoFile) -> int:
    use_gpu = self.config.general.gpu
    encoder_args = select_encoder_args(self.config, use_gpu)
    default_cq = extract_quality_value(encoder_args) or (45 if use_gpu else 32)

    if not file.metadata:
        return default_cq

    # Check for custom CQ from ExifTool
    if file.metadata.custom_cq is not None:
        return file.metadata.custom_cq

    # Check dynamic_quality mapping
    if file.metadata.camera_model:
        for pattern, rule in self.config.general.dynamic_quality.items():
            if pattern in file.metadata.camera_model:
                return rule.cq

    return default_cq

Auto-Rotation

def _determine_rotation(self, file: VideoFile) -> Optional[int]:
    # Manual rotation overrides all
    if self.config.general.manual_rotation is not None:
        return self.config.general.manual_rotation

    # Check filename patterns
    filename = file.path.name
    for pattern, angle in self.config.autorotate.patterns.items():
        if re.search(pattern, filename):
            return angle

    return None

Graceful Shutdown

# User presses 'S' key
def _on_shutdown_request(self, event: RequestShutdown):
    with self._thread_lock:
        self._shutdown_requested = True
        self._thread_lock.notify_all()  # Wake up all waiting threads

    bus.publish(ActionMessage(message="SHUTDOWN requested"))

# In main loop
while in_flight:
    done, _ = wait(in_flight, timeout=1.0)
    # ...

    # Exit if shutdown and no more in flight
    if self._shutdown_requested and not in_flight:
        logger.info("Shutdown complete")
        break

Refresh Queue

# User presses 'R' key
def _on_refresh_request(self, event: RefreshRequested):
    with self._refresh_lock:
        self._refresh_requested = True

# In main loop
if self._refresh_requested:
    self._refresh_requested = False

    # Re-scan directory
    new_files, new_stats = _perform_discovery(input_dir)

    # Add only new files (not already submitted)
    submitted_paths = {vf.path for vf in in_flight.values()}
    submitted_paths.update(vf.path for vf in pending)

    added = 0
    for vf in new_files:
        if vf.path not in submitted_paths:
            pending.append(vf)
            added += 1

    # Update stats
    bus.publish(DiscoveryFinished(...))
    bus.publish(ActionMessage(message=f"Refreshed: +{added} new files"))

Error Handling

Corrupted Files

try:
    stream_info = ffprobe_adapter.get_stream_info(video_file.path)
except Exception as e:
    # ffprobe failed - file is corrupted
    err_path.write_text("File is corrupted (ffprobe failed)")
    logger.error(f"Corrupted file: {video_file.path}")
    job.status = JobStatus.FAILED
    bus.publish(JobFailed(job=job, error_message="Corrupted file"))
    return

Hardware Capability

# FFmpegAdapter detects error and sets status
if job.status == JobStatus.HW_CAP_LIMIT:
    # Write .err marker
    err_path.write_text("Hardware is lacking required capabilities")
    # Event already published by FFmpegAdapter

Color Space Fix

input_path, temp_fixed = _check_and_fix_color_space(
    video_file.path,
    output_path,
    stream_info
)

# Use temp_fixed if color space was remuxed
ffmpeg_adapter.compress(job, config, input_path=input_path)

# Cleanup temp file
if temp_fixed and temp_fixed.exists():
    temp_fixed.unlink()