Pipeline API¶
This page documents the Orchestrator, the core processing coordinator.
Orchestrator¶
The Orchestrator is the heart of VBC, coordinating all compression jobs.
orchestrator ¶
Pipeline orchestrator for video compression job lifecycle management.
Coordinates file discovery, queue management, metadata extraction, compression jobs, and dynamic thread control. Uses event-driven architecture with EventBus for loose coupling between pipeline and UI layers.
Key responsibilities:

- Discover video files matching extensions and size filters
- Extract and cache video metadata (codec, FPS, camera model, etc.)
- Manage the queue of pending jobs with a configurable sort order
- Submit jobs to the thread pool respecting prefetch_factor (submit-on-demand pattern)
- Handle the job lifecycle: discovery → queuing → processing → completion/failure
- Support dynamic thread count adjustment and graceful shutdown
- Emit events for UI updates (JobStarted, JobCompleted, JobFailed, etc.)
Orchestrator ¶
Orchestrator(config: AppConfig, event_bus: EventBus, file_scanner: FileScanner, exif_adapter: ExifToolAdapter, ffprobe_adapter: FFprobeAdapter, ffmpeg_adapter: FFmpegAdapter, output_dir_map: Optional[Dict[Path, Path]] = None, local_config_registry: Optional[LocalConfigRegistry] = None, cli_overrides: Optional[CliConfigOverrides] = None)
Video compression pipeline orchestrator.
Manages the full job lifecycle: discovery → queuing → compression. Coordinates with infrastructure adapters (FFmpeg, ExifTool, FFprobe) and publishes events to the UI via EventBus.
Uses thread-safe state management with Condition variables for:

- Dynamic thread pool sizing (Ctrl+< and Ctrl+>)
- Graceful shutdown coordination (Ctrl+S)
- Queue refresh signaling (Ctrl+R)
Implements "submit-on-demand" pattern: submits only prefetch_factor*threads jobs to thread pool; submits new jobs as workers complete (avoids queueing thousands of futures for large directories).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `AppConfig` | AppConfig with general, GPU, UI, autorotate, and input/output settings. | *required* |
| `event_bus` | `EventBus` | EventBus for publishing job lifecycle events. | *required* |
| `file_scanner` | `FileScanner` | FileScanner for discovering video files. | *required* |
| `exif_adapter` | `ExifToolAdapter` | ExifToolAdapter for metadata extraction (camera, GPS, etc.). | *required* |
| `ffprobe_adapter` | `FFprobeAdapter` | FFprobeAdapter for codec, FPS, duration probing. | *required* |
| `ffmpeg_adapter` | `FFmpegAdapter` | FFmpegAdapter for AV1 compression execution. | *required* |
| `output_dir_map` | `Optional[Dict[Path, Path]]` | Optional override mapping input_dir → output_dir (else uses suffix). | `None` |
Source code in vbc/pipeline/orchestrator.py
Demo Orchestrator¶
Simulation-mode pipeline used by --demo.
demo_orchestrator ¶
DemoOrchestrator ¶
Source code in vbc/pipeline/demo_orchestrator.py
Error File Mover¶
Helpers for moving files that failed verification or compression.
error_file_mover ¶
move_failed_files ¶
move_failed_files(input_dirs: Iterable[Path], output_dir_map: Dict[Path, Path], errors_dir_map: Dict[Path, Path], extensions: List[str], logger: Optional[Logger] = None, error_entries: Optional[List[Tuple[Path, Path, Path, Path]]] = None) -> List[Path]
Moves failed source files and their error markers to the errors directory. Returns a list of paths to the moved video files in the destination (errors) directory.
Source code in vbc/pipeline/error_file_mover.py
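The exact logic lives in `vbc/pipeline/error_file_mover.py`; the following is a minimal, self-contained sketch of the idea only, assuming the convention (used throughout this page) that a failed video has a sibling `<name>.<ext>.err` marker. The helper name `move_failed_files_sketch` is illustrative, not the real API:

```python
import shutil
from pathlib import Path
from typing import Dict, Iterable, List

def move_failed_files_sketch(
    input_dirs: Iterable[Path],
    errors_dir_map: Dict[Path, Path],
    extensions: List[str],
) -> List[Path]:
    """Sketch: move each video that has a sibling .err marker into its
    errors directory, together with the marker itself."""
    moved: List[Path] = []
    for input_dir in input_dirs:
        errors_dir = errors_dir_map[input_dir]
        errors_dir.mkdir(parents=True, exist_ok=True)
        for ext in extensions:
            # Snapshot the listing before moving files out from under rglob
            for video in list(input_dir.rglob(f"*{ext}")):
                err_marker = video.with_suffix(video.suffix + ".err")
                if not err_marker.exists():
                    continue  # only files that actually failed
                dest = errors_dir / video.name
                shutil.move(str(video), dest)
                shutil.move(str(err_marker), errors_dir / err_marker.name)
                moved.append(dest)
    return moved
```

The return value mirrors the documented contract: paths of the moved videos in the destination (errors) directory.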
Queue Sorting¶
Queue ordering strategies used before submit-on-demand scheduling.
queue_sorting ¶
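The module's concrete strategies are not documented here; purely as an illustration, a sort-order dispatch could look like the sketch below. The strategy names (`name`, `size_asc`, `size_desc`) are hypothetical and not necessarily what `queue_sorting` exposes:

```python
from pathlib import Path
from typing import Callable, Dict, List

# Hypothetical strategy table; the real module may use different names.
SORT_STRATEGIES: Dict[str, Callable[[List[Path]], List[Path]]] = {
    "name": lambda files: sorted(files, key=lambda p: p.name.lower()),
    "size_asc": lambda files: sorted(files, key=lambda p: p.stat().st_size),
    "size_desc": lambda files: sorted(files, key=lambda p: p.stat().st_size, reverse=True),
}

def sort_queue(files: List[Path], order: str) -> List[Path]:
    """Apply the configured sort order; fall back to discovery order."""
    strategy = SORT_STRATEGIES.get(order)
    return strategy(files) if strategy else list(files)
```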
Repair¶
Repair loop helpers for corrupted FLV inputs and failed outputs.
repair ¶
process_repairs ¶
process_repairs(input_dirs: List[Path], errors_dir_map: Dict[Path, Path], extensions: List[str], logger: Optional[Logger] = None, target_files: Optional[List[Path]] = None) -> int
Scans error directories for corrupted files and attempts to repair them. Strategy 1: Text prefix removal (for FLV/MP4 stream dumps) as a pre-clean step. Strategy 2: Fast re-encode to MKV (final output for all repairs).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `input_dirs` | `List[Path]` | List of source input directories. | *required* |
| `errors_dir_map` | `Dict[Path, Path]` | Mapping from input_dir to errors_dir. | *required* |
| `extensions` | `List[str]` | List of video extensions to scan for. | *required* |
| `logger` | `Optional[Logger]` | Logger instance. | `None` |
| `target_files` | `Optional[List[Path]]` | Optional list of specific files to repair. | `None` |
Returns:
| Type | Description |
|---|---|
| `int` | Number of successfully repaired files. |
Source code in vbc/pipeline/repair.py
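Strategy 1 (text prefix removal) can be pictured with the sketch below. It is illustrative only and not the code in `vbc/pipeline/repair.py`; the container signatures used (`FLV\x01` at the start of an FLV header, `ftyp` four bytes into an MP4 file) are standard, but the helper itself is hypothetical:

```python
from typing import Optional

def strip_text_prefix(data: bytes) -> Optional[bytes]:
    """Return cleaned bytes if junk precedes a known container signature,
    else None (file is already clean or unrecognized). Sketch only."""
    # 'FLV\x01' opens an FLV header; 'ftyp' sits 4 bytes into an MP4
    # file, right after the box size field.
    for signature, offset_back in ((b"FLV\x01", 0), (b"ftyp", 4)):
        idx = data.find(signature)
        if idx > offset_back:  # junk bytes precede the real header
            return data[idx - offset_back:]
    return None
```

Strategy 2 (a fast re-encode to MKV) is then applied to the pre-cleaned bytes, producing the final output for all repairs.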
Key Responsibilities¶
- Discovery: Scan input directory and filter files
- Metadata Management: Thread-safe caching of video metadata
- Decision Logic: Determine CQ and rotation per file
- Queue Management: Submit-on-demand pattern with prefetch
- Concurrency Control: Dynamic thread adjustment
- Event Emission: Publish events for UI updates
- Error Handling: Create .err markers, handle corrupted files
- Graceful Shutdown: Finish active jobs on user request
Usage Example¶
from pathlib import Path
from vbc.config.loader import load_config
from vbc.infrastructure.event_bus import EventBus
from vbc.infrastructure.file_scanner import FileScanner
from vbc.infrastructure.exif_tool import ExifToolAdapter
from vbc.infrastructure.ffprobe import FFprobeAdapter
from vbc.infrastructure.ffmpeg import FFmpegAdapter
from vbc.pipeline.orchestrator import Orchestrator
# Load configuration
config = load_config(Path("conf/vbc.yaml"))
# Create dependencies
bus = EventBus()
scanner = FileScanner(
    extensions=config.general.extensions,
    min_size_bytes=config.general.min_size_bytes,
)
exif = ExifToolAdapter()
exif.et.run()  # Start the ExifTool process
ffprobe = FFprobeAdapter()
ffmpeg = FFmpegAdapter(event_bus=bus)

# Create orchestrator
orchestrator = Orchestrator(
    config=config,
    event_bus=bus,
    file_scanner=scanner,
    exif_adapter=exif,
    ffprobe_adapter=ffprobe,
    ffmpeg_adapter=ffmpeg,
)

# Run compression
try:
    orchestrator.run(Path("/videos"))
finally:
    # Clean up ExifTool
    if exif.et.running:
        exif.et.terminate()
Processing Pipeline¶
Illustrative snippets
The snippets in this section show the pipeline shape and event flow. They are intentionally simplified and are not a verbatim copy of vbc.pipeline.orchestrator.
1. Discovery Phase¶
# Orchestrator scans directory
files_to_process, stats = orchestrator._perform_discovery(input_dir)
# Emits event
bus.publish(DiscoveryFinished(
    files_found=stats['files_found'],
    files_to_process=stats['files_to_process'],
    already_compressed=stats['already_compressed'],
    ignored_small=stats['ignored_small'],
    ignored_err=stats['ignored_err'],
))
2. Job Processing¶
# For each file
def _process_file(video_file: VideoFile, input_dir: Path):
    # 1. Check for an existing .err marker
    if err_path.exists() and not config.general.clean_errors:
        return  # Skip

    # 2. Get stream info (ffprobe)
    stream_info = ffprobe_adapter.get_stream_info(video_file.path)

    # 3. Check and fix the color space if needed
    input_path, temp_fixed = _check_and_fix_color_space(
        video_file.path,
        output_path,
        stream_info,
    )

    # 4. Get metadata (ExifTool, cached)
    video_file.metadata = _get_metadata(video_file, stream_info)

    # 5. Filter checks
    if config.general.skip_av1 and video_file.metadata.codec == "av1":
        return  # Skip
    if config.general.filter_cameras:
        if video_file.metadata.camera_model not in config.general.filter_cameras:
            return  # Skip

    # 6. Determine CQ and rotation
    target_cq = _determine_cq(video_file)
    rotation = _determine_rotation(video_file)

    # 7. Create the job
    job = CompressionJob(
        source_file=video_file,
        output_path=output_path,
        rotation_angle=rotation,
    )
    bus.publish(JobStarted(job=job))

    # 8. Compress
    ffmpeg_adapter.compress(job, config, rotate=rotation)

    # 9. Post-processing
    if job.status == JobStatus.COMPLETED:
        # Copy metadata
        _copy_deep_metadata(source, output)
        # Check the minimum compression ratio
        ratio = output_size / input_size
        if ratio > (1.0 - config.general.min_compression_ratio):
            shutil.copy2(source, output)  # Keep the original
        bus.publish(JobCompleted(job=job))
    else:
        # Write the .err file
        err_path.write_text(job.error_message)
        bus.publish(JobFailed(job=job))
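The `min_compression_ratio` check in step 9 is easy to misread, so here it is isolated as a function with concrete numbers. The semantics (the output must be at least `min_compression_ratio` smaller than the input, else the original is kept) are inferred from the snippet above:

```python
def keep_original(input_size: int, output_size: int, min_compression_ratio: float) -> bool:
    """True when compression saved too little and the original should be
    kept instead: output/input must not exceed 1.0 - min_compression_ratio."""
    return output_size / input_size > (1.0 - min_compression_ratio)
```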
Concurrency Control¶
ThreadController Pattern¶
# Block until a thread slot is available
with self._thread_lock:
    while self._active_threads >= self._current_max_threads:
        self._thread_lock.wait()
        if self._shutdown_requested:
            return  # Don't start new jobs
    self._active_threads += 1

# Process job...

# Release the slot
with self._thread_lock:
    self._active_threads -= 1
    self._thread_lock.notify_all()  # Wake up waiting threads
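The wait/notify shape above can be exercised as a self-contained program. `SlotLimiter` below is a generic stand-in for the orchestrator's gate, not its actual class:

```python
import threading

class SlotLimiter:
    """Minimal stand-in for the orchestrator's thread gate: at most
    max_slots workers run concurrently; releasing notifies waiters."""

    def __init__(self, max_slots: int):
        self._cond = threading.Condition()
        self._active = 0
        self.max_slots = max_slots
        self.peak = 0  # highest concurrency observed (for the demo)

    def acquire(self):
        with self._cond:
            while self._active >= self.max_slots:
                self._cond.wait()
            self._active += 1
            self.peak = max(self.peak, self._active)

    def release(self):
        with self._cond:
            self._active -= 1
            self._cond.notify_all()  # wake threads blocked in acquire()

def worker(limiter: SlotLimiter):
    limiter.acquire()
    try:
        pass  # one compression job would run here
    finally:
        limiter.release()
```

Because the count is checked in a `while` loop under the Condition, spurious wakeups and racing waiters cannot push concurrency past the limit.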
Dynamic Adjustment¶
# User presses '>' key
def _on_thread_control(self, event: ThreadControlEvent):
    with self._thread_lock:
        old = self._current_max_threads
        new = max(1, min(8, old + event.change))  # clamp before reporting
        self._current_max_threads = new
        self._thread_lock.notify_all()  # Wake up waiting threads
    bus.publish(ActionMessage(message=f"Threads: {old} → {new}"))
Submit-on-Demand Pattern¶
from collections import deque
from concurrent.futures import FIRST_COMPLETED, wait

pending = deque(files_to_process)
in_flight = {}  # future -> VideoFile

def submit_batch():
    """Submit files up to the max_inflight limit"""
    max_inflight = config.general.prefetch_factor * current_max_threads
    while len(in_flight) < max_inflight and pending:
        vf = pending.popleft()
        future = executor.submit(_process_file, vf, input_dir)
        in_flight[future] = vf
    # Update UI with the remaining pending files
    bus.publish(QueueUpdated(pending_files=list(pending)))

# Submit initial batch
submit_batch()

# Process jobs as they complete
while in_flight:
    done, _ = wait(in_flight, timeout=1.0, return_when=FIRST_COMPLETED)
    for future in done:
        future.result()
        del in_flight[future]
    # Replenish the queue
    submit_batch()
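The same pattern, rewritten as a self-contained function you can actually run. The name `run_submit_on_demand` is illustrative and not part of vbc:

```python
from collections import deque
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def run_submit_on_demand(items, process, max_threads=2, prefetch_factor=2):
    """Keep at most prefetch_factor * max_threads futures in flight,
    replenishing as workers complete, and collect all results."""
    pending = deque(items)
    in_flight = {}
    results = []
    max_inflight = prefetch_factor * max_threads
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        def submit_batch():
            while len(in_flight) < max_inflight and pending:
                item = pending.popleft()
                in_flight[executor.submit(process, item)] = item
        submit_batch()
        while in_flight:
            done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
            for future in done:
                results.append(future.result())
                del in_flight[future]
            submit_batch()  # replenish the window
    return results
```

Compared with submitting everything up front, the executor's internal queue never holds more than a handful of futures, which matters when a directory contains tens of thousands of files.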
Metadata Caching¶
# Thread-safe cache to avoid redundant ExifTool calls (instance attributes)
self._metadata_cache: Dict[Path, VideoMetadata] = {}
self._metadata_lock = threading.Lock()

def _get_metadata(self, video_file: VideoFile) -> VideoMetadata:
    with self._metadata_lock:
        cached = self._metadata_cache.get(video_file.path)
        if cached:
            return cached
    # Extract metadata outside the lock (slow ffprobe/ExifTool work)
    stream_info = self.ffprobe_adapter.get_stream_info(video_file.path)
    metadata = self._build_metadata(video_file, stream_info)
    # Cache it
    with self._metadata_lock:
        self._metadata_cache[video_file.path] = metadata
    return metadata
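The same double-checked shape, generalized into a runnable class. `CachedLookup` is an illustrative stand-in, and it preserves a deliberate trade-off of the pattern above: the lock is held only around dictionary access, so two threads may extract the same key concurrently, but every caller gets a value and the cache stays consistent:

```python
import threading
from typing import Callable, Dict

class CachedLookup:
    """Generic lock-around-the-dict cache: extraction runs outside
    the lock so slow calls never block other lookups."""

    def __init__(self, extract: Callable[[str], str]):
        self._extract = extract
        self._cache: Dict[str, str] = {}
        self._lock = threading.Lock()

    def get(self, key: str) -> str:
        with self._lock:
            cached = self._cache.get(key)
        if cached is not None:
            return cached
        value = self._extract(key)  # slow call, outside the lock
        with self._lock:
            self._cache[key] = value
        return value
```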
Decision Logic¶
Dynamic Quality¶
def _determine_cq(self, file: VideoFile) -> int:
    use_gpu = self.config.general.gpu
    encoder_args = select_encoder_args(self.config, use_gpu)
    default_cq = extract_quality_value(encoder_args) or (45 if use_gpu else 32)
    if not file.metadata:
        return default_cq
    # Check for a custom CQ from ExifTool
    if file.metadata.custom_cq is not None:
        return file.metadata.custom_cq
    # Check the dynamic_quality mapping
    if file.metadata.camera_model:
        for pattern, rule in self.config.general.dynamic_quality.items():
            if pattern in file.metadata.camera_model:
                return rule.cq
    return default_cq
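The substring matching of `dynamic_quality` patterns, isolated for clarity. The rule patterns (`GoPro`, `iPhone`) and the `QualityRule` shape are hypothetical examples, not vbc's actual config schema:

```python
from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class QualityRule:
    cq: int

def pick_cq(camera_model: Optional[str], rules: Dict[str, QualityRule], default_cq: int) -> int:
    """First pattern contained in the camera model wins, as in
    _determine_cq above; otherwise fall back to the default."""
    if camera_model:
        for pattern, rule in rules.items():
            if pattern in camera_model:
                return rule.cq
    return default_cq
```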
Auto-Rotation¶
def _determine_rotation(self, file: VideoFile) -> Optional[int]:
    # Manual rotation overrides everything
    if self.config.general.manual_rotation is not None:
        return self.config.general.manual_rotation
    # Check filename patterns
    filename = file.path.name
    for pattern, angle in self.config.autorotate.patterns.items():
        if re.search(pattern, filename):
            return angle
    return None
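The filename matching step on its own. The regex patterns shown are made up for the example; real patterns come from `config.autorotate.patterns`:

```python
import re
from typing import Dict, Optional

def pick_rotation(filename: str, patterns: Dict[str, int]) -> Optional[int]:
    """First regex that matches the filename decides the angle,
    mirroring the loop in _determine_rotation above."""
    for pattern, angle in patterns.items():
        if re.search(pattern, filename):
            return angle
    return None
```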
Graceful Shutdown¶
# User presses 'S' key
def _on_shutdown_request(self, event: RequestShutdown):
    with self._thread_lock:
        self._shutdown_requested = True
        self._thread_lock.notify_all()  # Wake up all waiting threads
    bus.publish(ActionMessage(message="SHUTDOWN requested"))

# In the main loop
while in_flight:
    done, _ = wait(in_flight, timeout=1.0)
    # ...
    # Exit once shutdown is requested and nothing is in flight
    if self._shutdown_requested and not in_flight:
        logger.info("Shutdown complete")
        break
Refresh Queue¶
# User presses 'R' key
def _on_refresh_request(self, event: RefreshRequested):
    with self._refresh_lock:
        self._refresh_requested = True

# In the main loop
if self._refresh_requested:
    self._refresh_requested = False
    # Re-scan the directory
    new_files, new_stats = _perform_discovery(input_dir)
    # Add only new files (not already submitted)
    submitted_paths = {vf.path for vf in in_flight.values()}
    submitted_paths.update(vf.path for vf in pending)
    added = 0
    for vf in new_files:
        if vf.path not in submitted_paths:
            pending.append(vf)
            added += 1
    # Update stats
    bus.publish(DiscoveryFinished(...))
    bus.publish(ActionMessage(message=f"Refreshed: +{added} new files"))
Error Handling¶
Corrupted Files¶
try:
    stream_info = ffprobe_adapter.get_stream_info(video_file.path)
except Exception:
    # ffprobe failed - the file is corrupted
    err_path.write_text("File is corrupted (ffprobe failed)")
    logger.error(f"Corrupted file: {video_file.path}")
    job.status = JobStatus.FAILED
    bus.publish(JobFailed(job=job, error_message="Corrupted file"))
    return
Hardware Capability¶
# FFmpegAdapter detects the error and sets the status
if job.status == JobStatus.HW_CAP_LIMIT:
    # Write the .err marker
    err_path.write_text("Hardware is lacking required capabilities")
    # Event already published by FFmpegAdapter