| name | event-flow |
| description | Add new events to Bob The Skull's event-driven architecture. Use when creating new events, event publishers, event handlers, or extending the event system with new event types. |
| allowed-tools | Read, Edit, Grep, Glob |
Event Flow Management
Adds new events to Bob The Skull's event-driven architecture, ensuring all publishers and subscribers are properly connected.
When to Use
- Creating new event types
- Adding event publishers (components that emit events)
- Adding event subscribers (components that handle events)
- Documenting event flows
- Troubleshooting missing event handlers
Event-Driven Architecture Overview
Bob The Skull uses a central EventBus for all inter-component communication:
Component A → publish(Event) → EventBus → deliver → Component B.on_event()
No direct method calls between components! Everything goes through events.
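The flow above can be sketched with a toy in-process bus. This is a minimal illustration, not Bob's actual EventBus: `SketchEventBus`, `PingEvent`, and `ComponentB` are hypothetical names, and delivery is assumed to be synchronous and keyed on the event's exact class.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, DefaultDict, List, Type


class SketchEventBus:
    """Hypothetical in-process bus; not Bob's actual implementation."""

    def __init__(self) -> None:
        # Maps an event class to the handlers registered for it.
        self.subscribers: DefaultDict[Type[Any], List[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, event_type: Type[Any], handler: Callable[[Any], None]) -> None:
        self.subscribers[event_type].append(handler)

    def publish(self, event: Any) -> None:
        # Deliver to every handler registered for the event's exact class.
        for handler in list(self.subscribers.get(type(event), [])):
            handler(event)


@dataclass
class PingEvent:  # hypothetical event used only for this illustration
    message: str


class ComponentB:
    def __init__(self, event_bus: SketchEventBus) -> None:
        self.received: List[str] = []
        event_bus.subscribe(PingEvent, self.on_ping)

    def on_ping(self, event: PingEvent) -> None:
        self.received.append(event.message)


bus = SketchEventBus()
component_b = ComponentB(bus)
# "Component A" never calls component_b directly; it only publishes.
bus.publish(PingEvent(message="hello"))
```

The publisher holds a reference to the bus only, so either side can be replaced without touching the other.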
Event Definition Pattern (events.py)
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Any, Optional
@dataclass
class YourEvent:
    """Published when [describe condition that triggers event]"""
    param1: type  # Description
    param2: type  # Description
    optional_data: Optional[Dict[str, Any]] = None
    timestamp: datetime = field(default_factory=datetime.now)
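As a concrete instance of the template, here is a sketch with the placeholders filled in. `ButtonPressedEvent` and its fields are hypothetical and not part of Bob's events.py. Two dataclass rules matter here: fields without defaults must come before fields with defaults, and `default_factory` runs once per instance, so each event gets its own timestamp.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class ButtonPressedEvent:  # hypothetical event, for illustration only
    """Published when a physical button is pressed."""
    button_id: str          # Which button fired
    press_duration_ms: int  # How long it was held
    optional_data: Optional[Dict[str, Any]] = None
    timestamp: datetime = field(default_factory=datetime.now)


# Only the required fields need to be passed; the rest fall back to defaults.
first = ButtonPressedEvent(button_id="top", press_duration_ms=120)
second = ButtonPressedEvent(button_id="top", press_duration_ms=80, optional_data={"source": "gpio"})
```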
Common Event Types
State Change Events:
@dataclass
class ComponentStateChangedEvent:
    """Component changed state"""
    component: str
    old_state: str
    new_state: str
    timestamp: datetime = field(default_factory=datetime.now)
Detection Events:
@dataclass
class ThingDetectedEvent:
    """Thing was detected"""
    confidence: float
    location: tuple
    detector_id: str
    data: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
Request/Response Events:
@dataclass
class ProcessRequestEvent:
    """Request to process something"""
    request_id: str
    input_data: Any
    timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class ProcessResponseEvent:
    """Response to process request"""
    request_id: str
    result: Any
    success: bool
    timestamp: datetime = field(default_factory=datetime.now)
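The shared `request_id` is what lets a requester pair a response with the request that caused it. The sketch below shows one way to do that. `PendingRequests` is a hypothetical helper, and generating IDs with `uuid.uuid4()` is an assumption, not something the source prescribes.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict


@dataclass
class ProcessRequestEvent:
    request_id: str
    input_data: Any
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class ProcessResponseEvent:
    request_id: str
    result: Any
    success: bool
    timestamp: datetime = field(default_factory=datetime.now)


class PendingRequests:
    """Hypothetical helper that pairs responses with outstanding requests."""

    def __init__(self) -> None:
        self._pending: Dict[str, ProcessRequestEvent] = {}

    def new_request(self, input_data: Any) -> ProcessRequestEvent:
        # Assumption: a random UUID is unique enough to serve as the request ID.
        request = ProcessRequestEvent(request_id=str(uuid.uuid4()), input_data=input_data)
        self._pending[request.request_id] = request
        return request

    def on_response(self, event: ProcessResponseEvent) -> bool:
        """Return True if the response matched a pending request."""
        return self._pending.pop(event.request_id, None) is not None


pending = PendingRequests()
request = pending.new_request("some input")
matched = pending.on_response(ProcessResponseEvent(request.request_id, result="done", success=True))
unmatched = pending.on_response(ProcessResponseEvent("unknown-id", result=None, success=False))
```

Responses with an unknown or already-consumed ID are reported as unmatched rather than raising, which keeps a stray response from crashing the handler.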
Publishing Events
In component code:
import logging
from events import YourEvent
logger = logging.getLogger(__name__)
class YourComponent:
    def __init__(self, event_bus):
        self.event_bus = event_bus
    def do_something(self):
        # When condition occurs, publish event
        # (value1 and value2 are placeholders for the real field values)
        event = YourEvent(
            param1=value1,
            param2=value2
        )
        self.event_bus.publish(event)
        logger.info(f"Published YourEvent: {event}")
Subscribing to Events
In component code:
import logging
from events import YourEvent
logger = logging.getLogger(__name__)
class YourComponent:
    def __init__(self, event_bus):
        self.event_bus = event_bus
        # Subscribe to event during initialization
        self.event_bus.subscribe(YourEvent, self.on_your_event)
    def on_your_event(self, event: YourEvent):
        """Handle YourEvent"""
        logger.info(f"Received YourEvent: {event}")
        # Process event
        self.do_something_with(event.param1, event.param2)
State Machine Integration
The state machine is a special subscriber that coordinates system behavior:
# In state_machine/state_machine.py
def _setup_event_handlers(self):
    """Subscribe to events"""
    self.event_bus.subscribe(YourEvent, self.on_your_event)
def on_your_event(self, event: YourEvent):
    """Handle YourEvent in state machine"""
    logger.info(f"State machine received: {event}")
    # Check current state
    if self.current_state == State.WAITING:
        # Transition to new state
        self._transition_to(State.PROCESSING)
        # Trigger actions
        self._handle_processing(event)
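The guard on `current_state` is what makes the handler state-dependent: the same event can trigger a transition in one state and be ignored in another. A self-contained sketch of that behavior follows. `SketchBus` and `SketchStateMachine` are hypothetical stand-ins, and the two-member `State` enum is assumed for illustration.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Type


class State(Enum):  # assumed states, for illustration only
    WAITING = auto()
    PROCESSING = auto()


@dataclass
class YourEvent:
    param1: str


class SketchBus:
    """Hypothetical stand-in for the real event bus."""

    def __init__(self) -> None:
        self.subscribers: Dict[Type[Any], List[Callable[[Any], None]]] = {}

    def subscribe(self, event_type, handler) -> None:
        self.subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event) -> None:
        for handler in self.subscribers.get(type(event), []):
            handler(event)


class SketchStateMachine:
    def __init__(self, event_bus: SketchBus) -> None:
        self.event_bus = event_bus
        self.current_state = State.WAITING
        self.processed: List[str] = []
        self.event_bus.subscribe(YourEvent, self.on_your_event)

    def on_your_event(self, event: YourEvent) -> None:
        # Only react while WAITING; events arriving in other states are ignored.
        if self.current_state == State.WAITING:
            self.current_state = State.PROCESSING
            self.processed.append(event.param1)


bus = SketchBus()
machine = SketchStateMachine(bus)
bus.publish(YourEvent(param1="first"))
bus.publish(YourEvent(param1="second"))  # ignored: already PROCESSING
```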
Event Flow Checklist
When adding a new event:
- Define event dataclass in events.py
- Find all components that should publish this event
- Find all components that should subscribe to this event
- Add publishers: event_bus.publish(YourEvent(...))
- Add subscribers: event_bus.subscribe(YourEvent, self.on_event)
- Add handlers: def on_your_event(self, event: YourEvent):
- Update state machine if needed
- Test event flow end-to-end
- Document event flow in comments/docstrings
Finding Event Usage
Find where event is published:
grep -r "YourEvent(" .
Find where event is subscribed:
grep -r "subscribe.*YourEvent" .
Find event handlers:
grep -r "def on_your_event" .
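The same searches can be combined to flag events that are constructed somewhere but never subscribed to, which is a common cause of missing handlers. The sketch below mirrors the grep patterns with regular expressions. `find_unhandled_events` is a hypothetical helper, and it assumes event class names end in `Event` and that subscriptions are written as `subscribe(SomeEvent, ...)`.

```python
import re
from typing import Dict, Set

# Mirrors `grep "YourEvent("`: any construction of a class whose name ends in Event.
CONSTRUCT_RE = re.compile(r"\b(\w+Event)\(")
# Mirrors `grep "subscribe.*YourEvent"`, restricted to the first argument.
SUBSCRIBE_RE = re.compile(r"subscribe\(\s*(\w+Event)\b")


def find_unhandled_events(sources: Dict[str, str]) -> Set[str]:
    """Return event names that are constructed somewhere but never subscribed to.

    `sources` maps a file name to that file's text.
    """
    constructed: Set[str] = set()
    subscribed: Set[str] = set()
    for text in sources.values():
        constructed.update(CONSTRUCT_RE.findall(text))
        subscribed.update(SUBSCRIBE_RE.findall(text))
    return constructed - subscribed


example_sources = {
    "detector.py": "self.event_bus.publish(FaceDetectedEvent(confidence=0.9))",
    "motion.py": "event = MotionDetectedEvent()\nself.event_bus.publish(event)",
    "state_machine.py": "self.event_bus.subscribe(FaceDetectedEvent, self.on_face_detected)",
}
unhandled = find_unhandled_events(example_sources)
```

This is a text-level heuristic: it will miss subscriptions made through aliases or loops, so treat its output as a list of candidates to inspect rather than a definitive report.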
Common Event Patterns
Vision Detection Events
- Published by: Detector plugins
- Subscribed by: State machine, vision processor
- Pattern: *DetectedEvent, *ClearedEvent
Speech Events
- Published by: STT, wake word detector
- Subscribed by: State machine
- Pattern: *RecognizedEvent, *DetectedEvent
LLM Events
- Published by: LLM processor
- Subscribed by: State machine, TTS
- Pattern: *ResponseEvent, *ToolCallEvent
Hardware Events
- Published by: Hardware controllers
- Subscribed by: State machine
- Pattern: *StatusEvent, *ErrorEvent
Event Bus Implementation
Bob uses two event bus implementations:
LocalEventBus - In-process pub/sub
from local_event_bus import LocalEventBus
event_bus = LocalEventBus()
MQTTEventBus - Distributed pub/sub
from mqtt_event_bus import MQTTEventBus
event_bus = MQTTEventBus(broker="192.168.1.44")
Both implement the same interface:
- publish(event) - Send event
- subscribe(event_type, handler) - Register handler
- unsubscribe(event_type, handler) - Remove handler
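One way to make the shared interface explicit is a `typing.Protocol`, so components can be annotated against the interface rather than a concrete bus. This is a sketch under assumptions: the `EventBus` protocol and `InMemoryBus` below are hypothetical, and the real classes may declare their interface differently.

```python
from typing import Any, Callable, Dict, List, Protocol, Type, runtime_checkable


@runtime_checkable
class EventBus(Protocol):
    """Hypothetical structural type for the three methods listed above."""

    def publish(self, event: Any) -> None: ...
    def subscribe(self, event_type: Type[Any], handler: Callable[[Any], None]) -> None: ...
    def unsubscribe(self, event_type: Type[Any], handler: Callable[[Any], None]) -> None: ...


class InMemoryBus:
    """Toy implementation used only to show the protocol being satisfied."""

    def __init__(self) -> None:
        self.subscribers: Dict[Type[Any], List[Callable[[Any], None]]] = {}

    def publish(self, event: Any) -> None:
        # Iterate over a copy so handlers may unsubscribe during delivery.
        for handler in list(self.subscribers.get(type(event), [])):
            handler(event)

    def subscribe(self, event_type, handler) -> None:
        self.subscribers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type, handler) -> None:
        handlers = self.subscribers.get(event_type, [])
        if handler in handlers:
            handlers.remove(handler)


seen: List[str] = []
handler = seen.append
bus = InMemoryBus()
bus.subscribe(str, handler)
bus.publish("delivered")
bus.unsubscribe(str, handler)
bus.publish("not delivered")  # no handler left, so nothing is recorded
```

Note that `isinstance(bus, EventBus)` with `runtime_checkable` only checks that the methods exist, not their signatures.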
Existing Events Reference
Core events in events.py:
- WakeWordDetectedEvent - Wake word heard
- SpeechRecognizedEvent - Speech transcribed
- LLMResponseReadyEvent - LLM response available
- LLMToolCallEvent - LLM wants to call tool
- TTSStartedEvent / TTSCompletedEvent - TTS lifecycle
- FaceDetectedEvent / FaceIdentifiedEvent - Vision events
- MotionDetectedEvent / MotionClearedEvent - Motion events
- StateChangedEvent - State machine transitions
Event Naming Conventions
Event Names:
- Use descriptive nouns: ThingDetected, not DetectThing
- End with Event: FaceDetectedEvent
- Use past tense: Detected, not Detect
Handler Names:
- Prefix with on_: on_face_detected
- Use snake_case: on_speech_recognized
- Match event name: FaceDetectedEvent → on_face_detected
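The handler rule can be expressed mechanically, which is handy for checking names in review or in a test. The function below is a hypothetical helper, not part of the codebase. It assumes the `Event` suffix is dropped, as in FaceDetectedEvent → on_face_detected, so the placeholder `on_your_event` used elsewhere in this document would not be reproduced by it.

```python
import re


def handler_name_for(event_class_name: str) -> str:
    """Derive the conventional handler name, e.g. FaceDetectedEvent -> on_face_detected."""
    suffix = "Event"
    if event_class_name.endswith(suffix):
        base = event_class_name[: -len(suffix)]
    else:
        base = event_class_name
    # Insert "_" at lower-to-upper boundaries and before the last capital of an
    # acronym run (so "LLMResponse" splits as "LLM_Response"), then lowercase.
    snake = re.sub(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", "_", base).lower()
    return f"on_{snake}"


face_handler = handler_name_for("FaceDetectedEvent")
llm_handler = handler_name_for("LLMResponseReadyEvent")
```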
Debugging Event Flows
Enable event logging:
# In LocalEventBus or MQTTEventBus
logger.setLevel(logging.DEBUG)
Trace event flow:
# In event_bus.py
def publish(self, event):
    logger.debug(f"Publishing: {type(event).__name__} to {len(self.subscribers.get(type(event), []))} subscribers")
    # ... publish logic
Check subscriptions:
# In component
logger.info(f"Subscribed to: {[t.__name__ for t in self.event_bus.subscribers.keys()]}")
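To see what such a trace looks like, the sketch below wires the debug line into a toy bus and captures the log output in memory. `TracingBus`, `ListHandler`, and `OrphanEvent` are hypothetical names. The extra warning for events with no subscribers is an addition for illustration and is not confirmed to exist in the real buses.

```python
import logging
from typing import Any, Callable, Dict, List, Type

logger = logging.getLogger("event_bus_sketch")


class TracingBus:
    """Hypothetical bus that logs every publish; assumes a `subscribers` dict."""

    def __init__(self) -> None:
        self.subscribers: Dict[Type[Any], List[Callable[[Any], None]]] = {}

    def subscribe(self, event_type, handler) -> None:
        self.subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event: Any) -> None:
        handlers = self.subscribers.get(type(event), [])
        logger.debug("Publishing: %s to %d subscribers", type(event).__name__, len(handlers))
        if not handlers:
            # Illustrative addition: makes a missing handler visible at WARNING level.
            logger.warning("No subscribers for %s", type(event).__name__)
        for handler in list(handlers):
            handler(event)


class ListHandler(logging.Handler):
    """Collects log messages so the trace can be inspected programmatically."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(record.getMessage())


class OrphanEvent:  # hypothetical event that nothing subscribes to
    pass


capture = ListHandler()
logger.addHandler(capture)
logger.setLevel(logging.DEBUG)

TracingBus().publish(OrphanEvent())
```

A subscriber count of 0 in the trace usually means the subscriber was never constructed, or it subscribed to a different bus instance.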
Pro Tips
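- Treat events as immutable - Dataclasses are mutable by default (frozen=False); use @dataclass(frozen=True) to enforce immutability unless a handler genuinely needs to modify the event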
- Include context - Always add detector_id, component_id, etc.
- Timestamp everything - Use field(default_factory=datetime.now)
- Document triggers - Explain in docstring what causes the event
- Test isolation - Mock event_bus for unit tests
- Avoid event storms - Debounce high-frequency events
- Use type hints - Annotating handlers with the event type enables IDE autocomplete and static type checking
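Several of these tips can be combined in one small sketch: a frozen event, a debouncer that suppresses high-frequency publishes, and a mocked bus for an isolated test. `DebouncedPublisher` and its `clock` parameter are hypothetical. The injected clock is an assumption made so the example is deterministic; production code would more likely pass `time.monotonic`.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional
from unittest.mock import Mock


@dataclass(frozen=True)
class MotionDetectedEvent:  # simplified stand-in for the real event
    detector_id: str


class DebouncedPublisher:
    """Hypothetical helper: drops events published within `min_interval` seconds of the last one."""

    def __init__(self, event_bus, min_interval: float, clock: Callable[[], float]) -> None:
        self.event_bus = event_bus
        self.min_interval = min_interval
        self.clock = clock
        self._last_published: Optional[float] = None

    def publish(self, event) -> bool:
        """Publish unless the previous publish was too recent; return whether it was sent."""
        now = self.clock()
        if self._last_published is not None and now - self._last_published < self.min_interval:
            return False  # too soon after the last publish, so suppress
        self._last_published = now
        self.event_bus.publish(event)
        return True


# Test isolation: a Mock stands in for the bus, and a fake clock replaces real time.
fake_time = [0.0]
mock_bus = Mock()
publisher = DebouncedPublisher(mock_bus, min_interval=1.0, clock=lambda: fake_time[0])

results: List[bool] = []
for t in (0.0, 0.2, 0.9, 1.5):
    fake_time[0] = t
    results.append(publisher.publish(MotionDetectedEvent(detector_id="cam0")))
```

Only the publishes at t=0.0 and t=1.5 reach the bus. Because the event is frozen, assigning to `detector_id` after construction raises `dataclasses.FrozenInstanceError`.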