Infrastructure API
This page documents the infrastructure adapters that interact with external systems.
Event Bus
Synchronous Pub/Sub event system for decoupled communication.
event_bus
EventBus
A simple synchronous event bus for decoupled communication.
Source code in vbc/infrastructure/event_bus.py
def __init__(self):
    """Create an empty bus with no registered subscribers."""
    # Mapping of concrete event class -> ordered list of subscriber callbacks.
    self._subscribers: Dict[Type[Event], List[Callable[[Any], None]]] = {}
subscribe
subscribe(event_type: Type[Event], callback: Optional[Callable[[Any], None]] = None)
Subscribes a callback to a specific event type. Can be used as a decorator.
Source code in vbc/infrastructure/event_bus.py
def subscribe(self, event_type: Type[Event], callback: Optional[Callable[[Any], None]] = None):
    """Subscribes a callback to a specific event type. Can be used as a decorator."""
    if callback is None:
        # Decorator form: bus.subscribe(SomeEvent) returns a wrapper that
        # registers the decorated function and hands it back unchanged.
        def decorator(func: Callable[[Any], None]):
            self.subscribe(event_type, func)
            return func
        return decorator
    # Direct form: append to (or create) the callback list for this type.
    self._subscribers.setdefault(event_type, []).append(callback)
publish
Publishes an event to all interested subscribers.
Source code in vbc/infrastructure/event_bus.py
def publish(self, event: Event):
    """Publishes an event to all interested subscribers."""
    # Dispatch keys on the event's concrete class only (no subclass matching);
    # unknown types simply have no subscribers and are ignored.
    for callback in self._subscribers.get(type(event), []):
        callback(event)
File Scanner
Recursive directory scanner with filtering.
file_scanner
FileScanner
FileScanner(extensions: List[str], min_size_bytes: int = 0)
Recursively scans for video files in a directory.
Source code in vbc/infrastructure/file_scanner.py
def __init__(self, extensions: List[str], min_size_bytes: int = 0):
    """Normalize *extensions* to lowercase dotted suffixes and store the size floor."""
    normalized = []
    for ext in extensions:
        dotted = ext if ext.startswith(".") else f".{ext}"
        normalized.append(dotted.lower())
    self.extensions = normalized
    self.min_size_bytes = min_size_bytes
scan
scan(root_dir: Path) -> Generator[VideoFile, None, None]
Scans the directory and yields VideoFile objects.
Source code in vbc/infrastructure/file_scanner.py
def scan(self, root_dir: Path) -> Generator[VideoFile, None, None]:
    """Walk *root_dir* depth-first and yield matching VideoFile objects."""
    for current, subdirs, names in os.walk(str(root_dir)):
        current_path = Path(current)
        # Prune output directories (named *_out) from traversal entirely.
        if current_path.name.endswith("_out"):
            subdirs[:] = []
            continue
        # Deterministic order: sorted subdirs (minus *_out) and sorted files.
        subdirs[:] = sorted(d for d in subdirs if not d.endswith("_out"))
        names.sort()
        for name in names:
            candidate = current_path / name
            # Extension filter (suffixes were normalized to lowercase).
            if candidate.suffix.lower() not in self.extensions:
                continue
            try:
                size = candidate.stat().st_size
            except OSError:
                # Skip files we can't access.
                continue
            # Size filter.
            if size < self.min_size_bytes:
                continue
            yield VideoFile(path=candidate, size_bytes=size)
ExifTool Adapter
Wrapper around pyexiftool for metadata extraction and manipulation.
exif_tool
ExifToolAdapter
Source code in vbc/infrastructure/exif_tool.py
def __init__(self):
    """Create the adapter around one shared ExifTool subprocess."""
    # The lock serializes access to the single ExifTool process across threads.
    self.et = exiftool.ExifTool()
    self._lock = threading.Lock()
extract_tags(file_path: Path) -> Dict[str, Any]
Extract raw ExifTool tags as a dictionary for verification checks.
Source code in vbc/infrastructure/exif_tool.py
def extract_tags(self, file_path: Path) -> Dict[str, Any]:
    """Extract raw ExifTool tags as a dictionary for verification checks."""
    # Lazily start the shared ExifTool process on first use.
    if not self.et.running:
        self.et.run()
    with self._lock:
        records = self.et.execute_json(str(file_path))
    if not records:
        raise ValueError(f"Could not extract metadata for {file_path}")
    return records[0]
extract_metadata(file: VideoFile) -> VideoMetadata
Extracts metadata from a video file using ExifTool.
Source code in vbc/infrastructure/exif_tool.py
def extract_metadata(self, file: VideoFile) -> VideoMetadata:
    """Extracts metadata from a video file using ExifTool."""
    if not self.et.running:
        self.et.run()
    with self._lock:
        records = self.et.execute_json(str(file.path))
    if not records:
        raise ValueError(f"Could not extract metadata for {file.path}")
    raw = records[0]
    width = self._get_tag(raw, ["QuickTime:ImageWidth", "Track1:ImageWidth", "ImageWidth"])
    height = self._get_tag(raw, ["QuickTime:ImageHeight", "Track1:ImageHeight", "ImageHeight"])
    fps = self._get_tag(raw, ["QuickTime:VideoFrameRate", "VideoFrameRate"])
    # Use the codec ID (avc1=h264, hvc1=hevc, ...) rather than
    # HandlerDescription, which can report "Sound" for the audio track.
    codec_raw = self._get_tag(raw, ["QuickTime:CompressorID", "CompressorID", "VideoCodec", "CompressorName"])
    # Translate well-known codec IDs into user-friendly names.
    friendly_names = {
        "avc1": "h264",
        "hvc1": "hevc",
        "hev1": "hevc",
        "av01": "av1",
        "vp09": "vp9",
        "vp08": "vp8"
    }
    if codec_raw:
        codec = friendly_names.get(str(codec_raw).lower(), str(codec_raw))
    else:
        codec = "unknown"
    camera = self._extract_camera_raw(raw)
    bitrate = self._get_tag(raw, ["QuickTime:AvgBitrate", "AvgBitrate"])
    # Missing tags degrade to neutral defaults rather than raising.
    return VideoMetadata(
        width=int(width) if width else 0,
        height=int(height) if height else 0,
        codec=codec,
        fps=float(fps) if fps else 0.0,
        camera_model=str(camera) if camera else None,
        bitrate_kbps=float(bitrate) / 1000 if bitrate else None
    )
extract_exif_info(file: VideoFile, dynamic_quality: Dict[str, DynamicQualityRule]) -> Dict[str, Optional[object]]
Extracts camera info and dynamic quality using full ExifTool tags.
Source code in vbc/infrastructure/exif_tool.py
| def extract_exif_info(
self,
file: VideoFile,
dynamic_quality: Dict[str, DynamicQualityRule],
) -> Dict[str, Optional[object]]:
"""Extracts camera info and dynamic quality using full ExifTool tags."""
# Lazily start the shared ExifTool process on first use.
if not self.et.running:
self.et.run()
with self._lock:
metadata_list = self.et.execute_json(str(file.path))
if not metadata_list:
raise ValueError(f"Could not extract metadata for {file.path}")
tags = metadata_list[0]
# Build a searchable text from tag values only
# This keeps searching in the 'whole exif' but avoids matching dict keys
full_metadata_text = " ".join(str(v) for v in tags.values())
camera_raw = self._extract_camera_raw(tags)
camera_model = None
custom_cq = None
matched_pattern = None
# Accept both typed DynamicQualityRule objects and plain dicts with a "cq" key.
def _rule_cq(rule: Any) -> Optional[int]:
if isinstance(rule, DynamicQualityRule):
return rule.cq
if isinstance(rule, dict):
cq_value = rule.get("cq")
if isinstance(cq_value, int):
return cq_value
return None
# 1. Prioritize matching against the extracted camera model/make
if camera_raw:
for pattern, rule in dynamic_quality.items():
cq_value = _rule_cq(rule)
if pattern in camera_raw:
camera_model = camera_raw
custom_cq = cq_value
matched_pattern = pattern
break
# 2. Fallback: Search in all exif values
if custom_cq is None:
for pattern, rule in dynamic_quality.items():
cq_value = _rule_cq(rule)
if pattern in full_metadata_text:
camera_model = pattern
custom_cq = cq_value
matched_pattern = pattern
break
# No pattern matched at all: surface the raw camera string if we have one.
if not camera_model and camera_raw:
camera_model = camera_raw
# Check for VBC Encoder tag
vbc_encoded = False
# ExifTool often returns keys like "XMP:VBCEncoder" or just "VBCEncoder"
# We check keys in the dict
for key in tags.keys():
k_lower = key.lower()
if "vbcencoder" in k_lower or "vbc encoder" in k_lower:
vbc_encoded = True
break
# Divide by 1000 to get kbit/s — assumes AvgBitrate is in bits/s; TODO confirm
# against ExifTool output for the containers VBC handles.
bitrate = tags.get('QuickTime:AvgBitrate') or tags.get('AvgBitrate')
bitrate_kbps = float(bitrate) / 1000 if bitrate else None
return {
"camera_model": camera_model,
"camera_raw": camera_raw,
"custom_cq": custom_cq,
"bitrate_kbps": bitrate_kbps,
"matched_pattern": matched_pattern,
"vbc_encoded": vbc_encoded,
}
|
copy_metadata(source: Path, target: Path)
Copies EXIF/XMP tags from source to target.
Source code in vbc/infrastructure/exif_tool.py
def copy_metadata(self, source: Path, target: Path):
    """Copies EXIF/XMP tags from source to target.

    Args:
        source: File whose tags are read.
        target: File whose tags are overwritten in place.
    """
    if not self.et.running:
        self.et.run()
    # Standard command for deep EXIF/XMP copy
    cmd = [
        "-tagsFromFile", str(source),
        "-all:all",
        "-unsafe",
        "-overwrite_original",
        str(target)
    ]
    # Serialize access to the shared ExifTool process, matching the locking
    # discipline of extract_tags/extract_metadata/extract_exif_info — without
    # it, concurrent workers could interleave commands on the same process.
    with self._lock:
        self.et.execute(*cmd)
FFprobe Adapter
Wrapper around ffprobe for stream information.
ffprobe
FFprobeAdapter
Wrapper around ffprobe to extract stream information.
get_stream_info
get_stream_info(file_path: Path) -> Dict[str, Any]
Executes ffprobe and parses JSON output.
Source code in vbc/infrastructure/ffprobe.py
| def get_stream_info(self, file_path: Path) -> Dict[str, Any]:
"""Executes ffprobe and parses JSON output."""
cmd = [
"ffprobe",
"-v", "error",
"-print_format", "json",
"-show_streams",
"-show_format",
str(file_path)
]
# Timeout scales with the file (see _estimate_timeout) so huge inputs
# don't get killed prematurely.
timeout_s = self._estimate_timeout(file_path)
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout_s)
except subprocess.TimeoutExpired as exc:
raise RuntimeError(f"ffprobe timed out after {timeout_s}s for {file_path}") from exc
if result.returncode != 0:
err = (result.stderr or "").strip()
detail = err if err else "unknown error (no stderr)"
raise RuntimeError(f"ffprobe failed for {file_path}: {detail}")
data = json.loads(result.stdout)
# Find streams
streams = data.get("streams", [])
video_stream = next((s for s in streams if s.get("codec_type") == "video"), None)
if not video_stream:
raise ValueError(f"No video stream found in {file_path}")
audio_stream = next((s for s in streams if s.get("codec_type") == "audio"), None)
if audio_stream is None:
audio_codec = "no-audio"
else:
audio_codec = audio_stream.get("codec_name") or "unknown"
# Parse FPS (prefer avg_frame_rate; r_frame_rate is often timebase)
# NOTE(review): round() yields an int here (29.97 -> 30), and rates above
# 240 are treated as bogus and left at 0.0 — confirm callers expect both.
fps = 0.0
fps_str = video_stream.get("avg_frame_rate", "0/0")
if "/" in fps_str:
try:
num, den = map(float, fps_str.split("/"))
if den != 0:
candidate = num / den
if candidate <= 240:
fps = round(candidate)
except ValueError:
fps = 0.0
else:
try:
candidate = float(fps_str)
if candidate <= 240:
fps = round(candidate)
except ValueError:
fps = 0.0
# Duration fallback order: format.duration, format tags, stream.duration, stream tags, duration_ts/time_base, bitrate/size
fmt = data.get("format", {})
duration = self._to_float(fmt.get("duration"))
if duration <= 0:
tags = fmt.get("tags", {}) or {}
duration = self._parse_duration_tag(tags.get("DURATION") or tags.get("duration"))
if duration <= 0:
duration = self._to_float(video_stream.get("duration"))
if duration <= 0:
tags = video_stream.get("tags", {}) or {}
duration = self._parse_duration_tag(tags.get("DURATION") or tags.get("duration"))
if duration <= 0:
duration = self._parse_time_base_duration(video_stream.get("duration_ts"), video_stream.get("time_base"))
if duration <= 0:
# Last resort: estimate duration from container size and bitrate.
bit_rate = self._to_float(fmt.get("bit_rate") or video_stream.get("bit_rate"))
size = self._to_float(fmt.get("size"))
if bit_rate > 0 and size > 0:
duration = (size * 8) / bit_rate
# Check for VBC tags in format or stream tags
format_tags = fmt.get("tags", {}) or {}
stream_tags = video_stream.get("tags", {}) or {}
# Check for VBC Encoder tag (case-insensitive check for key presence)
vbc_encoded = False
for tags_dict in (format_tags, stream_tags):
for k, v in tags_dict.items():
if k.lower() in ("vbcencoder", "vbc encoder"):
vbc_encoded = True
break
if vbc_encoded:
break
bitrate_bps = self._to_float(fmt.get("bit_rate") or video_stream.get("bit_rate"))
return {
"width": int(video_stream.get("width", 0)),
"height": int(video_stream.get("height", 0)),
"codec": video_stream.get("codec_name", "unknown"),
"audio_codec": audio_codec,
"fps": fps,
"duration": duration,
"bitrate_kbps": (bitrate_bps / 1000.0) if bitrate_bps > 0 else None,
"color_space": video_stream.get("color_space"),
"pix_fmt": video_stream.get("pix_fmt"),
"vbc_encoded": vbc_encoded,
}
|
FFmpeg Adapter
Wrapper around ffmpeg for video compression.
ffmpeg
FFmpeg process wrapper for AV1 video compression.
Handles subprocess lifecycle, progress monitoring, error detection, and recovery.
Detects hardware capability errors and color space issues, publishing events on failures.
FFmpegAdapter
FFmpegAdapter(event_bus: EventBus)
Subprocess adapter for FFmpeg video compression.
Manages FFmpeg execution with real-time progress monitoring via stdout parsing.
Detects GPU hardware capability exhaustion (exit code 187) and color space bugs
in FFmpeg 7.x, triggering automatic recovery via remuxing + retry.
Source code in vbc/infrastructure/ffmpeg.py
def __init__(self, event_bus: EventBus):
    """Store the shared event bus and create a module-scoped logger."""
    # Progress, failure, and HW-capability events are published on this bus.
    self.event_bus = event_bus
    self.logger = logging.getLogger(__name__)
compress
compress(job: CompressionJob, config: AppConfig, use_gpu: bool, quality: Optional[int] = None, rate_control: Optional[ResolvedRateControl] = None, rotate: Optional[int] = None, shutdown_event=None, input_path: Optional[Path] = None)
Execute AV1 compression via FFmpeg subprocess.
Spawns FFmpeg, monitors stdout for progress updates, detects errors including:
- Hardware capability exhaustion (HW_CAP_LIMIT status)
- FFmpeg 7.x color space bugs (triggers _apply_color_fix)
- Exit code failures
Publishes JobProgressUpdated, JobFailed, and HardwareCapabilityExceeded events.
Handles graceful shutdown via shutdown_event (Ctrl+C integration).
Parameters:
- job (CompressionJob): Compression job to process. Required.
- config (AppConfig): AppConfig with encoder settings and flags. Required.
- use_gpu (bool): Whether the GPU encoder is active. Required.
- quality (Optional[int]): Optional quality override (CQ/CRF) for this job. Default: None.
- rate_control (Optional[ResolvedRateControl]): Optional bitrate control override for rate mode. Default: None.
- rotate (Optional[int]): Optional rotation angle (degrees). Default: None.
- shutdown_event: Threading.Event to signal interruption. Default: None.
- input_path (Optional[Path]): Override input path (used for color fix retry). Default: None.
Side Effects
- Updates job.status, job.error_message, job.duration_seconds
- Writes .tmp file during processing; renames to output on success
- Publishes events to EventBus
- Cleans up .tmp file on error/interruption
Source code in vbc/infrastructure/ffmpeg.py
| def compress(
self,
job: CompressionJob,
config: AppConfig,
use_gpu: bool,
quality: Optional[int] = None,
rate_control: Optional[ResolvedRateControl] = None,
rotate: Optional[int] = None,
shutdown_event=None,
input_path: Optional[Path] = None,
):
"""Execute AV1 compression via FFmpeg subprocess.
Spawns FFmpeg, monitors stdout for progress updates, detects errors including:
- Hardware capability exhaustion (HW_CAP_LIMIT status)
- FFmpeg 7.x color space bugs (triggers _apply_color_fix)
- Exit code failures
Publishes JobProgressUpdated, JobFailed, and HardwareCapabilityExceeded events.
Handles graceful shutdown via shutdown_event (Ctrl+C integration).
Args:
job: Compression job to process.
config: AppConfig with encoder settings and flags.
use_gpu: Whether the GPU encoder is active.
quality: Optional quality override (CQ/CRF) for this job.
rate_control: Optional bitrate control override for rate mode.
rotate: Optional rotation angle (degrees).
shutdown_event: Threading.Event to signal interruption.
input_path: Override input path (used for color fix retry).
Side Effects:
- Updates job.status, job.error_message, job.duration_seconds
- Writes .tmp file during processing; renames to output on success
- Publishes events to EventBus
- Cleans up .tmp file on error/interruption
"""
# NOTE(review): filename appears unused in this rendered snippet — the log
# lines below show "(unknown)" where the doc extraction presumably dropped
# an f-string placeholder; confirm against the real source file.
filename = job.source_file.path.name
start_time = time.monotonic() if config.general.debug else None
# Resolve encoder CLI arguments for the active backend, then layer on
# rate-control / quality / CPU-thread overrides.
encoder_args = select_encoder_args(config, use_gpu)
if config.general.quality_mode == "rate":
if rate_control is None:
raise ValueError("Missing resolved rate control for quality_mode=rate.")
encoder_args = apply_rate_control_args(
encoder_args,
use_gpu=use_gpu,
rate_control=rate_control,
)
elif quality is not None:
encoder_args = replace_quality_value(encoder_args, quality)
if not use_gpu:
encoder_args = apply_cpu_thread_overrides(encoder_args, config.general.ffmpeg_cpu_threads)
if config.cpu_encoder.advanced and config.cpu_encoder.advanced_enforce_input_pix_fmt:
pix_fmt = None
if job.source_file.metadata:
pix_fmt = job.source_file.metadata.pix_fmt
encoder_args = apply_pix_fmt_arg(encoder_args, pix_fmt)
if config.general.debug:
encoder_name = _extract_flag_value(encoder_args, "-c:v") or "unknown"
if config.general.quality_mode == "rate":
quality_text = f"RATE={_extract_flag_value(encoder_args, '-b:v')}"
else:
quality_value = extract_quality_value(encoder_args)
quality_flag = extract_quality_flag(encoder_args)
quality_label = "CQ" if quality_flag == "-cq" else "CRF" if quality_flag == "-crf" else "Q"
quality_text = f"{quality_label}={quality_value}" if quality_value is not None else "quality=unknown"
self.logger.info(
f"FFMPEG_START: (unknown) (gpu={use_gpu}, encoder={encoder_name}, {quality_text})"
)
if config.general.debug:
_, audio_mode, audio_codec = self._select_audio_options(job)
self.logger.info(f"AUDIO_MODE: (unknown) mode={audio_mode} codec={audio_codec}")
cmd = self._build_command(job, config, encoder_args, use_gpu, rotate, input_path=input_path)
if config.general.debug:
self.logger.debug(f"FFMPEG_CMD: {' '.join(cmd)}")
# Use duration for progress calculation
total_duration = job.source_file.metadata.duration if job.source_file.metadata else 0.0
# stderr is merged into stdout so a single reader sees both progress
# lines and error text.
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1
)
# Regex to parse 'time=00:00:00.00' from ffmpeg output
time_regex = re.compile(r"time=(\d+):(\d+):(\d+\.\d+)")
hw_cap_error = False
gpu_unavailable_error = False
gpu_unavailable_detail: Optional[str] = None
color_error = False
# A daemon reader thread feeds output lines into this queue so the main
# loop can poll shutdown_event without blocking on stdout reads.
output_queue: "queue.Queue[Optional[str]]" = queue.Queue()
def _reader():
if not process.stdout:
output_queue.put(None)
return
for line in process.stdout:
output_queue.put(line)
# Sentinel: None marks end-of-stream for the consumer loop.
output_queue.put(None)
reader_thread = threading.Thread(target=_reader, daemon=True)
reader_thread.start()
try:
while True:
# Check for shutdown signal from orchestrator
if shutdown_event and shutdown_event.is_set():
self.logger.info(f"FFMPEG_INTERRUPTED: (unknown) (shutdown signal)")
process.terminate()
try:
process.wait(timeout=3)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
# Clean up tmp file
tmp_path = job.output_path.with_suffix('.tmp')
if tmp_path.exists():
tmp_path.unlink()
# Set INTERRUPTED status and return (don't raise exception)
job.status = JobStatus.INTERRUPTED
job.error_message = "Interrupted by user (Ctrl+C)"
return # Exit compress() early
try:
line = output_queue.get(timeout=0.1)
except queue.Empty:
if process.poll() is not None:
break
continue
if line is None:
break
line_stripped = line.strip()
line_lower = line.lower()
# NVENC capability problems surface as text in the output stream,
# not only via the exit code.
if (
"Hardware is lacking required capabilities" in line
or "No capable devices found" in line
or "not supported" in line and "nvenc" in line.lower()
):
hw_cap_error = True
if use_gpu:
is_gpu_unavailable = (
"cuda_error_no_device" in line_lower
or "no cuda-capable device is detected" in line_lower
or "no nvenc capable devices found" in line_lower
or "cannot load libcuda" in line_lower
or "driver does not support the required nvenc api version" in line_lower
or "openencodesessionex failed" in line_lower
)
if is_gpu_unavailable:
gpu_unavailable_error = True
hw_cap_error = True
if gpu_unavailable_detail is None and line_stripped:
gpu_unavailable_detail = line_stripped
# FFmpeg 7.x color metadata rejection -> handled by _apply_color_fix below.
if "is not a valid value for color_primaries" in line or "is not a valid value for color_trc" in line:
color_error = True
match = time_regex.search(line)
if match:
h, m, s = map(float, match.groups())
current_seconds = h * 3600 + m * 60 + s
if total_duration > 0:
progress_percent = min(100.0, (current_seconds / total_duration) * 100.0)
self.event_bus.publish(JobProgressUpdated(job=job, progress_percent=progress_percent))
process.wait()
except KeyboardInterrupt:
# User pressed Ctrl+C directly in this thread (shouldn't happen with daemon threads)
self.logger.info(f"FFMPEG_INTERRUPTED: (unknown) (KeyboardInterrupt)")
process.terminate()
try:
process.wait(timeout=3)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
# Clean up tmp file
tmp_path = job.output_path.with_suffix('.tmp')
if tmp_path.exists():
tmp_path.unlink()
# Set status and re-raise to propagate to orchestrator
job.status = JobStatus.INTERRUPTED
job.error_message = "Interrupted by user (Ctrl+C)"
raise
# Get tmp file path
tmp_path = job.output_path.with_suffix('.tmp')
# Check for hardware capability error (code 187 or text match)
if hw_cap_error or process.returncode == 187:
job.status = JobStatus.HW_CAP_LIMIT
if gpu_unavailable_error:
detail = gpu_unavailable_detail or "GPU encoder initialization failed"
job.error_message = (
f"GPU AV1 encode unavailable: {detail}. "
"Use --cpu or enable cpu_fallback."
)
else:
job.error_message = "Hardware is lacking required capabilities"
# Cleanup tmp file on error
if tmp_path.exists():
tmp_path.unlink()
self.event_bus.publish(HardwareCapabilityExceeded(job=job))
if config.general.debug and start_time:
elapsed = time.monotonic() - start_time
self.logger.info(f"FFMPEG_END: (unknown) status=hw_cap_limit elapsed={elapsed:.2f}s")
elif color_error:
# Re-run with color fix remux (recursive call sets final status)
if config.general.debug:
self.logger.info(f"FFMPEG_COLORFIX: (unknown) (applying color space fix)")
self._apply_color_fix(
job,
config,
use_gpu,
quality,
rate_control,
rotate,
shutdown_event=shutdown_event,
)
# Status is now set by recursive compress() call, don't override
if config.general.debug and start_time:
elapsed = time.monotonic() - start_time
self.logger.info(f"FFMPEG_END: (unknown) status={job.status.value} elapsed={elapsed:.2f}s (with colorfix)")
elif process.returncode != 0:
job.status = JobStatus.FAILED
job.error_message = f"ffmpeg exited with code {process.returncode}"
# Cleanup tmp file on error
if tmp_path.exists():
tmp_path.unlink()
self.event_bus.publish(JobFailed(job=job, error_message=job.error_message))
if config.general.debug and start_time:
elapsed = time.monotonic() - start_time
self.logger.info(f"FFMPEG_END: (unknown) status=failed code={process.returncode} elapsed={elapsed:.2f}s")
else:
# Success only when tmp exists and can be atomically renamed to final output.
if not tmp_path.exists():
job.status = JobStatus.FAILED
job.error_message = "ffmpeg succeeded but temporary output file is missing"
self.event_bus.publish(JobFailed(job=job, error_message=job.error_message))
if config.general.debug and start_time:
elapsed = time.monotonic() - start_time
self.logger.info(
f"FFMPEG_END: (unknown) status=failed reason=missing_tmp elapsed={elapsed:.2f}s"
)
return
try:
tmp_path.rename(job.output_path)
except OSError as exc:
job.status = JobStatus.FAILED
job.error_message = f"ffmpeg succeeded but failed to finalize output: {exc}"
self.event_bus.publish(JobFailed(job=job, error_message=job.error_message))
# Best effort cleanup of orphan tmp.
if tmp_path.exists():
tmp_path.unlink()
if config.general.debug and start_time:
elapsed = time.monotonic() - start_time
self.logger.info(
f"FFMPEG_END: (unknown) status=failed reason=rename_error elapsed={elapsed:.2f}s"
)
return
job.status = JobStatus.COMPLETED
if config.general.debug and start_time:
elapsed = time.monotonic() - start_time
self.logger.info(f"FFMPEG_END: (unknown) status=completed elapsed={elapsed:.2f}s")
|
Housekeeping Service
Cleanup service for temporary files and error markers.
housekeeping
HousekeepingService
Service for cleaning up temporary files and error markers.
cleanup_output_markers
cleanup_output_markers(input_dir: Path, output_dir: Path, errors_dir: Path, clean_errors: bool, logger: Optional[Logger] = None) -> None
Cleanup .tmp (always) and .err (only if clean_errors) in output_dir.
If a marker has a corresponding source file in input_dir, delete it.
Otherwise, move it to errors_dir and log a warning.
Source code in vbc/infrastructure/housekeeping.py
def cleanup_output_markers(
    self,
    input_dir: Path,
    output_dir: Path,
    errors_dir: Path,
    clean_errors: bool,
    logger: Optional[logging.Logger] = None,
) -> None:
    """Cleanup .tmp (always) and .err (only if clean_errors) in output_dir.

    If a marker has a corresponding source file in input_dir, delete it.
    Otherwise, move it to errors_dir and log a warning.
    """
    if not output_dir.exists():
        return

    def _is_marker(name: str) -> bool:
        # .tmp markers are always stale; .err markers only when requested.
        return name.endswith(".tmp") or (clean_errors and name.endswith(".err"))

    markers = [
        Path(walk_root) / name
        for walk_root, _subdirs, names in os.walk(output_dir)
        for name in names
        if _is_marker(name)
    ]
    if not markers:
        return
    errors_dir.mkdir(parents=True, exist_ok=True)
    for marker in markers:
        try:
            rel_marker = marker.relative_to(output_dir)
        except ValueError:
            rel_marker = Path(marker.name)
        source = self._find_source_for_marker(input_dir, rel_marker)
        if source and source.exists():
            # Source still present: the marker is just litter — delete it.
            try:
                marker.unlink()
            except OSError:
                pass
            continue
        # No source found: preserve the marker under errors_dir for inspection.
        dest = errors_dir / rel_marker
        dest.parent.mkdir(parents=True, exist_ok=True)
        try:
            shutil.move(str(marker), str(dest))
        except OSError:
            continue
        if logger:
            logger.warning(
                f"Moved stale marker without source file: {marker} -> {dest}"
            )
Web Server
Read-only HTMX dashboard server and template rendering helpers.
web_server
Read-only HTMX web dashboard for VBC.
Serves a single-page dashboard that auto-refreshes via HTMX polling every 2s.
Runs as a daemon thread — stops automatically when VBC exits.
No new dependencies: uses stdlib http.server + socketserver only.
HTMX 2.0.8 and Pico.css 2.1.1 loaded from jsDelivr CDN.
Static files (style.css, theme-switcher.js) served from vbc/infrastructure/web/.
HTML fragments rendered via Jinja2 templates in vbc/infrastructure/web/templates/.
VBCRequestHandler
Bases: BaseHTTPRequestHandler
HTTP request handler for VBC web dashboard.
Class attribute state is set by VBCWebServer before the server starts.
log_message
log_message(format: str, *args) -> None
Suppress default access log to keep VBC terminal clean.
Source code in vbc/infrastructure/web_server.py
def log_message(self, format: str, *args) -> None:  # noqa: A002
    """Intentionally a no-op: keeps the VBC terminal free of access-log noise."""
VBCWebServer
VBCWebServer(state: 'UIState', port: int = DEFAULT_PORT, host: str = '0.0.0.0')
Read-only HTMX web dashboard server for VBC.
Runs as a daemon thread — stops automatically when VBC process exits.
Usage::
server = VBCWebServer(state=ui_state, port=8765)
server.start() # non-blocking, prints URL
# ... VBC runs ...
server.stop() # optional; daemon thread auto-stops on exit
Source code in vbc/infrastructure/web_server.py
def __init__(self, state: "UIState", port: int = DEFAULT_PORT, host: str = "0.0.0.0") -> None:
    """Record configuration; the HTTP server and thread are created in start()."""
    self.state = state
    self.port = port
    self.host = host
    # Populated by start(); cleared again by stop().
    self._server: Optional[_ThreadingHTTPServer] = None
    self._thread: Optional[threading.Thread] = None
start
Start web server in a daemon background thread.
Source code in vbc/infrastructure/web_server.py
def start(self) -> None:
    """Start web server in a daemon background thread."""
    # Handler instances read shared state via this class attribute.
    VBCRequestHandler.state = self.state
    try:
        self._server = _ThreadingHTTPServer((self.host, self.port), VBCRequestHandler)
    except OSError as exc:
        # Bind failure (port taken etc.): degrade gracefully, VBC keeps running.
        logger.warning("Web dashboard: could not bind to %s:%d: %s", self.host, self.port, exc)
        print(f"[VBC] Web dashboard: {self.host}:{self.port} unavailable — dashboard disabled.")
        return
    worker = threading.Thread(
        target=self._server.serve_forever,
        name="vbc-web-dashboard",
        daemon=True,
    )
    worker.start()
    self._thread = worker
    # Wildcard binds are displayed as "localhost" so the printed URL is usable.
    display_host = "localhost" if self.host in ("0.0.0.0", "::") else self.host
    logger.info("Web dashboard: http://%s:%d/", display_host, self.port)
    print(f"[VBC] Web dashboard: http://{display_host}:{self.port}/")
stop
Gracefully stop the web server.
Source code in vbc/infrastructure/web_server.py
def stop(self) -> None:
    """Gracefully stop the web server."""
    # Detach first, then shut down, so repeated calls are harmless.
    server, self._server = self._server, None
    if server:
        server.shutdown()
    worker, self._thread = self._thread, None
    if worker:
        worker.join(timeout=2.0)
GPU Monitor
Background GPU metrics sampler for dashboard sparklines.
gpu_monitor
GpuMonitor
GpuMonitor(state: UIState, refresh_rate: int = 5, device_index: int = 0, device_name: Optional[str] = None, nvtop_path: Optional[str] = None)
Monitors GPU metrics using nvtop -s in a background thread.
Source code in vbc/infrastructure/gpu_monitor.py
def __init__(self, state: UIState, refresh_rate: int = 5,
             device_index: int = 0, device_name: Optional[str] = None,
             nvtop_path: Optional[str] = None):
    """Store sampler configuration and resolve the nvtop binary."""
    self.state = state
    self.refresh_rate = refresh_rate
    self.device_index = device_index
    self.device_name = device_name
    self.logger = logging.getLogger(__name__)
    self._stop_event = threading.Event()
    self._thread: Optional[threading.Thread] = None
    # Resolve the nvtop binary: an explicit path wins; otherwise fall back
    # to whatever is on PATH.
    if nvtop_path:
        self.nvtop_cmd = nvtop_path
        self._nvtop_available = os.path.isfile(nvtop_path) and os.access(nvtop_path, os.X_OK)
        return
    found = shutil.which("nvtop")
    self.nvtop_cmd = found if found else "nvtop"
    self._nvtop_available = found is not None
start
Starts the monitoring thread.
Source code in vbc/infrastructure/gpu_monitor.py
def start(self):
    """Starts the monitoring thread."""
    if self._thread is not None:
        # Already running — nothing to do.
        return
    if not self._nvtop_available:
        # No nvtop binary: log once and run without GPU metrics.
        self.logger.info("GPU Monitor started (nvtop not available, GPU metrics disabled)")
        return
    self._stop_event.clear()
    worker = threading.Thread(target=self._poll, daemon=True)
    self._thread = worker
    worker.start()
    self.logger.info("GPU Monitor started")
stop
Stops the monitoring thread.
Source code in vbc/infrastructure/gpu_monitor.py
def stop(self):
    """Stops the monitoring thread."""
    self._stop_event.set()
    worker = self._thread
    if worker:
        worker.join(timeout=1.0)
        self._thread = None
    self.logger.info("GPU Monitor stopped")
parse_number
parse_number(s: str) -> Optional[float]
Parse number from string like '52C', '30%', '112W'.
Source code in vbc/infrastructure/gpu_monitor.py
def parse_number(s: str) -> Optional[float]:
    """Parse number from string like '52C', '30%', '112W'."""
    if not s:
        return None
    text = str(s).strip()
    # Common nvtop placeholders for "no reading".
    if text in {"N/A", "--", "??"}:
        return None
    match = NUM_RE.search(text)
    if match is None:
        return None
    return float(match.group(1))
parse_temp
parse_temp(s: str) -> Optional[float]
'52C' → 52.0, '??' → None
Source code in vbc/infrastructure/gpu_monitor.py
def parse_temp(s: str) -> Optional[float]:
    """'52C' → 52.0, '??' → None"""
    # Temperatures share the generic numeric parser.
    value = parse_number(s)
    return value
parse_percent
parse_percent(s: str) -> Optional[float]
'30%' → 30.0
Source code in vbc/infrastructure/gpu_monitor.py
def parse_percent(s: str) -> Optional[float]:
    """'30%' → 30.0"""
    # Percentages share the generic numeric parser.
    value = parse_number(s)
    return value
parse_watts
parse_watts(s: str) -> Optional[float]
'112W' → 112.0
Source code in vbc/infrastructure/gpu_monitor.py
def parse_watts(s: str) -> Optional[float]:
    """'112W' → 112.0"""
    # Power readings share the generic numeric parser.
    value = parse_number(s)
    return value
parse_fan_speed
parse_fan_speed(s: str) -> Optional[float]
'35%' → 35.0, 'CPU Fan' → None, 'N/A' → None
Source code in vbc/infrastructure/gpu_monitor.py
def parse_fan_speed(s: str) -> Optional[float]:
    """'35%' → 35.0, 'CPU Fan' → None, 'N/A' → None"""
    # parse_number already returns None for non-numeric labels such as
    # 'CPU Fan'; the UI renders None values as "-".
    return parse_number(s)
ExifTool Temp Cleanup
exiftool_tmp
Cleanup helpers for stale ExifTool temporary files.
exiftool_tmp_path(target_path: Path) -> Path
Return ExifTool's temporary write path for a target file.
Source code in vbc/infrastructure/exiftool_tmp.py
def exiftool_tmp_path(target_path: Path) -> Path:
    """Return ExifTool's temporary write path for a target file."""
    # ExifTool writes "<name><suffix>" next to the target while rewriting tags.
    tmp_name = target_path.name + EXIFTOOL_TMP_SUFFIX
    return target_path.with_name(tmp_name)
remove_exiftool_tmp_for_target(target_path: Path, logger: Optional[Logger] = None) -> Optional[Path]
Remove a stale ExifTool temp file for a single target if present.
Source code in vbc/infrastructure/exiftool_tmp.py
def remove_exiftool_tmp_for_target(
    target_path: Path,
    logger: Optional[logging.Logger] = None,
) -> Optional[Path]:
    """Remove a stale ExifTool temp file for a single target if present."""
    candidate = exiftool_tmp_path(target_path)
    # lexists also catches dangling symlinks that plain exists() misses.
    if not os.path.lexists(candidate):
        return None
    # Never delete a real directory that happens to match the temp name.
    if candidate.is_dir() and not candidate.is_symlink():
        if logger:
            logger.warning(f"Refusing to remove ExifTool temp directory: {candidate}")
        return None
    candidate.unlink()
    if logger:
        logger.warning(f"Removed stale ExifTool temp file: {candidate}")
    return candidate
cleanup_exiftool_tmp_files(roots: Iterable[Path], logger: Optional[Logger] = None) -> List[Path]
Remove stale ExifTool temp files below output roots.
Source code in vbc/infrastructure/exiftool_tmp.py
def cleanup_exiftool_tmp_files(
    roots: Iterable[Path],
    logger: Optional[logging.Logger] = None,
) -> List[Path]:
    """Remove stale ExifTool temp files below output roots."""
    removed: List[Path] = []
    visited = set()
    for raw_root in roots:
        root = Path(raw_root)
        # De-duplicate roots so no file is scanned or reported twice.
        if root in visited:
            continue
        visited.add(root)
        if not root.exists():
            continue
        for tmp_path in root.rglob(f"*{EXIFTOOL_TMP_SUFFIX}"):
            # Never delete a real directory that matches the temp suffix.
            if tmp_path.is_dir() and not tmp_path.is_symlink():
                if logger:
                    logger.warning(f"Refusing to remove ExifTool temp directory: {tmp_path}")
                continue
            # Only regular files and symlinks are candidates for removal.
            if not tmp_path.is_file() and not tmp_path.is_symlink():
                continue
            tmp_path.unlink()
            removed.append(tmp_path)
            if logger:
                logger.warning(f"Removed stale ExifTool temp file: {tmp_path}")
    return removed
Logging
Logging configuration and setup.
logging
setup_logging
setup_logging(output_dir: Path, debug: bool = False, log_path: Optional[Path] = None) -> logging.Logger
Setup logging configuration for VBC.
Creates output directory and compression.log file.
Returns configured logger instance.
Parameters:
- output_dir (Path): Directory where output files are written. Required.
- debug (bool): If True, enable DEBUG level logging with detailed timings. Default: False.
- log_path (Optional[Path]): Optional path to log file (overrides output_dir). Default: None.
Source code in vbc/infrastructure/logging.py
def setup_logging(output_dir: Path, debug: bool = False, log_path: Optional[Path] = None) -> logging.Logger:
    """
    Setup logging configuration for VBC.
    Creates output directory and compression.log file.
    Returns configured logger instance.
    Args:
        output_dir: Directory where output files are written
        debug: If True, enable DEBUG level logging with detailed timings
        log_path: Optional path to log file (overrides output_dir)
    """
    # Create output directory. parents=True so nested paths work on a fresh
    # run instead of raising FileNotFoundError (the log-file parent below was
    # already created that way).
    output_dir.mkdir(parents=True, exist_ok=True)
    # Setup log file
    log_file = Path(log_path) if log_path else (output_dir / "compression.log")
    log_file.parent.mkdir(parents=True, exist_ok=True)
    # Configure logging level
    level = logging.DEBUG if debug else logging.INFO
    # Configure logging
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.FileHandler(log_file)],
        force=True  # Override any existing configuration
    )
    logger = logging.getLogger(__name__)
    logger.info(f"Logging initialized: {log_file} (debug={'ON' if debug else 'OFF'})")
    return logger
Usage Examples
EventBus
from vbc.infrastructure.event_bus import EventBus
from vbc.domain.events import Event
from pydantic import BaseModel
# Create custom event
class CustomEvent(Event):
message: str
# Create bus
bus = EventBus()
# Subscribe
def handler(event: CustomEvent):
print(f"Received: {event.message}")
bus.subscribe(CustomEvent, handler)
# Publish
bus.publish(CustomEvent(message="Hello!"))
# Output: Received: Hello!
FileScanner
from pathlib import Path
from vbc.infrastructure.file_scanner import FileScanner
# Create scanner
scanner = FileScanner(
extensions=[".mp4", ".mov", ".avi"],
min_size_bytes=1024 * 1024 # 1 MiB
)
# Scan directory
for video_file in scanner.scan(Path("/videos")):
print(f"{video_file.path.name}: {video_file.size_bytes} bytes")
ExifToolAdapter
from vbc.infrastructure.exif_tool import ExifToolAdapter
from vbc.domain.models import VideoFile
from pathlib import Path
# Create adapter
exif = ExifToolAdapter()
exif.et.run() # Start ExifTool process
# Extract metadata
video = VideoFile(path=Path("video.mp4"), size_bytes=1000000)
metadata = exif.extract_metadata(video)
print(f"Camera: {metadata.camera_model}")
print(f"Resolution: {metadata.width}x{metadata.height}")
print(f"Codec: {metadata.codec}")
# Extract EXIF info with dynamic quality
dynamic_quality = {
"ILCE-7RM5": {"cq": 38, "rate": {"bps": "0.8", "minrate": "0.7", "maxrate": "0.9"}},
"DC-GH7": {"cq": 40},
}
exif_info = exif.extract_exif_info(video, dynamic_quality)
if exif_info["custom_cq"]:
print(f"Using custom CQ: {exif_info['custom_cq']}")
# Copy metadata
exif.copy_metadata(
source=Path("source.mp4"),
target=Path("output.mp4")
)
# Cleanup
exif.et.terminate()
FFprobeAdapter
from vbc.infrastructure.ffprobe import FFprobeAdapter
from pathlib import Path
# Create adapter
ffprobe = FFprobeAdapter()
# Get stream info
info = ffprobe.get_stream_info(Path("video.mp4"))
print(f"Codec: {info['codec']}")
print(f"Resolution: {info['width']}x{info['height']}")
print(f"FPS: {info['fps']}")
print(f"Color space: {info['color_space']}")
print(f"Duration: {info['duration']} seconds")
FFmpegAdapter
from vbc.infrastructure.ffmpeg import FFmpegAdapter
from vbc.infrastructure.event_bus import EventBus
from vbc.domain.models import CompressionJob, VideoFile, JobStatus
from vbc.config.models import AppConfig, GeneralConfig
from pathlib import Path
import threading
# Create adapter
bus = EventBus()
ffmpeg = FFmpegAdapter(event_bus=bus)
# Create job
job = CompressionJob(
source_file=VideoFile(
path=Path("input.mp4"),
size_bytes=100000000
),
output_path=Path("output.mp4"),
rotation_angle=180
)
# Create config
config = AppConfig(
general=GeneralConfig(threads=4, gpu=True, copy_metadata=True)
)
# Compress
shutdown_event = threading.Event()
ffmpeg.compress(
job=job,
config=config,
use_gpu=config.general.gpu,
rotate=180,
shutdown_event=shutdown_event
)
# Check status
if job.status == JobStatus.COMPLETED:
print(f"Success! Output: {job.output_path}")
elif job.status == JobStatus.HW_CAP_LIMIT:
print("Hardware capability exceeded")
elif job.status == JobStatus.FAILED:
print(f"Failed: {job.error_message}")
HousekeepingService
from vbc.infrastructure.housekeeping import HousekeepingService
from pathlib import Path
# Create service
housekeeper = HousekeepingService()
# Cleanup markers in output dir
housekeeper.cleanup_output_markers(
input_dir=Path("/videos"),
output_dir=Path("/videos_out"),
errors_dir=Path("/videos_err"),
clean_errors=True, # True: cleanup .tmp and .err, False: only .tmp
)
# Removes or relocates markers:
# - *.tmp always
# - *.err only when clean_errors=True
Adapter Patterns
Dependency Injection
All adapters are injected into the Orchestrator:
from pathlib import Path
from vbc.pipeline.orchestrator import Orchestrator
from vbc.infrastructure.file_scanner import FileScanner
from vbc.infrastructure.exif_tool import ExifToolAdapter
from vbc.infrastructure.ffprobe import FFprobeAdapter
from vbc.infrastructure.ffmpeg import FFmpegAdapter
from vbc.infrastructure.event_bus import EventBus
from vbc.config.loader import load_config
# Create dependencies
config = load_config(Path("conf/vbc.yaml"))
bus = EventBus()
scanner = FileScanner(
extensions=config.general.extensions,
min_size_bytes=config.general.min_size_bytes
)
exif = ExifToolAdapter()
ffprobe = FFprobeAdapter()
ffmpeg = FFmpegAdapter(event_bus=bus)
# Inject into orchestrator
orchestrator = Orchestrator(
config=config,
event_bus=bus,
file_scanner=scanner,
exif_adapter=exif,
ffprobe_adapter=ffprobe,
ffmpeg_adapter=ffmpeg
)
# Run
orchestrator.run(Path("/videos"))
Thread Safety
Adapters that access shared state use locks:
# ExifToolAdapter uses threading.Lock
class ExifToolAdapter:
def __init__(self):
self.et = exiftool.ExifTool()
self._lock = threading.Lock()
def extract_metadata(self, file: VideoFile):
with self._lock:
metadata = self.et.execute_json(str(file.path))
# ... process metadata
Error Handling
Adapters raise exceptions for invalid inputs:
from vbc.infrastructure.ffprobe import FFprobeAdapter
ffprobe = FFprobeAdapter()
try:
info = ffprobe.get_stream_info(Path("nonexistent.mp4"))
except Exception as e:
print(f"ffprobe failed: {e}")
# Handle corrupted/missing file