Skip to content

Infrastructure API

This page documents the infrastructure adapters that interact with external systems.

Event Bus

Synchronous Pub/Sub event system for decoupled communication.

event_bus

EventBus

EventBus()

A simple synchronous event bus for decoupled communication.

Source code in vbc/infrastructure/event_bus.py
def __init__(self):
    """Create an event bus with no registered subscribers."""
    # Maps each event class to the list of callbacks registered for it.
    self._subscribers: Dict[Type[Event], List[Callable[[Any], None]]] = {}
subscribe
subscribe(event_type: Type[Event], callback: Optional[Callable[[Any], None]] = None)

Subscribes a callback to a specific event type. Can be used as a decorator.

Source code in vbc/infrastructure/event_bus.py
def subscribe(self, event_type: Type[Event], callback: Optional[Callable[[Any], None]] = None):
    """Subscribes a callback to a specific event type. Can be used as a decorator."""
    # Decorator form: @bus.subscribe(SomeEvent) — register the wrapped
    # function and hand it back unchanged.
    if callback is None:
        def register(func: Callable[[Any], None]):
            self.subscribe(event_type, func)
            return func
        return register

    # Direct form: append to this event type's callback list,
    # creating the list on first use.
    self._subscribers.setdefault(event_type, []).append(callback)
publish
publish(event: Event)

Publishes an event to all interested subscribers.

Source code in vbc/infrastructure/event_bus.py
def publish(self, event: Event):
    """Publishes an event to all interested subscribers."""
    # Only callbacks registered for the event's exact class are invoked
    # (no subclass matching).
    for handler in self._subscribers.get(type(event), []):
        handler(event)

File Scanner

Recursive directory scanner with filtering.

file_scanner

FileScanner

FileScanner(extensions: List[str], min_size_bytes: int = 0)

Recursively scans for video files in a directory.

Source code in vbc/infrastructure/file_scanner.py
def __init__(self, extensions: List[str], min_size_bytes: int = 0):
    """Store the accepted extensions (normalized) and the size floor.

    Each extension is lower-cased and given a leading dot so it can be
    compared directly against ``Path.suffix``.
    """
    normalized = []
    for ext in extensions:
        dotted = ext if ext.startswith(".") else f".{ext}"
        normalized.append(dotted.lower())
    self.extensions = normalized
    self.min_size_bytes = min_size_bytes
scan
scan(root_dir: Path) -> Generator[VideoFile, None, None]

Scans the directory and yields VideoFile objects.

Source code in vbc/infrastructure/file_scanner.py
def scan(self, root_dir: Path) -> Generator[VideoFile, None, None]:
    """Scans the directory and yields VideoFile objects."""
    for current, subdirs, names in os.walk(str(root_dir)):
        current_path = Path(current)

        # Never descend into output directories (names ending in "_out").
        if current_path.name.endswith("_out"):
            subdirs[:] = []  # prune this whole branch
            continue

        # Deterministic traversal: drop output dirs, then sort in place
        # (os.walk honors in-place mutation of the dir list).
        subdirs[:] = sorted(d for d in subdirs if not d.endswith("_out"))
        names.sort()

        for name in names:
            candidate = current_path / name

            # Extension filter (already normalized to lowercase-with-dot).
            if candidate.suffix.lower() not in self.extensions:
                continue

            # Size filter; unreadable entries are skipped silently.
            try:
                size = candidate.stat().st_size
            except OSError:
                continue
            if size < self.min_size_bytes:
                continue

            yield VideoFile(path=candidate, size_bytes=size)

ExifTool Adapter

Wrapper around pyexiftool for metadata extraction.

exif_tool

ExifToolAdapter

ExifToolAdapter()

Wrapper around pyexiftool for metadata extraction and manipulation.

Source code in vbc/infrastructure/exif_tool.py
def __init__(self):
    """Create the adapter; the exiftool process is started lazily."""
    # Serializes execute_json calls on the single shared ExifTool process.
    self._lock = threading.Lock()
    self.et = exiftool.ExifTool()
extract_tags
extract_tags(file_path: Path) -> Dict[str, Any]

Extract raw ExifTool tags as a dictionary for verification checks.

Source code in vbc/infrastructure/exif_tool.py
def extract_tags(self, file_path: Path) -> Dict[str, Any]:
    """Extract raw ExifTool tags as a dictionary for verification checks."""
    # Start the persistent exiftool process on first use.
    if not self.et.running:
        self.et.run()

    with self._lock:
        results = self.et.execute_json(str(file_path))

    if not results:
        raise ValueError(f"Could not extract metadata for {file_path}")
    return results[0]
extract_metadata
extract_metadata(file: VideoFile) -> VideoMetadata

Extracts metadata from a video file using ExifTool.

Source code in vbc/infrastructure/exif_tool.py
def extract_metadata(self, file: VideoFile) -> VideoMetadata:
    """Extracts metadata from a video file using ExifTool."""
    if not self.et.running:
        self.et.run()

    with self._lock:
        results = self.et.execute_json(str(file.path))
    if not results:
        raise ValueError(f"Could not extract metadata for {file.path}")

    tags = results[0]

    width = self._get_tag(tags, ["QuickTime:ImageWidth", "Track1:ImageWidth", "ImageWidth"])
    height = self._get_tag(tags, ["QuickTime:ImageHeight", "Track1:ImageHeight", "ImageHeight"])
    fps = self._get_tag(tags, ["QuickTime:VideoFrameRate", "VideoFrameRate"])
    # Prefer the codec ID (avc1=h264, hvc1=hevc, ...); HandlerDescription is
    # avoided because it can be "Sound".
    codec_raw = self._get_tag(tags, ["QuickTime:CompressorID", "CompressorID", "VideoCodec", "CompressorName"])

    # Translate well-known codec IDs into user-friendly names; unknown IDs
    # pass through as-is.
    codec_map = {
        "avc1": "h264",
        "hvc1": "hevc",
        "hev1": "hevc",
        "av01": "av1",
        "vp09": "vp9",
        "vp08": "vp8"
    }
    if codec_raw:
        codec = codec_map.get(str(codec_raw).lower(), str(codec_raw))
    else:
        codec = "unknown"

    camera = self._extract_camera_raw(tags)
    bitrate = self._get_tag(tags, ["QuickTime:AvgBitrate", "AvgBitrate"])

    return VideoMetadata(
        width=int(width) if width else 0,
        height=int(height) if height else 0,
        codec=codec,
        fps=float(fps) if fps else 0.0,
        camera_model=str(camera) if camera else None,
        bitrate_kbps=float(bitrate) / 1000 if bitrate else None
    )
extract_exif_info
extract_exif_info(file: VideoFile, dynamic_quality: Dict[str, DynamicQualityRule]) -> Dict[str, Optional[object]]

Extracts camera info and dynamic quality using full ExifTool tags.

Source code in vbc/infrastructure/exif_tool.py
def extract_exif_info(
    self,
    file: VideoFile,
    dynamic_quality: Dict[str, DynamicQualityRule],
) -> Dict[str, Optional[object]]:
    """Extracts camera info and dynamic quality using full ExifTool tags.

    Matching priority: each pattern is first tested as a substring of the
    extracted camera make/model; only if no pattern yielded a CQ value is
    the search widened to the concatenation of all tag values.

    Args:
        file: Video file to inspect.
        dynamic_quality: Mapping of substring patterns to quality rules
            (``DynamicQualityRule`` instances or plain dicts with an int
            ``"cq"`` key).

    Returns:
        Dict with keys ``camera_model``, ``camera_raw``, ``custom_cq``,
        ``bitrate_kbps``, ``matched_pattern`` and ``vbc_encoded``.

    Raises:
        ValueError: If ExifTool returns no metadata for the file.
    """
    # Lazily start the persistent exiftool process.
    if not self.et.running:
        self.et.run()

    with self._lock:
        metadata_list = self.et.execute_json(str(file.path))
    if not metadata_list:
        raise ValueError(f"Could not extract metadata for {file.path}")

    tags = metadata_list[0]

    # Build a searchable text from tag values only
    # This keeps searching in the 'whole exif' but avoids matching dict keys
    full_metadata_text = " ".join(str(v) for v in tags.values())

    camera_raw = self._extract_camera_raw(tags)

    camera_model = None
    custom_cq = None
    matched_pattern = None

    def _rule_cq(rule: Any) -> Optional[int]:
        # Accept both DynamicQualityRule objects and plain dict rules;
        # anything else (or a non-int "cq" value) yields None.
        if isinstance(rule, DynamicQualityRule):
            return rule.cq
        if isinstance(rule, dict):
            cq_value = rule.get("cq")
            if isinstance(cq_value, int):
                return cq_value
        return None

    # 1. Prioritize matching against the extracted camera model/make
    if camera_raw:
        for pattern, rule in dynamic_quality.items():
            cq_value = _rule_cq(rule)
            if pattern in camera_raw:
                camera_model = camera_raw
                custom_cq = cq_value
                matched_pattern = pattern
                break

    # 2. Fallback: Search in all exif values
    # NOTE(review): this fallback also runs when step 1 matched a rule whose
    # cq is None, and may then overwrite camera_model with the bare pattern
    # string — confirm that is intended.
    if custom_cq is None:
        for pattern, rule in dynamic_quality.items():
            cq_value = _rule_cq(rule)
            if pattern in full_metadata_text:
                camera_model = pattern
                custom_cq = cq_value
                matched_pattern = pattern
                break

    if not camera_model and camera_raw:
        camera_model = camera_raw

    # Check for VBC Encoder tag
    vbc_encoded = False
    # ExifTool often returns keys like "XMP:VBCEncoder" or just "VBCEncoder"
    # We check keys in the dict
    for key in tags.keys():
        k_lower = key.lower()
        if "vbcencoder" in k_lower or "vbc encoder" in k_lower:
            vbc_encoded = True
            break

    bitrate = tags.get('QuickTime:AvgBitrate') or tags.get('AvgBitrate')
    bitrate_kbps = float(bitrate) / 1000 if bitrate else None

    return {
        "camera_model": camera_model,
        "camera_raw": camera_raw,
        "custom_cq": custom_cq,
        "bitrate_kbps": bitrate_kbps,
        "matched_pattern": matched_pattern,
        "vbc_encoded": vbc_encoded,
    }
copy_metadata
copy_metadata(source: Path, target: Path)

Copies EXIF/XMP tags from source to target.

Source code in vbc/infrastructure/exif_tool.py
def copy_metadata(self, source: Path, target: Path):
    """Copies EXIF/XMP tags from source to target."""
    if not self.et.running:
        self.et.run()

    # Deep copy of all tags (including ones exiftool flags as "unsafe"),
    # overwriting the target in place without a *_original backup.
    args = (
        "-tagsFromFile", str(source),
        "-all:all",
        "-unsafe",
        "-overwrite_original",
        str(target),
    )
    self.et.execute(*args)

FFprobe Adapter

Wrapper around ffprobe for stream information.

ffprobe

FFprobeAdapter

Wrapper around ffprobe to extract stream information.

get_stream_info
get_stream_info(file_path: Path) -> Dict[str, Any]

Executes ffprobe and parses JSON output.

Source code in vbc/infrastructure/ffprobe.py
def get_stream_info(self, file_path: Path) -> Dict[str, Any]:
    """Executes ffprobe and parses JSON output.

    Returns:
        Dict with keys ``width``, ``height``, ``codec``, ``audio_codec``,
        ``fps``, ``duration``, ``bitrate_kbps``, ``color_space``,
        ``pix_fmt`` and ``vbc_encoded``.

    Raises:
        RuntimeError: If ffprobe times out or exits with a non-zero code.
        ValueError: If the file contains no video stream.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-print_format", "json",
        "-show_streams",
        "-show_format",
        str(file_path)
    ]
    # Timeout is derived per-file by the project helper _estimate_timeout.
    timeout_s = self._estimate_timeout(file_path)
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout_s)
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(f"ffprobe timed out after {timeout_s}s for {file_path}") from exc
    if result.returncode != 0:
        err = (result.stderr or "").strip()
        detail = err if err else "unknown error (no stderr)"
        raise RuntimeError(f"ffprobe failed for {file_path}: {detail}")

    data = json.loads(result.stdout)

    # Find streams
    streams = data.get("streams", [])
    video_stream = next((s for s in streams if s.get("codec_type") == "video"), None)
    if not video_stream:
        raise ValueError(f"No video stream found in {file_path}")
    audio_stream = next((s for s in streams if s.get("codec_type") == "audio"), None)
    if audio_stream is None:
        audio_codec = "no-audio"
    else:
        audio_codec = audio_stream.get("codec_name") or "unknown"

    # Parse FPS (prefer avg_frame_rate; r_frame_rate is often timebase)
    # Values above 240 are treated as bogus and leave fps at 0. Note that
    # round() produces an integer frame rate (e.g. 23.976 -> 24).
    fps = 0.0
    fps_str = video_stream.get("avg_frame_rate", "0/0")
    if "/" in fps_str:
        try:
            num, den = map(float, fps_str.split("/"))
            if den != 0:
                candidate = num / den
                if candidate <= 240:
                    fps = round(candidate)
        except ValueError:
            fps = 0.0
    else:
        try:
            candidate = float(fps_str)
            if candidate <= 240:
                fps = round(candidate)
        except ValueError:
            fps = 0.0

    # Duration fallback order: format.duration, format tags, stream.duration, stream tags, duration_ts/time_base, bitrate/size
    fmt = data.get("format", {})
    duration = self._to_float(fmt.get("duration"))
    if duration <= 0:
        tags = fmt.get("tags", {}) or {}
        duration = self._parse_duration_tag(tags.get("DURATION") or tags.get("duration"))
    if duration <= 0:
        duration = self._to_float(video_stream.get("duration"))
    if duration <= 0:
        tags = video_stream.get("tags", {}) or {}
        duration = self._parse_duration_tag(tags.get("DURATION") or tags.get("duration"))
    if duration <= 0:
        duration = self._parse_time_base_duration(video_stream.get("duration_ts"), video_stream.get("time_base"))
    if duration <= 0:
        # Last resort: estimate duration from container size and bitrate.
        bit_rate = self._to_float(fmt.get("bit_rate") or video_stream.get("bit_rate"))
        size = self._to_float(fmt.get("size"))
        if bit_rate > 0 and size > 0:
            duration = (size * 8) / bit_rate

    # Check for VBC tags in format or stream tags
    format_tags = fmt.get("tags", {}) or {}
    stream_tags = video_stream.get("tags", {}) or {}

    # Check for VBC Encoder tag (case-insensitive check for key presence)
    vbc_encoded = False
    for tags_dict in (format_tags, stream_tags):
        for k, v in tags_dict.items():
            if k.lower() in ("vbcencoder", "vbc encoder"):
                vbc_encoded = True
                break
        if vbc_encoded:
            break

    bitrate_bps = self._to_float(fmt.get("bit_rate") or video_stream.get("bit_rate"))

    return {
        "width": int(video_stream.get("width", 0)),
        "height": int(video_stream.get("height", 0)),
        "codec": video_stream.get("codec_name", "unknown"),
        "audio_codec": audio_codec,
        "fps": fps,
        "duration": duration,
        "bitrate_kbps": (bitrate_bps / 1000.0) if bitrate_bps > 0 else None,
        "color_space": video_stream.get("color_space"),
        "pix_fmt": video_stream.get("pix_fmt"),
        "vbc_encoded": vbc_encoded,
    }

FFmpeg Adapter

Wrapper around ffmpeg for video compression.

ffmpeg

FFmpeg process wrapper for AV1 video compression.

Handles subprocess lifecycle, progress monitoring, error detection, and recovery. Detects hardware capability errors and color space issues, publishing events on failures.

FFmpegAdapter

FFmpegAdapter(event_bus: EventBus)

Subprocess adapter for FFmpeg video compression.

Manages FFmpeg execution with real-time progress monitoring via stdout parsing. Detects GPU hardware capability exhaustion (exit code 187) and color space bugs in FFmpeg 7.x, triggering automatic recovery via remuxing + retry.

Source code in vbc/infrastructure/ffmpeg.py
def __init__(self, event_bus: EventBus):
    self.event_bus = event_bus
    self.logger = logging.getLogger(__name__)
compress
compress(job: CompressionJob, config: AppConfig, use_gpu: bool, quality: Optional[int] = None, rate_control: Optional[ResolvedRateControl] = None, rotate: Optional[int] = None, shutdown_event=None, input_path: Optional[Path] = None)

Execute AV1 compression via FFmpeg subprocess.

Spawns FFmpeg, monitors stdout for progress updates, detects errors including: - Hardware capability exhaustion (HW_CAP_LIMIT status) - FFmpeg 7.x color space bugs (triggers _apply_color_fix) - Exit code failures

Publishes JobProgressUpdated, JobFailed, and HardwareCapabilityExceeded events. Handles graceful shutdown via shutdown_event (Ctrl+C integration).

Parameters:

Name Type Description Default
job CompressionJob

Compression job to process.

required
config AppConfig

AppConfig with encoder settings and flags.

required
use_gpu bool

Whether the GPU encoder is active.

required
quality Optional[int]

Optional quality override (CQ/CRF) for this job.

None
rate_control Optional[ResolvedRateControl]

Optional bitrate control override for rate mode.

None
rotate Optional[int]

Optional rotation angle (degrees).

None
shutdown_event

Threading.Event to signal interruption.

None
input_path Optional[Path]

Override input path (used for color fix retry).

None
Side Effects
  • Updates job.status, job.error_message, job.duration_seconds
  • Writes .tmp file during processing; renames to output on success
  • Publishes events to EventBus
  • Cleans up .tmp file on error/interruption
Source code in vbc/infrastructure/ffmpeg.py
def compress(
    self,
    job: CompressionJob,
    config: AppConfig,
    use_gpu: bool,
    quality: Optional[int] = None,
    rate_control: Optional[ResolvedRateControl] = None,
    rotate: Optional[int] = None,
    shutdown_event=None,
    input_path: Optional[Path] = None,
):
    """Execute AV1 compression via FFmpeg subprocess.

    Spawns FFmpeg, monitors stdout for progress updates, detects errors including:
    - Hardware capability exhaustion (HW_CAP_LIMIT status)
    - FFmpeg 7.x color space bugs (triggers _apply_color_fix)
    - Exit code failures

    Publishes JobProgressUpdated, JobFailed, and HardwareCapabilityExceeded events.
    Handles graceful shutdown via shutdown_event (Ctrl+C integration).

    Args:
        job: Compression job to process.
        config: AppConfig with encoder settings and flags.
        use_gpu: Whether the GPU encoder is active.
        quality: Optional quality override (CQ/CRF) for this job.
        rate_control: Optional bitrate control override for rate mode.
        rotate: Optional rotation angle (degrees).
        shutdown_event: Threading.Event to signal interruption.
        input_path: Override input path (used for color fix retry).

    Side Effects:
        - Updates job.status, job.error_message, job.duration_seconds
        - Writes .tmp file during processing; renames to output on success
        - Publishes events to EventBus
        - Cleans up .tmp file on error/interruption
    """
    # NOTE(review): 'filename' is unused below — the log messages print a
    # literal "(unknown)"; confirm whether they should interpolate it.
    filename = job.source_file.path.name
    # start_time is only tracked in debug mode, for FFMPEG_END timing logs.
    start_time = time.monotonic() if config.general.debug else None

    # Resolve base encoder args, then layer rate-control / quality / CPU
    # overrides depending on configuration.
    encoder_args = select_encoder_args(config, use_gpu)
    if config.general.quality_mode == "rate":
        if rate_control is None:
            raise ValueError("Missing resolved rate control for quality_mode=rate.")
        encoder_args = apply_rate_control_args(
            encoder_args,
            use_gpu=use_gpu,
            rate_control=rate_control,
        )
    elif quality is not None:
        encoder_args = replace_quality_value(encoder_args, quality)
    if not use_gpu:
        encoder_args = apply_cpu_thread_overrides(encoder_args, config.general.ffmpeg_cpu_threads)
        if config.cpu_encoder.advanced and config.cpu_encoder.advanced_enforce_input_pix_fmt:
            pix_fmt = None
            if job.source_file.metadata:
                pix_fmt = job.source_file.metadata.pix_fmt
            encoder_args = apply_pix_fmt_arg(encoder_args, pix_fmt)

    if config.general.debug:
        encoder_name = _extract_flag_value(encoder_args, "-c:v") or "unknown"
        if config.general.quality_mode == "rate":
            quality_text = f"RATE={_extract_flag_value(encoder_args, '-b:v')}"
        else:
            quality_value = extract_quality_value(encoder_args)
            quality_flag = extract_quality_flag(encoder_args)
            quality_label = "CQ" if quality_flag == "-cq" else "CRF" if quality_flag == "-crf" else "Q"
            quality_text = f"{quality_label}={quality_value}" if quality_value is not None else "quality=unknown"
        self.logger.info(
            f"FFMPEG_START: (unknown) (gpu={use_gpu}, encoder={encoder_name}, {quality_text})"
        )

    if config.general.debug:
        _, audio_mode, audio_codec = self._select_audio_options(job)
        self.logger.info(f"AUDIO_MODE: (unknown) mode={audio_mode} codec={audio_codec}")

    cmd = self._build_command(job, config, encoder_args, use_gpu, rotate, input_path=input_path)

    if config.general.debug:
        self.logger.debug(f"FFMPEG_CMD: {' '.join(cmd)}")

    # Use duration for progress calculation
    total_duration = job.source_file.metadata.duration if job.source_file.metadata else 0.0

    # stderr is merged into stdout so a single line-buffered text stream
    # carries both progress lines and error messages.
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        bufsize=1
    )

    # Regex to parse 'time=00:00:00.00' from ffmpeg output
    time_regex = re.compile(r"time=(\d+):(\d+):(\d+\.\d+)")
    hw_cap_error = False
    gpu_unavailable_error = False
    gpu_unavailable_detail: Optional[str] = None
    color_error = False

    # Output is drained on a background thread; the main loop polls the
    # queue with a short timeout so it can also watch shutdown_event.
    output_queue: "queue.Queue[Optional[str]]" = queue.Queue()

    def _reader():
        # Sentinel None marks end-of-stream for the consumer loop.
        if not process.stdout:
            output_queue.put(None)
            return
        for line in process.stdout:
            output_queue.put(line)
        output_queue.put(None)

    reader_thread = threading.Thread(target=_reader, daemon=True)
    reader_thread.start()

    try:
        while True:
            # Check for shutdown signal from orchestrator
            if shutdown_event and shutdown_event.is_set():
                self.logger.info(f"FFMPEG_INTERRUPTED: (unknown) (shutdown signal)")
                process.terminate()
                try:
                    process.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    process.kill()
                    process.wait()

                # Clean up tmp file
                tmp_path = job.output_path.with_suffix('.tmp')
                if tmp_path.exists():
                    tmp_path.unlink()

                # Set INTERRUPTED status and return (don't raise exception)
                job.status = JobStatus.INTERRUPTED
                job.error_message = "Interrupted by user (Ctrl+C)"
                return  # Exit compress() early

            try:
                line = output_queue.get(timeout=0.1)
            except queue.Empty:
                # No output right now; if the process already exited, stop
                # polling, otherwise keep waiting.
                if process.poll() is not None:
                    break
                continue

            if line is None:
                break

            line_stripped = line.strip()
            line_lower = line.lower()

            # NOTE: 'and' binds tighter than 'or' here, so the final clause
            # is ("not supported" in line and "nvenc" in lowercased line).
            if (
                "Hardware is lacking required capabilities" in line
                or "No capable devices found" in line
                or "not supported" in line and "nvenc" in line.lower()
            ):
                hw_cap_error = True
            if use_gpu:
                # Distinguish "GPU entirely unavailable" from a mere
                # capability limit, to produce a more actionable message.
                is_gpu_unavailable = (
                    "cuda_error_no_device" in line_lower
                    or "no cuda-capable device is detected" in line_lower
                    or "no nvenc capable devices found" in line_lower
                    or "cannot load libcuda" in line_lower
                    or "driver does not support the required nvenc api version" in line_lower
                    or "openencodesessionex failed" in line_lower
                )
                if is_gpu_unavailable:
                    gpu_unavailable_error = True
                    hw_cap_error = True
                    # Keep the first non-empty matching line as the detail.
                    if gpu_unavailable_detail is None and line_stripped:
                        gpu_unavailable_detail = line_stripped
            if "is not a valid value for color_primaries" in line or "is not a valid value for color_trc" in line:
                color_error = True

            match = time_regex.search(line)
            if match:
                h, m, s = map(float, match.groups())
                current_seconds = h * 3600 + m * 60 + s
                if total_duration > 0:
                    progress_percent = min(100.0, (current_seconds / total_duration) * 100.0)
                    self.event_bus.publish(JobProgressUpdated(job=job, progress_percent=progress_percent))

        # Reader saw EOF (or the process exited); collect the return code.
        process.wait()
    except KeyboardInterrupt:
        # User pressed Ctrl+C directly in this thread (shouldn't happen with daemon threads)
        self.logger.info(f"FFMPEG_INTERRUPTED: (unknown) (KeyboardInterrupt)")
        process.terminate()
        try:
            process.wait(timeout=3)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

        # Clean up tmp file
        tmp_path = job.output_path.with_suffix('.tmp')
        if tmp_path.exists():
            tmp_path.unlink()

        # Set status and re-raise to propagate to orchestrator
        job.status = JobStatus.INTERRUPTED
        job.error_message = "Interrupted by user (Ctrl+C)"
        raise

    # Get tmp file path
    tmp_path = job.output_path.with_suffix('.tmp')

    # Check for hardware capability error (code 187 or text match)
    if hw_cap_error or process.returncode == 187:
        job.status = JobStatus.HW_CAP_LIMIT
        if gpu_unavailable_error:
            detail = gpu_unavailable_detail or "GPU encoder initialization failed"
            job.error_message = (
                f"GPU AV1 encode unavailable: {detail}. "
                "Use --cpu or enable cpu_fallback."
            )
        else:
            job.error_message = "Hardware is lacking required capabilities"
        # Cleanup tmp file on error
        if tmp_path.exists():
            tmp_path.unlink()
        self.event_bus.publish(HardwareCapabilityExceeded(job=job))
        if config.general.debug and start_time:
            elapsed = time.monotonic() - start_time
            self.logger.info(f"FFMPEG_END: (unknown) status=hw_cap_limit elapsed={elapsed:.2f}s")
    elif color_error:
        # Re-run with color fix remux (recursive call sets final status)
        if config.general.debug:
            self.logger.info(f"FFMPEG_COLORFIX: (unknown) (applying color space fix)")
        self._apply_color_fix(
            job,
            config,
            use_gpu,
            quality,
            rate_control,
            rotate,
            shutdown_event=shutdown_event,
        )
        # Status is now set by recursive compress() call, don't override
        if config.general.debug and start_time:
            elapsed = time.monotonic() - start_time
            self.logger.info(f"FFMPEG_END: (unknown) status={job.status.value} elapsed={elapsed:.2f}s (with colorfix)")
    elif process.returncode != 0:
        job.status = JobStatus.FAILED
        job.error_message = f"ffmpeg exited with code {process.returncode}"
        # Cleanup tmp file on error
        if tmp_path.exists():
            tmp_path.unlink()
        self.event_bus.publish(JobFailed(job=job, error_message=job.error_message))
        if config.general.debug and start_time:
            elapsed = time.monotonic() - start_time
            self.logger.info(f"FFMPEG_END: (unknown) status=failed code={process.returncode} elapsed={elapsed:.2f}s")
    else:
        # Success only when tmp exists and can be atomically renamed to final output.
        if not tmp_path.exists():
            job.status = JobStatus.FAILED
            job.error_message = "ffmpeg succeeded but temporary output file is missing"
            self.event_bus.publish(JobFailed(job=job, error_message=job.error_message))
            if config.general.debug and start_time:
                elapsed = time.monotonic() - start_time
                self.logger.info(
                    f"FFMPEG_END: (unknown) status=failed reason=missing_tmp elapsed={elapsed:.2f}s"
                )
            return
        try:
            tmp_path.rename(job.output_path)
        except OSError as exc:
            job.status = JobStatus.FAILED
            job.error_message = f"ffmpeg succeeded but failed to finalize output: {exc}"
            self.event_bus.publish(JobFailed(job=job, error_message=job.error_message))
            # Best effort cleanup of orphan tmp.
            if tmp_path.exists():
                tmp_path.unlink()
            if config.general.debug and start_time:
                elapsed = time.monotonic() - start_time
                self.logger.info(
                    f"FFMPEG_END: (unknown) status=failed reason=rename_error elapsed={elapsed:.2f}s"
                )
            return
        job.status = JobStatus.COMPLETED
        if config.general.debug and start_time:
            elapsed = time.monotonic() - start_time
            self.logger.info(f"FFMPEG_END: (unknown) status=completed elapsed={elapsed:.2f}s")

Housekeeping Service

Cleanup service for temporary files and error markers.

housekeeping

HousekeepingService

Service for cleaning up temporary files and error markers.

cleanup_output_markers
cleanup_output_markers(input_dir: Path, output_dir: Path, errors_dir: Path, clean_errors: bool, logger: Optional[Logger] = None) -> None

Cleanup .tmp (always) and .err (only if clean_errors) in output_dir.

If a marker has a corresponding source file in input_dir, delete it. Otherwise, move it to errors_dir and log a warning.

Source code in vbc/infrastructure/housekeeping.py
def cleanup_output_markers(
    self,
    input_dir: Path,
    output_dir: Path,
    errors_dir: Path,
    clean_errors: bool,
    logger: Optional[logging.Logger] = None,
) -> None:
    """Cleanup .tmp (always) and .err (only if clean_errors) in output_dir.

    If a marker has a corresponding source file in input_dir, delete it.
    Otherwise, move it to errors_dir and log a warning.
    """
    if not output_dir.exists():
        return

    # Gather every stale marker file under output_dir.
    stale = [
        Path(base) / name
        for base, _subdirs, names in os.walk(output_dir)
        for name in names
        if name.endswith(".tmp") or (clean_errors and name.endswith(".err"))
    ]
    if not stale:
        return

    errors_dir.mkdir(parents=True, exist_ok=True)

    for marker in stale:
        try:
            relative = marker.relative_to(output_dir)
        except ValueError:
            relative = Path(marker.name)

        source = self._find_source_for_marker(input_dir, relative)
        if source and source.exists():
            # The original source still exists — the marker is leftover
            # state and can simply be removed (best effort).
            try:
                marker.unlink()
            except OSError:
                pass
            continue

        # No matching source: preserve the marker under errors_dir.
        dest = errors_dir / relative
        dest.parent.mkdir(parents=True, exist_ok=True)
        try:
            shutil.move(str(marker), str(dest))
        except OSError:
            continue
        if logger:
            logger.warning(
                f"Moved stale marker without source file: {marker} -> {dest}"
            )

Web Server

Read-only HTMX dashboard server and template rendering helpers.

web_server

Read-only HTMX web dashboard for VBC.

Serves a single-page dashboard that auto-refreshes via HTMX polling every 2s. Runs as a daemon thread — stops automatically when VBC exits.

No new dependencies: uses stdlib http.server + socketserver only. HTMX 2.0.8 and Pico.css 2.1.1 loaded from jsDelivr CDN. Static files (style.css, theme-switcher.js) served from vbc/infrastructure/web/. HTML fragments rendered via Jinja2 templates in vbc/infrastructure/web/templates/.

VBCRequestHandler

Bases: BaseHTTPRequestHandler

HTTP request handler for VBC web dashboard.

Class attribute state is set by VBCWebServer before the server starts.

log_message
log_message(format: str, *args) -> None

Suppress default access log to keep VBC terminal clean.

Source code in vbc/infrastructure/web_server.py
def log_message(self, format: str, *args) -> None:  # noqa: A002
    """Intentionally a no-op: suppress the default per-request access log
    so the VBC terminal stays clean."""

VBCWebServer

VBCWebServer(state: 'UIState', port: int = DEFAULT_PORT, host: str = '0.0.0.0')

Read-only HTMX web dashboard server for VBC.

Runs as a daemon thread — stops automatically when VBC process exits.

Usage::

server = VBCWebServer(state=ui_state, port=8765)
server.start()   # non-blocking, prints URL
# ... VBC runs ...
server.stop()    # optional; daemon thread auto-stops on exit
Source code in vbc/infrastructure/web_server.py
def __init__(self, state: "UIState", port: int = DEFAULT_PORT, host: str = "0.0.0.0") -> None:
    """Remember configuration; the socket is not bound until start()."""
    # Shared UI state rendered by the request handler.
    self.state = state
    self.port = port
    self.host = host
    # Both are created by start() and cleared by stop(); None while the
    # server is not running.
    self._server: Optional[_ThreadingHTTPServer] = None
    self._thread: Optional[threading.Thread] = None
start
start() -> None

Start web server in a daemon background thread.

Source code in vbc/infrastructure/web_server.py
def start(self) -> None:
    """Start web server in a daemon background thread."""
    VBCRequestHandler.state = self.state  # inject shared state
    try:
        self._server = _ThreadingHTTPServer((self.host, self.port), VBCRequestHandler)
    except OSError as exc:
        logger.warning("Web dashboard: could not bind to %s:%d: %s", self.host, self.port, exc)
        print(f"[VBC] Web dashboard: {self.host}:{self.port} unavailable — dashboard disabled.")
        return

    self._thread = threading.Thread(
        target=self._server.serve_forever,
        name="vbc-web-dashboard",
        daemon=True,
    )
    self._thread.start()
    display_host = "localhost" if self.host in ("0.0.0.0", "::") else self.host
    logger.info("Web dashboard: http://%s:%d/", display_host, self.port)
    print(f"[VBC] Web dashboard: http://{display_host}:{self.port}/")
stop
stop() -> None

Gracefully stop the web server.

Source code in vbc/infrastructure/web_server.py
def stop(self) -> None:
    """Gracefully stop the web server."""
    if self._server:
        self._server.shutdown()
        self._server = None
    if self._thread:
        self._thread.join(timeout=2.0)
        self._thread = None

GPU Monitor

Background GPU metrics sampler for dashboard sparklines.

gpu_monitor

GpuMonitor

GpuMonitor(state: UIState, refresh_rate: int = 5, device_index: int = 0, device_name: Optional[str] = None, nvtop_path: Optional[str] = None)

Monitors GPU metrics using nvtop -s in a background thread.

Source code in vbc/infrastructure/gpu_monitor.py
def __init__(self, state: UIState, refresh_rate: int = 5,
             device_index: int = 0, device_name: Optional[str] = None,
             nvtop_path: Optional[str] = None):
    self.state = state
    self.refresh_rate = refresh_rate
    self.device_index = device_index
    self.device_name = device_name
    self.logger = logging.getLogger(__name__)
    self._stop_event = threading.Event()
    self._thread: Optional[threading.Thread] = None

    # Determine nvtop binary path
    if nvtop_path:
        self.nvtop_cmd = nvtop_path
        self._nvtop_available = os.path.isfile(nvtop_path) and os.access(nvtop_path, os.X_OK)
    else:
        nvtop_which = shutil.which("nvtop")
        self.nvtop_cmd = nvtop_which or "nvtop"
        self._nvtop_available = nvtop_which is not None
start
start()

Starts the monitoring thread.

Source code in vbc/infrastructure/gpu_monitor.py
def start(self):
    """Starts the monitoring thread."""
    if self._thread is not None:
        return

    if not self._nvtop_available:
        self.logger.info("GPU Monitor started (nvtop not available, GPU metrics disabled)")
        return

    self._stop_event.clear()
    self._thread = threading.Thread(target=self._poll, daemon=True)
    self._thread.start()
    self.logger.info("GPU Monitor started")
stop
stop()

Stops the monitoring thread.

Source code in vbc/infrastructure/gpu_monitor.py
def stop(self):
    """Stops the monitoring thread."""
    self._stop_event.set()
    if self._thread:
        self._thread.join(timeout=1.0)
        self._thread = None
        self.logger.info("GPU Monitor stopped")

parse_number

parse_number(s: str) -> Optional[float]

Parse number from string like '52C', '30%', '112W'.

Source code in vbc/infrastructure/gpu_monitor.py
def parse_number(s: str) -> Optional[float]:
    """Parse number from string like '52C', '30%', '112W'."""
    if not s:
        return None
    s = str(s).strip()
    if s in {"N/A", "--", "??"}:
        return None
    m = NUM_RE.search(s)
    return float(m.group(1)) if m else None

parse_temp

parse_temp(s: str) -> Optional[float]

'52C' → 52.0, '??' → None

Source code in vbc/infrastructure/gpu_monitor.py
def parse_temp(s: str) -> Optional[float]:
    """'52C' → 52.0, '??' → None"""
    return parse_number(s)

parse_percent

parse_percent(s: str) -> Optional[float]

'30%' → 30.0

Source code in vbc/infrastructure/gpu_monitor.py
def parse_percent(s: str) -> Optional[float]:
    """'30%' → 30.0"""
    return parse_number(s)

parse_watts

parse_watts(s: str) -> Optional[float]

'112W' → 112.0

Source code in vbc/infrastructure/gpu_monitor.py
def parse_watts(s: str) -> Optional[float]:
    """'112W' → 112.0"""
    return parse_number(s)

parse_fan_speed

parse_fan_speed(s: str) -> Optional[float]

'35%' → 35.0, 'CPU Fan' → None, 'N/A' → None

Source code in vbc/infrastructure/gpu_monitor.py
def parse_fan_speed(s: str) -> Optional[float]:
    """'35%' → 35.0, 'CPU Fan' → None, 'N/A' → None"""
    result = parse_number(s)
    # If we got a valid number, return it
    if result is not None:
        return result
    # If it's a non-empty string without a number, still return None
    # (the UI will handle displaying "-" for None values)
    return None

ExifTool Temp Files

Cleanup helpers for stale ExifTool temporary files.

exiftool_tmp

exiftool_tmp_path

exiftool_tmp_path(target_path: Path) -> Path

Return ExifTool's temporary write path for a target file.

Source code in vbc/infrastructure/exiftool_tmp.py
def exiftool_tmp_path(target_path: Path) -> Path:
    """Return ExifTool's temporary write path for a target file."""
    return target_path.with_name(f"{target_path.name}{EXIFTOOL_TMP_SUFFIX}")

remove_exiftool_tmp_for_target

remove_exiftool_tmp_for_target(target_path: Path, logger: Optional[Logger] = None) -> Optional[Path]

Remove a stale ExifTool temp file for a single target if present.

Source code in vbc/infrastructure/exiftool_tmp.py
def remove_exiftool_tmp_for_target(
    target_path: Path,
    logger: Optional[logging.Logger] = None,
) -> Optional[Path]:
    """Remove a stale ExifTool temp file for a single target if present."""
    tmp_path = exiftool_tmp_path(target_path)
    if not os.path.lexists(tmp_path):
        return None
    if tmp_path.is_dir() and not tmp_path.is_symlink():
        if logger:
            logger.warning(f"Refusing to remove ExifTool temp directory: {tmp_path}")
        return None
    tmp_path.unlink()
    if logger:
        logger.warning(f"Removed stale ExifTool temp file: {tmp_path}")
    return tmp_path

cleanup_exiftool_tmp_files

cleanup_exiftool_tmp_files(roots: Iterable[Path], logger: Optional[Logger] = None) -> List[Path]

Remove stale ExifTool temp files below output roots.

Source code in vbc/infrastructure/exiftool_tmp.py
def cleanup_exiftool_tmp_files(
    roots: Iterable[Path],
    logger: Optional[logging.Logger] = None,
) -> List[Path]:
    """Remove stale ExifTool temp files below output roots."""
    removed: List[Path] = []
    seen_roots = set()
    for root in roots:
        root = Path(root)
        if root in seen_roots:
            continue
        seen_roots.add(root)
        if not root.exists():
            continue
        for tmp_path in root.rglob(f"*{EXIFTOOL_TMP_SUFFIX}"):
            if tmp_path.is_dir() and not tmp_path.is_symlink():
                if logger:
                    logger.warning(f"Refusing to remove ExifTool temp directory: {tmp_path}")
                continue
            if not tmp_path.is_file() and not tmp_path.is_symlink():
                continue
            tmp_path.unlink()
            removed.append(tmp_path)
            if logger:
                logger.warning(f"Removed stale ExifTool temp file: {tmp_path}")
    return removed

Logging

Logging configuration and setup.

logging

setup_logging

setup_logging(output_dir: Path, debug: bool = False, log_path: Optional[Path] = None) -> logging.Logger

Setup logging configuration for VBC.

Creates output directory and compression.log file. Returns configured logger instance.

Parameters:

Name Type Description Default
output_dir Path

Directory where output files are written

required
debug bool

If True, enable DEBUG level logging with detailed timings

False
log_path Optional[Path]

Optional path to log file (overrides output_dir)

None
Source code in vbc/infrastructure/logging.py
def setup_logging(output_dir: Path, debug: bool = False, log_path: Optional[Path] = None) -> logging.Logger:
    """
    Setup logging configuration for VBC.

    Creates output directory and compression.log file.
    Returns configured logger instance.

    Args:
        output_dir: Directory where output files are written
        debug: If True, enable DEBUG level logging with detailed timings
        log_path: Optional path to log file (overrides output_dir)
    """
    # Create output directory
    output_dir.mkdir(exist_ok=True)

    # Setup log file
    log_file = Path(log_path) if log_path else (output_dir / "compression.log")
    log_file.parent.mkdir(parents=True, exist_ok=True)

    # Configure logging level
    level = logging.DEBUG if debug else logging.INFO

    # Configure logging
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.FileHandler(log_file)],
        force=True  # Override any existing configuration
    )

    logger = logging.getLogger(__name__)
    logger.info(f"Logging initialized: {log_file} (debug={'ON' if debug else 'OFF'})")

    return logger

Usage Examples

EventBus

from vbc.infrastructure.event_bus import EventBus
from vbc.domain.events import Event
from pydantic import BaseModel

# Create custom event
class CustomEvent(Event):
    message: str

# Create bus
bus = EventBus()

# Subscribe
def handler(event: CustomEvent):
    print(f"Received: {event.message}")

bus.subscribe(CustomEvent, handler)

# Publish
bus.publish(CustomEvent(message="Hello!"))
# Output: Received: Hello!

FileScanner

from pathlib import Path
from vbc.infrastructure.file_scanner import FileScanner

# Create scanner
scanner = FileScanner(
    extensions=[".mp4", ".mov", ".avi"],
    min_size_bytes=1024 * 1024  # 1 MiB
)

# Scan directory
for video_file in scanner.scan(Path("/videos")):
    print(f"{video_file.path.name}: {video_file.size_bytes} bytes")

ExifToolAdapter

from vbc.infrastructure.exif_tool import ExifToolAdapter
from vbc.domain.models import VideoFile
from pathlib import Path

# Create adapter
exif = ExifToolAdapter()
exif.et.run()  # Start ExifTool process

# Extract metadata
video = VideoFile(path=Path("video.mp4"), size_bytes=1000000)
metadata = exif.extract_metadata(video)

print(f"Camera: {metadata.camera_model}")
print(f"Resolution: {metadata.width}x{metadata.height}")
print(f"Codec: {metadata.codec}")

# Extract EXIF info with dynamic quality
dynamic_quality = {
    "ILCE-7RM5": {"cq": 38, "rate": {"bps": "0.8", "minrate": "0.7", "maxrate": "0.9"}},
    "DC-GH7": {"cq": 40},
}
exif_info = exif.extract_exif_info(video, dynamic_quality)

if exif_info["custom_cq"]:
    print(f"Using custom CQ: {exif_info['custom_cq']}")

# Copy metadata
exif.copy_metadata(
    source=Path("source.mp4"),
    target=Path("output.mp4")
)

# Cleanup
exif.et.terminate()

FFprobeAdapter

from vbc.infrastructure.ffprobe import FFprobeAdapter
from pathlib import Path

# Create adapter
ffprobe = FFprobeAdapter()

# Get stream info
info = ffprobe.get_stream_info(Path("video.mp4"))

print(f"Codec: {info['codec']}")
print(f"Resolution: {info['width']}x{info['height']}")
print(f"FPS: {info['fps']}")
print(f"Color space: {info['color_space']}")
print(f"Duration: {info['duration']} seconds")

FFmpegAdapter

from vbc.infrastructure.ffmpeg import FFmpegAdapter
from vbc.infrastructure.event_bus import EventBus
from vbc.domain.models import CompressionJob, VideoFile, JobStatus
from vbc.config.models import AppConfig, GeneralConfig
from pathlib import Path
import threading

# Create adapter
bus = EventBus()
ffmpeg = FFmpegAdapter(event_bus=bus)

# Create job
job = CompressionJob(
    source_file=VideoFile(
        path=Path("input.mp4"),
        size_bytes=100000000
    ),
    output_path=Path("output.mp4"),
    rotation_angle=180
)

# Create config
config = AppConfig(
    general=GeneralConfig(threads=4, gpu=True, copy_metadata=True)
)

# Compress
shutdown_event = threading.Event()
ffmpeg.compress(
    job=job,
    config=config,
    use_gpu=config.general.gpu,
    rotate=180,
    shutdown_event=shutdown_event
)

# Check status
if job.status == JobStatus.COMPLETED:
    print(f"Success! Output: {job.output_path}")
elif job.status == JobStatus.HW_CAP_LIMIT:
    print("Hardware capability exceeded")
elif job.status == JobStatus.FAILED:
    print(f"Failed: {job.error_message}")

HousekeepingService

from vbc.infrastructure.housekeeping import HousekeepingService
from pathlib import Path

# Create service
housekeeper = HousekeepingService()

# Cleanup markers in output dir
housekeeper.cleanup_output_markers(
    input_dir=Path("/videos"),
    output_dir=Path("/videos_out"),
    errors_dir=Path("/videos_err"),
    clean_errors=True,   # True: clean up .tmp and .err; False: only .tmp
)
# Removes or relocates markers:
# - *.tmp always
# - *.err only when clean_errors=True

Adapter Patterns

Dependency Injection

All adapters are injected into the Orchestrator:

from pathlib import Path
from vbc.pipeline.orchestrator import Orchestrator
from vbc.infrastructure.file_scanner import FileScanner
from vbc.infrastructure.exif_tool import ExifToolAdapter
from vbc.infrastructure.ffprobe import FFprobeAdapter
from vbc.infrastructure.ffmpeg import FFmpegAdapter
from vbc.infrastructure.event_bus import EventBus
from vbc.config.loader import load_config

# Create dependencies
config = load_config(Path("conf/vbc.yaml"))
bus = EventBus()
scanner = FileScanner(
    extensions=config.general.extensions,
    min_size_bytes=config.general.min_size_bytes
)
exif = ExifToolAdapter()
ffprobe = FFprobeAdapter()
ffmpeg = FFmpegAdapter(event_bus=bus)

# Inject into orchestrator
orchestrator = Orchestrator(
    config=config,
    event_bus=bus,
    file_scanner=scanner,
    exif_adapter=exif,
    ffprobe_adapter=ffprobe,
    ffmpeg_adapter=ffmpeg
)

# Run
orchestrator.run(Path("/videos"))

Thread Safety

Adapters that access shared state use locks:

# ExifToolAdapter uses threading.Lock
class ExifToolAdapter:
    def __init__(self):
        self.et = exiftool.ExifTool()
        self._lock = threading.Lock()

    def extract_metadata(self, file: VideoFile):
        with self._lock:
            metadata = self.et.execute_json(str(file.path))
        # ... process metadata

Error Handling

Adapters raise exceptions for invalid inputs:

from vbc.infrastructure.ffprobe import FFprobeAdapter

ffprobe = FFprobeAdapter()

try:
    info = ffprobe.get_stream_info(Path("nonexistent.mp4"))
except Exception as e:
    print(f"ffprobe failed: {e}")
    # Handle corrupted/missing file