Event System¶
VBC uses an event-driven architecture with a synchronous Pub/Sub event bus for decoupled communication.
Design¶
EventBus¶
Simple synchronous event dispatcher:
class EventBus:
def __init__(self):
self._subscribers: Dict[Type[Event], List[Callable]] = {}
def subscribe(self, event_type: Type[Event], handler: Callable):
"""Subscribe to an event type."""
if event_type not in self._subscribers:
self._subscribers[event_type] = []
self._subscribers[event_type].append(handler)
def publish(self, event: Event):
"""Publish an event to all subscribers."""
event_type = type(event)
if event_type in self._subscribers:
for handler in self._subscribers[event_type]:
handler(event)
Characteristics: - Synchronous: Handlers execute in publish thread - Ordered: Subscribers called in registration order - Type-safe: Pydantic events ensure data validity
Event Types¶
Discovery Events¶
DiscoveryStarted¶
Publisher: Orchestrator Subscribers: UIManager Purpose: Signal start of file scanning
DiscoveryFinished¶
class DiscoveryFinished(Event):
files_found: int
files_to_process: int = 0
already_compressed: int = 0
ignored_small: int = 0
ignored_err: int = 0
ignored_av1: int = 0
ignored_err_entries: List[DiscoveryErrorEntry] = Field(default_factory=list)
source_folders_count: int = 1
Publisher: Orchestrator
Subscribers: UIManager
Purpose: Report discovery results, update UI counters, show ignored .err entries, and track multi-source discovery
Job Lifecycle Events¶
JobStarted¶
Publisher: Orchestrator Subscribers: UIManager Purpose: Add job to "Currently Processing" panel
JobCompleted¶
Publisher: Orchestrator Subscribers: UIManager Purpose: Update stats, move to "Last Completed" panel
JobFailed¶
Publisher: Orchestrator, FFmpegAdapter Subscribers: UIManager Purpose: Increment error counters, create .err marker
HardwareCapabilityExceeded¶
Publisher: FFmpegAdapter Subscribers: UIManager Purpose: Track GPU capability errors separately
JobProgressUpdated¶
Publisher: FFmpegAdapter Subscribers: UIManager Purpose: Update per-job progress in the active jobs panel
Queue Events¶
QueueUpdated¶
Publisher: Orchestrator Subscribers: UIManager Purpose: Update "Next in Queue" panel
RefreshRequested¶
Publisher: KeyboardListener Subscribers: Orchestrator Purpose: Re-scan directory for new files
RefreshFinished¶
Publisher: Orchestrator Subscribers: UIManager Purpose: Report results after refresh completes (used for UI counters)
InputDirsChanged¶
Publisher: UIManager (Dirs apply) Subscribers: Orchestrator Purpose: Update active input directory set for next scan cycle
Control Events¶
Control Events Location
The control events below are defined in vbc/domain/events.py so pipeline and UI can depend on the same domain-level contracts. They are published by KeyboardListener in vbc/ui/keyboard.py.
ThreadControlEvent¶
Publisher: KeyboardListener
Subscribers: Orchestrator, UIManager
Purpose: Adjust max concurrent threads
Location: vbc/domain/events.py (published by vbc/ui/keyboard.py)
RequestShutdown¶
Publisher: KeyboardListener
Subscribers: Orchestrator, UIManager
Purpose: Graceful shutdown (finish active jobs)
Location: vbc/domain/events.py (published by vbc/ui/keyboard.py)
InterruptRequested¶
Publisher: KeyboardListener
Subscribers: Orchestrator, UIManager
Purpose: Immediate interrupt (Ctrl+C)
Location: vbc/domain/events.py (published by vbc/ui/keyboard.py)
UI Events¶
Deprecated UI Events
ToggleConfig, ToggleLegend, ToggleMenu, and HideConfig are deprecated and replaced by the new tabbed overlay system. They remain in the codebase for backwards compatibility but will be removed in a future version.
ToggleOverlayTab¶
class ToggleOverlayTab(Event):
tab: Optional[str] = None # "settings" | "io" | "dirs" | "reference" | "shortcuts" | "tui" | "logs" | None
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: Toggle overlay with optional tab selection
Location: vbc/ui/keyboard.py
CycleOverlayTab¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: Cycle through overlay tabs with Tab
Location: vbc/ui/keyboard.py
CycleLogsPage¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: Navigate paginated entries in Logs tab
Location: vbc/ui/keyboard.py
CycleOverlayDim¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: Cycle overlay dim levels from the TUI controls
Location: vbc/ui/keyboard.py
RotateGpuMetric¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: Rotate the GPU metric displayed in the top bar
Location: vbc/ui/keyboard.py
CycleSparklinePreset¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: Cycle GPU sparkline rendering presets
Location: vbc/ui/keyboard.py
CycleSparklinePalette¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: Cycle GPU sparkline color palettes
Location: vbc/ui/keyboard.py
CloseOverlay¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: Close the overlay (Esc key)
Location: vbc/ui/keyboard.py
ToggleConfig (Deprecated)¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: ~~Show/hide configuration overlay~~ (replaced by ToggleOverlayTab)
Location: vbc/ui/keyboard.py
ToggleLegend (Deprecated)¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: ~~Show/hide legend overlay~~ (replaced by ToggleOverlayTab(tab="reference"))
Location: vbc/ui/keyboard.py
ToggleMenu (Deprecated)¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: ~~Show/hide menu overlay~~ (replaced by ToggleOverlayTab(tab="shortcuts"))
Location: vbc/ui/keyboard.py
HideConfig (Deprecated)¶
Publisher: KeyboardListener
Subscribers: UIManager
Purpose: ~~Close configuration overlay~~ (replaced by CloseOverlay)
Location: vbc/ui/keyboard.py
ActionMessage¶
Publisher: Orchestrator, KeyboardListener Subscribers: UIManager Purpose: User feedback messages (displayed for 60s)
Completion Events¶
ProcessingFinished¶
Publisher: Orchestrator Subscribers: UIManager Purpose: All jobs completed normally
ProcessingPausedOnError¶
Publisher: Orchestrator Subscribers: UIManager Purpose: Pause processing after a verification failure and show ERROR state
WaitingForInput¶
Publisher: Orchestrator
Subscribers: UIManager
Purpose: Enter WAITING state (R restart / S exit)
Dirs Tab Events¶
DirsCursorMove¶
DirsSwapSelected¶
DirsToggleSelected¶
DirsEnterAddMode¶
DirsMarkDelete¶
DirsInputChar¶
DirsConfirmAdd¶
DirsCancelInput¶
DirsApplyChanges¶
Event Flow Examples¶
Job Lifecycle¶
┌─────────────┐
│Orchestrator │
└──────┬──────┘
│
│ 1. Submit job to ThreadPoolExecutor
▼
┌────────────────┐
│ _process_file()│
└───────┬────────┘
│ 2. Create CompressionJob
│
│ publish(JobStarted(job))
▼
┌─────────┐ subscribe ┌───────────┐
│EventBus │ ──────────────────> │ UIManager │
└────┬────┘ └─────┬─────┘
│ │
│ │ 3. add_active_job()
│ ▼
│ ┌─────────┐
│ │UIState │
│ └─────────┘
│
│ 4. ffmpeg.compress()
▼
┌────────────┐
│FFmpegAdapter│
└──────┬─────┘
│
│ 5. Success → publish(JobCompleted(job))
│ Failure → publish(JobFailed(job, error))
│ HW Cap → publish(HardwareCapabilityExceeded(job))
▼
┌─────────┐
│EventBus │ ──> UIManager → UIState
└─────────┘
Keyboard Control¶
┌──────┐
│ User │
└──┬───┘
│ Press '>'
▼
┌────────────────┐
│KeyboardListener│ (daemon thread)
└───────┬────────┘
│
│ publish(ThreadControlEvent(change=+1))
▼
┌─────────┐
│EventBus │
└────┬────┘
│
├──> Orchestrator._on_thread_control()
│ └─> self._current_max_threads += 1
│ notify_all() → wake waiting threads
│
└──> UIManager.on_thread_control()
└─> state.current_threads += 1
Refresh Flow¶
┌──────┐
│ User │ Press 'R'
└──┬───┘
│
▼
┌────────────────┐
│KeyboardListener│
└───────┬────────┘
│ publish(RefreshRequested())
│ publish(ActionMessage("REFRESH requested"))
▼
┌─────────┐
│EventBus │
└────┬────┘
│
├──> Orchestrator._on_refresh_request()
│ └─> _refresh_requested = True
│
└──> UIManager.on_action_message()
└─> state.set_last_action("REFRESH requested")
(Later, in Orchestrator main loop)
┌────────────────┐
│ Orchestrator │
└───────┬────────┘
│ Check _refresh_requested
│
│ Re-scan directory
▼
_perform_discovery()
│
│ publish(DiscoveryFinished(...))
│ publish(ActionMessage("Refreshed: +5 new files"))
▼
┌─────────┐
│EventBus │ ──> UIManager ──> UIState
└─────────┘
Benefits¶
Loose Coupling¶
Components don't know about each other:
# Orchestrator doesn't know about UIManager
orchestrator.run(input_dir)
# Just publishes events
# UIManager doesn't know about Orchestrator
ui_manager = UIManager(bus, state)
# Just subscribes to events
Benefit: Can replace/remove components without changing others.
Testability¶
Easy to mock EventBus:
def test_orchestrator():
bus = Mock()
orchestrator = Orchestrator(..., event_bus=bus)
orchestrator.run(test_dir)
# Verify events published
bus.publish.assert_any_call(DiscoveryStarted(...))
bus.publish.assert_any_call(JobStarted(...))
Extensibility¶
Add new subscribers without modifying publishers:
# Add webhook notifier (no changes to Orchestrator)
class WebhookNotifier:
def __init__(self, bus, url):
self.url = url
bus.subscribe(JobCompleted, self.on_job_completed)
def on_job_completed(self, event):
requests.post(self.url, json={"job": event.job.dict()})
# Just instantiate
webhook = WebhookNotifier(bus, "https://example.com/webhook")
Debugging¶
Enable event logging:
class EventLogger:
def __init__(self, bus):
# Subscribe to ALL events
for event_type in Event.__subclasses__():
bus.subscribe(event_type, self.log)
def log(self, event):
print(f"[EVENT] {type(event).__name__}: {event}")
logger = EventLogger(bus)
Trade-offs¶
Synchronous Execution¶
Pro: Simple, predictable order Con: Slow handlers block publisher
Mitigation: Keep handlers fast (just update state, no I/O)
Error Handling¶
Issue: Exception in handler crashes publisher
Solution: Wrap publish in try/except:
def publish(self, event: Event):
for handler in self._subscribers.get(type(event), []):
try:
handler(event)
except Exception as e:
logger.error(f"Event handler failed: {e}")
Type Safety¶
Pro: Pydantic validates event data Con: Runtime overhead (minimal)
Benefit: Catch bugs early (e.g., missing fields, wrong types)
Best Practices¶
- One event per action: Don't combine unrelated state changes
- Treat events as value objects: Avoid mutating event fields after publish (recommended convention, not enforced by Pydantic config)
- Descriptive names:
JobCompleted>Event1 - Minimal data: Only include necessary fields
- Document purpose: Add docstrings to event classes
Next Steps¶
- Pipeline Flow - Job processing walkthrough
- Architecture Overview - High-level design