# Livedata Dashboard Architecture

## Overview
The Livedata dashboard is a real-time data visualization system built on Panel/HoloViews. It consumes processed detector data from Kafka, displays it as interactive plots, and publishes user commands back to backend services. The architecture supports multiple concurrent browser sessions with shared state and per-session rendering.
Data updates arrive at ~1 Hz. User controls (workflow start/stop, plot configuration) result in commands published to Kafka for backend consumption.
## System Context
```mermaid
flowchart TD
    ECDCTopic(["ECDC Topics"])
    DataTopic(["Livedata Data Topic"])
    StatusTopic(["Livedata Status Topic"])
    ResponsesTopic(["Livedata Responses Topic"])
    CommandsTopic(["Livedata Commands Topic"])
    subgraph BackendServices["Livedata Backend Services"]
        MonitorData["monitor_data"]
        DetectorData["detector_data"]
        DataReduction["data_reduction"]
    end
    DashboardApp["Dashboard"]
    ECDCTopic --> BackendServices
    MonitorData -- publishes --> DataTopic
    DetectorData -- publishes --> DataTopic
    DataReduction -- publishes --> DataTopic
    DataReduction -- publishes --> StatusTopic
    DataReduction -- publishes --> ResponsesTopic
    DataTopic --> DashboardApp
    StatusTopic --> DashboardApp
    ResponsesTopic --> DashboardApp
    DashboardApp -- publishes --> CommandsTopic
    CommandsTopic --> DataReduction
    classDef kafka fill:#fff3e0,stroke:#ef6c00,color:#e65100;
    classDef backend fill:#e3f2fd,stroke:#1976d2,color:#0d47a1;
    classDef dashboard fill:#ede7f6,stroke:#7b1fa2,color:#4a148c;
    class DataTopic,StatusTopic,ResponsesTopic,CommandsTopic,ECDCTopic kafka;
    class MonitorData,DetectorData,DataReduction backend;
    class DashboardApp dashboard;
```
The dashboard publishes job commands (start/stop workflows) to the Commands topic and receives acknowledgements via the Responses topic. Job and service status updates arrive via the Status topic.
## Layered Architecture
```mermaid
graph TD
    subgraph "Infrastructure Layer"
        T["Transport<br>(Kafka / Null)"]
        MS["MessageSource"]
        CS["CommandService"]
    end
    subgraph "Application Layer"
        O["Orchestrator"]
        DS["DataService"]
        JO["JobOrchestrator"]
        AJR["ActiveJobRegistry"]
        PO["PlotOrchestrator"]
        PC["PlottingController"]
        SM["StreamManager"]
        WC["WorkflowController"]
    end
    subgraph "Presentation Layer"
        PGT["PlotGridTabs"]
        WSW["WorkflowStatusWidget"]
        CW["ConfigurationWidget"]
        Plots["Plots<br>(HoloViews)"]
    end
    T --> MS
    MS --> O
    O --> DS
    O --> AJR
    CS --> T
    JO --> CS
    WC --> JO
    PO --> PC
    PC --> SM
    SM --> DS
    PO --> JO
    PGT --> PO
    WSW --> JO
    CW --> WC
    DS -.->|notifies| Plots
    classDef infra fill:#e3f2fd,stroke:#1976d2,color:#0d47a1;
    classDef app fill:#ede7f6,stroke:#7b1fa2,color:#4a148c;
    classDef presentation fill:#e8f5e9,stroke:#388e3c,color:#1b5e20;
    class T,MS,CS infra;
    class O,DS,JO,AJR,PO,PC,SM,WC app;
    class PGT,WSW,CW,Plots presentation;
```
- **Infrastructure**: Transport abstraction (Kafka, or Null for testing), message sources and sinks
- **Application**: Data management, workflow lifecycle, plot orchestration
- **Presentation**: Panel widgets and HoloViews plots
## Service Composition
`DashboardServices` (see `dashboard/dashboard_services.py`) wires all components together. It is the central composition root: created once per dashboard process and shared across all browser sessions.
```mermaid
flowchart TD
    DS[DashboardServices]
    DS --> Transport
    DS --> Orchestrator
    DS --> DataService
    DS --> JobOrchestrator
    DS --> PlotOrchestrator
    DS --> WorkflowController
    DS --> CommandService
    DS --> ActiveJobRegistry
    DS --> SessionRegistry
    DS --> PlotDataService
    DS --> NotificationQueue
```
`DashboardBase` (see `dashboard/dashboard.py`) is the entry point. It creates `DashboardServices`, starts the Panel server, and creates per-session layouts via `create_layout()`.
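A minimal sketch of the composition-root idea, using hypothetical stand-in classes rather than the real service types: everything is constructed once per process, and every session is handed the same instances.

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins for the real service types.
class NullTransport:
    """No-op transport, in the spirit of the testing transport."""
    def poll(self):
        return []

class DataService(dict):
    """Stand-in for the shared data store."""

@dataclass
class Services:
    """Composition root: built once per process, shared by all sessions."""
    transport: NullTransport = field(default_factory=NullTransport)
    data_service: DataService = field(default_factory=DataService)

services = Services()
# Every browser session receives the same shared instance:
session_a_services = services
session_b_services = services
```

Keeping construction in one place makes it easy to swap the Kafka transport for the null transport in tests without touching any consumer.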
## Data Flow
```mermaid
sequenceDiagram
    participant K as Kafka
    participant O as Orchestrator
    participant AJR as ActiveJobRegistry
    participant DS as DataService
    participant Sub as Plot Subscribers
    participant UI as HoloViews Plots
    K->>O: Raw messages (polled in background thread)
    O->>AJR: ingestion_guard()
    Note over O,AJR: Serializes against job stop/cleanup
    O->>O: Filter by active job number
    O->>DS: transaction { store values }
    DS->>Sub: Notify subscribers (batched)
    Sub->>UI: Update plot data (via Pipe)
```
The `Orchestrator` (see `dashboard/orchestrator.py`) is the message pump. It consumes from the `MessageSource`, filters messages by active job numbers, and stores data in `DataService`. Status messages and command acknowledgements are routed to `JobOrchestrator`.
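The poll-filter-store step can be sketched like this; a hypothetical minimal pump, where the message shape and names are illustrative rather than the real API:

```python
class Orchestrator:
    """Minimal message-pump sketch: poll, filter by active job, store."""
    def __init__(self, source, active_jobs, store):
        self._source = source            # MessageSource-like: has poll()
        self._active_jobs = active_jobs  # set of active job numbers
        self._store = store              # DataService-like: dict interface

    def update(self):
        for msg in self._source.poll():
            if msg["job"] not in self._active_jobs:
                continue                 # drop data for inactive jobs
            self._store[(msg["job"], msg["key"])] = msg["value"]

class ListSource:
    """Test double that yields a fixed batch of messages."""
    def __init__(self, messages):
        self._messages = messages
    def poll(self):
        return self._messages

store = {}
source = ListSource([
    {"job": 1, "key": "counts", "value": 10},
    {"job": 2, "key": "counts", "value": 99},  # job 2 not active: dropped
])
Orchestrator(source, active_jobs={1}, store=store).update()
# store now holds only data for job 1
```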
`DataService` (see `dashboard/data_service.py`) is a dict-like store keyed by `ResultKey`. Subscribers register interest in specific keys and receive batched notifications via a transaction mechanism.
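The transaction mechanism can be sketched as follows. This is a simplified stand-in, not the actual implementation: plain string keys replace `ResultKey`, and thread safety is omitted.

```python
from collections import defaultdict
from contextlib import contextmanager

class DataService(dict):
    """Dict-like store that batches subscriber notifications per transaction."""
    def __init__(self):
        super().__init__()
        self._subscribers = defaultdict(list)  # key -> callbacks
        self._pending = None  # keys touched inside an open transaction

    def subscribe(self, key, callback):
        self._subscribers[key].append(callback)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        if self._pending is not None:
            self._pending.add(key)   # defer notification until commit
        else:
            self._notify({key})      # no open transaction: notify now

    @contextmanager
    def transaction(self):
        self._pending = set()
        try:
            yield self
        finally:
            touched, self._pending = self._pending, None
            self._notify(touched)    # one batched notification per key

    def _notify(self, keys):
        for key in keys:
            for callback in self._subscribers[key]:
                callback(self[key])

store = DataService()
seen = []
store.subscribe("detector/counts", seen.append)
with store.transaction():
    store["detector/counts"] = 1
    store["detector/counts"] = 2  # subscriber fires once, with the final value
```

Batching matters here because updates arrive in bursts from Kafka: without it, every intermediate write would trigger a plot redraw.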
## Workflow Lifecycle
`JobOrchestrator` (see `dashboard/job_orchestrator.py`) manages the full lifecycle of workflow jobs using a two-phase commit pattern:
```mermaid
stateDiagram-v2
    [*] --> Staging: stage_config()
    Staging --> Staging: stage_config() (more sources)
    Staging --> Committed: commit_workflow()
    Committed --> [*]: stop_workflow()
    Committed --> Committed: Receives acknowledgement
    note right of Staging
        Per-source configs staged
        in memory. No Kafka commands
        sent yet.
    end note
    note right of Committed
        Commands sent to backend.
        PendingCommandTracker awaits
        acknowledgements.
    end note
```
Key responsibilities:

- **Staging**: Collects per-source configurations before committing. Subscribers are notified of staging changes for UI preview.
- **Commit**: Sends `JobCommand` messages via `CommandService`, assigns job numbers, and activates jobs in `ActiveJobRegistry`.
- **Stop**: Sends stop commands and deactivates jobs (which cleans up `DataService` buffers via `ActiveJobRegistry`).
- **Acknowledgement processing**: Tracks pending commands and processes backend responses.
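In code, the two-phase pattern looks roughly like this. It is a hypothetical minimal sketch: the real command payloads, job-number assignment, and acknowledgement tracking are richer.

```python
import itertools

class JobOrchestrator:
    """Sketch of the stage/commit/stop lifecycle."""
    def __init__(self, send_command):
        self._send = send_command          # stands in for CommandService
        self._staged = {}                  # source name -> config, in memory
        self._job_numbers = itertools.count(1)
        self.active_jobs = set()

    def stage_config(self, source, config):
        self._staged[source] = config      # phase 1: no Kafka command sent

    def commit_workflow(self):
        job = next(self._job_numbers)      # phase 2: publish all at once
        for source, config in self._staged.items():
            self._send({"action": "start", "job": job,
                        "source": source, "config": config})
        self._staged.clear()
        self.active_jobs.add(job)
        return job

    def stop_workflow(self, job):
        self._send({"action": "stop", "job": job})
        self.active_jobs.discard(job)

sent = []
orch = JobOrchestrator(sent.append)
orch.stage_config("detector_a", {"bins": 100})
orch.stage_config("detector_b", {"bins": 200})
# nothing published yet; commit_workflow() sends both start commands
job = orch.commit_workflow()
```

Separating staging from commit lets the UI preview a multi-source configuration and then publish it atomically, rather than sending half-configured jobs to the backend.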
`WorkflowController` (see `dashboard/workflow_controller.py`) is the interface between widgets and `JobOrchestrator`. It translates Pydantic model parameters into orchestrator calls and creates `WorkflowConfigurationAdapter` instances for the configuration UI.
## Plot Orchestration
`PlotOrchestrator` (see `dashboard/plot_orchestrator.py`) manages the plot grid lifecycle:

- Creates and removes plot grids (tab-level containers)
- Manages plot cells within grids (add, remove, configure)
- Subscribes to `JobOrchestrator` workflow events to create plots when jobs start
- Persists grid configurations via `ConfigStore`
- Loads grid templates for instrument-specific default layouts
`PlottingController` (see `dashboard/plotting_controller.py`) handles the mechanics of plot creation: finding compatible plotters for a given data shape, setting up data pipelines via `StreamManager`, and creating plotter instances.
`PlotDataService` holds per-plot shared state (Presenters with dirty flags) that is read by per-session `SessionPlotManager` instances during periodic updates.
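A sketch of that shared-state handoff. The source describes dirty flags; the version counter below plays the same role but generalizes to several per-session readers, each tracking what it has already seen. All names here are illustrative.

```python
import threading

class Presenter:
    """Shared per-plot state: written by the background thread, read by
    per-session updaters. A monotonically increasing version number
    serves as the dirty flag."""
    def __init__(self):
        self._lock = threading.Lock()
        self._data = None
        self._version = 0

    def update(self, data):
        """Called from the background thread when new data arrives."""
        with self._lock:
            self._data = data
            self._version += 1

    def read_if_newer(self, last_seen):
        """Called from a session's periodic callback; returns
        (version, data) only when something has changed."""
        with self._lock:
            if self._version == last_seen:
                return None
            return self._version, self._data

presenter = Presenter()
presenter.update({"counts": [1, 2, 3]})
result = presenter.read_if_newer(0)  # a session redraws with the new data
```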
## Threading Model
```mermaid
flowchart LR
    subgraph "Background Thread"
        UL["Update Loop<br>(DashboardServices)"]
        UL --> O["Orchestrator.update()"]
        UL --> SC["SessionRegistry.cleanup_stale_sessions()"]
    end
    subgraph "Per-Session Periodic Callback<br>(Tornado IOLoop)"
        SU["SessionUpdater.periodic_update()"]
        SU --> HB["Browser heartbeat check"]
        SU --> NQ["Poll NotificationQueue"]
        SU --> CH["Custom handlers<br>(plot pipe updates,<br>status widget refresh)"]
    end
    O -.->|writes| DS["DataService<br>(shared state)"]
    CH -.->|reads| DS
```
Two threading contexts exist:

- **Background thread** (`orchestrator-update`): Runs `Orchestrator.update()` in a loop at ~5 Hz. Consumes Kafka messages and writes to `DataService`. Uses `ActiveJobRegistry.ingestion_guard()` to serialize against UI-thread job cleanup.
- **Per-session Tornado callbacks**: Each browser session has a `SessionUpdater` that runs in the Tornado IOLoop at ~1 Hz. It batches all UI mutations inside `pn.io.hold()` + `doc.models.freeze()` to minimize Bokeh model recomputation.
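The serialization between ingestion and cleanup can be sketched with a single lock; a hypothetical minimal form of the guard, not the real implementation:

```python
import threading
from contextlib import contextmanager

class ActiveJobRegistry:
    """Sketch of the ingestion-guard idea: one lock serializes
    background ingestion against job deactivation/cleanup."""
    def __init__(self):
        self._lock = threading.Lock()
        self._active = set()

    @contextmanager
    def ingestion_guard(self):
        # Held by the background thread while it stores polled data, so a
        # concurrent deactivate() cannot clean up buffers mid-ingestion.
        with self._lock:
            yield set(self._active)

    def activate(self, job):
        with self._lock:
            self._active.add(job)

    def deactivate(self, job, cleanup):
        # Called from the UI side; blocks until ingestion releases the lock.
        with self._lock:
            self._active.discard(job)
            cleanup(job)  # e.g. drop the job's DataService buffers

registry = ActiveJobRegistry()
registry.activate(7)
with registry.ingestion_guard() as active:
    assert 7 in active  # safe to store data for job 7 inside the guard
```

The guard guarantees that a job's buffers are never deleted between the "is this job active?" check and the write that follows it.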
## Session Management
```mermaid
flowchart TD
    Browser1["Browser Session A"] --> SU1["SessionUpdater A"]
    Browser2["Browser Session B"] --> SU2["SessionUpdater B"]
    SU1 --> SR["SessionRegistry"]
    SU2 --> SR
    SR --> Cleanup["Stale session cleanup<br>(heartbeat timeout)"]
```
Each browser session creates a `SessionUpdater` (see `dashboard/session_updater.py`) which:

- Registers with `SessionRegistry` for lifecycle tracking
- Embeds an invisible `HeartbeatWidget` that sends browser-side heartbeats via JavaScript
- Runs custom handlers (plot updates, status widgets) in the correct session/document context
- Batches all UI operations using `pn.io.hold()` + `doc.models.freeze()`
`SessionRegistry` (see `dashboard/session_registry.py`) tracks active sessions with heartbeat-based stale detection. Sessions are cleaned up when `pn.state.on_session_destroyed()` fires, or after a heartbeat timeout (defense in depth for browser crashes).
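Heartbeat-based stale detection reduces to "forget any session whose last beat is older than the timeout". A minimal sketch with illustrative names; the clock is injectable so the timeout is easy to exercise:

```python
import time

class SessionRegistry:
    """Sketch of heartbeat-based stale-session detection."""
    def __init__(self, timeout=30.0, clock=time.monotonic):
        self._timeout = timeout
        self._clock = clock
        self._last_beat = {}  # session id -> time of last heartbeat

    def register(self, session_id):
        self._last_beat[session_id] = self._clock()

    def heartbeat(self, session_id):
        self._last_beat[session_id] = self._clock()

    def cleanup_stale_sessions(self):
        """Drop sessions whose last heartbeat is older than the timeout."""
        now = self._clock()
        stale = [sid for sid, t in self._last_beat.items()
                 if now - t > self._timeout]
        for sid in stale:
            del self._last_beat[sid]
        return stale

t = [0.0]
reg = SessionRegistry(timeout=30.0, clock=lambda: t[0])
reg.register("a")
reg.register("b")
t[0] = 10.0
reg.heartbeat("b")  # "b" stays fresh; "a" never beats again
t[0] = 35.0
stale = reg.cleanup_stale_sessions()  # "a" exceeded the 30 s timeout
```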
## Configuration Adapters
Configuration widgets use the `ConfigurationAdapter` pattern (see `dashboard/configuration_adapter.py`):
- `ConfigurationAdapter` is an abstract base providing: title, description, model class for parameters, available source names, aux source definitions, and a start action
- `WorkflowConfigurationAdapter` implements this for workflow start dialogs
- `PlotConfigurationAdapter` implements this for plot configuration modals
- `ConfigurationState` persists parameter choices across sessions via `ConfigStore`
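The shape of the pattern can be sketched as an abstract base plus one concrete adapter. Method and property names below are illustrative, not the actual API, and the controller is a stand-in:

```python
from abc import ABC, abstractmethod

class ConfigurationAdapter(ABC):
    """Sketch of the adapter contract a configuration form renders."""

    @property
    @abstractmethod
    def title(self) -> str: ...

    @property
    @abstractmethod
    def description(self) -> str: ...

    @abstractmethod
    def source_names(self) -> list[str]: ...

    @abstractmethod
    def start(self, sources: list[str], params: dict) -> None: ...

class WorkflowConfigurationAdapter(ConfigurationAdapter):
    """Adapter variant for workflow start dialogs."""
    def __init__(self, start_workflow):
        self._start_workflow = start_workflow  # stands in for WorkflowController

    @property
    def title(self) -> str:
        return "Start workflow"

    @property
    def description(self) -> str:
        return "Configure sources and parameters, then start the workflow."

    def source_names(self) -> list[str]:
        return ["detector_a", "detector_b"]

    def start(self, sources, params):
        self._start_workflow(sources, params)

started = []
adapter = WorkflowConfigurationAdapter(lambda s, p: started.append((s, p)))
adapter.start(["detector_a"], {"bins": 10})
```

Because the form only depends on the abstract interface, plot configuration reuses the same widget code with a different adapter behind it.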
The generic `ConfigurationWidget` (see `widgets/configuration_widget.py`) renders any adapter into a Panel form with source selection, parameter inputs, and a start button.
## Transport Abstraction
The `Transport` protocol (see `dashboard/transport.py`) abstracts the message infrastructure:
- `DashboardKafkaTransport` (see `dashboard/kafka_transport.py`): Connects to Kafka, provides `MessageSource` (consumer) and `MessageSink` instances (for commands and ROI updates)
- `NullTransport`: No-op implementation for testing
Both return `DashboardResources` containing a `MessageSource`, a command `MessageSink`, and an ROI `MessageSink`.
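A protocol-style sketch of the source/sink interfaces, with a null source and a hypothetical recording sink of the kind a test might use (the real protocol definitions may differ):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class MessageSource(Protocol):
    def poll(self) -> list[dict]: ...

@runtime_checkable
class MessageSink(Protocol):
    def publish(self, message: dict) -> None: ...

class NullSource:
    """No-op source, in the spirit of NullTransport."""
    def poll(self) -> list[dict]:
        return []

class RecordingSink:
    """Hypothetical test double that records published messages."""
    def __init__(self):
        self.messages: list[dict] = []

    def publish(self, message: dict) -> None:
        self.messages.append(message)

sink = RecordingSink()
sink.publish({"action": "start_job"})
```

Structural typing means any object with the right methods satisfies the protocol; the Kafka and null implementations are interchangeable from the dashboard's point of view.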
## Key Widget Components
Widgets live in `dashboard/widgets/` and follow a common pattern: they receive shared services in their constructor and register periodic refresh handlers with `SessionUpdater`:
- `PlotGridTabs`: Tab container managing multiple plot grids, workflow configuration, and system status
- `WorkflowStatusListWidget`: Displays active workflow jobs and their status
- `SystemStatusWidget`: Shows session count, backend worker status, and heartbeat info
- `ConfigurationWidget`: Generic form builder driven by `ConfigurationAdapter`
- `PlotConfigModal`: Modal dialog for configuring individual plot cells