RateAwareMessageBatcher

Per-stream rate estimation · slot-based completion · gap- and fault-tolerant
Legend: detector (gated) · monitor (gated) · non-gated · slot-gate close · timeout close · overflow

Core mechanics

StreamPeriodEstimator

Input: inter-arrival diffs (a gap = missed pulse).

1. median(diffs) → seed
2. diff / k, where k = round(diff/seed), k ≥ 1
3. median(per-pulse) · snap to integer Hz

Ring-buffered diffs (N=32). Robust to missed pulses and split messages; negative diffs (out-of-order) filtered.
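The three estimation steps can be sketched as follows. This is a minimal illustration, not the real StreamPeriodEstimator: the class name, method names, and the choice to drop zero diffs (duplicate timestamps from split messages) along with negative ones are assumptions, as is returning None when the snapped rate falls below 1 Hz.

```python
from collections import deque
from statistics import median
from typing import Optional


class PeriodEstimatorSketch:
    """Illustrative stand-in; not the real StreamPeriodEstimator."""

    def __init__(self, maxlen: int = 32) -> None:
        # Ring buffer of inter-arrival diffs in ns (N=32 as in the text).
        self._diffs = deque(maxlen=maxlen)
        self._last_ns: Optional[int] = None

    def add(self, t_ns: int) -> None:
        if self._last_ns is not None:
            diff = t_ns - self._last_ns
            if diff <= 0:
                # Assumption: out-of-order and duplicate timestamps are skipped.
                return
            self._diffs.append(diff)
        self._last_ns = t_ns

    def rate_hz(self) -> Optional[int]:
        if not self._diffs:
            return None
        # Step 1: the median diff seeds the period guess.
        seed = median(self._diffs)
        # Step 2: a diff spanning k pulses (missed pulses) is divided by k.
        per_pulse = [d / max(1, round(d / seed)) for d in self._diffs]
        # Step 3: median of per-pulse periods, snapped to an integer rate.
        hz = round(1e9 / median(per_pulse))
        return hz if hz >= 1 else None


PERIOD_NS = 71_428_571  # roughly 14 Hz
est = PeriodEstimatorSketch()
for i in range(20):
    if i != 7:  # one missed pulse
        est.add(i * PERIOD_NS)
rate = est.rate_hz()
```

With one pulse missing, the doubled diff is divided by k = 2 and the estimate still snaps to 14 Hz.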

PulseGrid

[Diagram: grid with origin, period, and slot N-1 marked.]

Fields: origin_ns · period_ns · slots_per_batch
pulse_index = round((t - origin) / period)

Fixed grid, created once per stream. Jitter tolerance = period/2 via rounding. Preserved across batch-length changes.
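A small sketch of the index computation, assuming nanosecond integer timestamps. The class name is hypothetical, and the integer round-half-up used here is a stand-in for round(): the two differ only when a timestamp sits exactly halfway between two slots.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PulseGridSketch:
    """Illustrative stand-in for PulseGrid; field names follow the text."""

    origin_ns: int
    period_ns: int
    slots_per_batch: int

    def pulse_index(self, t_ns: int) -> int:
        # floor((t - origin) / period + 1/2) in pure integer arithmetic,
        # which avoids float rounding error on large ns values.
        delta = t_ns - self.origin_ns
        return (2 * delta + self.period_ns) // (2 * self.period_ns)


PERIOD_NS = 71_428_571  # roughly 14 Hz
grid = PulseGridSketch(origin_ns=1_000, period_ns=PERIOD_NS, slots_per_batch=14)

nominal = grid.origin_ns + 3 * PERIOD_NS
late = grid.pulse_index(nominal + 30_000_000)      # 30 ms late, under period/2
early = grid.pulse_index(nominal - 30_000_000)     # 30 ms early, under period/2
too_late = grid.pulse_index(nominal + 40_000_000)  # 40 ms late, over period/2
```

Jitter below period/2 (about 35.7 ms at 14 Hz) keeps the pulse in its nominal slot; beyond that it rounds into the neighbouring slot.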

Slot gate

[Diagram: batch window with the close marker.]

Close when a message lands in the last slot, not when N messages arrive. Missing pulses don't block closure; split messages don't trigger it early.
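To make the contrast with count-based closing concrete, here is a minimal sketch. Both function names are hypothetical, and the input is assumed to be the list of slot indices already routed into the active batch.

```python
def slot_gate_closed(slots_seen, slots_per_batch):
    """Slot gate: satisfied once any message has landed in the last slot."""
    last_slot = slots_per_batch - 1
    return any(s == last_slot for s in slots_seen)


def count_gate_closed(slots_seen, slots_per_batch):
    """Naive alternative: satisfied once N messages have arrived."""
    return len(slots_seen) >= slots_per_batch


# Missing pulse: slot 6 never arrives, but slot 13 does.
missing = [s for s in range(14) if s != 6]

# Split messages: 14 messages, but they only cover slots 0..5.
split = [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 5, 5]

missing_result = (slot_gate_closed(missing, 14), count_gate_closed(missing, 14))
split_result = (slot_gate_closed(split, 14), count_gate_closed(split, 14))
```

In the missing-pulse case the count-based gate never fires; in the split case it fires too early. The slot gate gets both right.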

HWM & timeout

[Diagram: active batch, timeout threshold, and HWM cap at 3 · batch_length.]

Logical clock = max observed timestamp. Timeout closes at 1.2 · batch_length. New HWM clamped toward start + 3·batch_length (old value preserved for monotonicity) — self-heals in a few closes; one far-future outlier can't DoS.
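The clamp and the timeout check can be sketched as below. Function and parameter names are hypothetical. The sketch assumes the timeout threshold is measured from the window start, and reads "old value preserved for monotonicity" as the HWM never moving backwards.

```python
def update_hwm(hwm_ns, t_ns, window_start_ns, batch_length_ns, cap_factor=3):
    """Advance the high-water mark, clamping new values to the cap."""
    cap = window_start_ns + cap_factor * batch_length_ns
    # A far-future timestamp is clamped to the cap; the old HWM is kept
    # whenever it is already larger, so the logical clock stays monotonic.
    return max(hwm_ns, min(t_ns, cap))


def timed_out(hwm_ns, window_start_ns, batch_length_ns):
    """True once the logical clock passes start + 1.2 * batch_length."""
    threshold = window_start_ns + (12 * batch_length_ns) // 10
    return hwm_ns > threshold


BATCH_NS = 1_000_000_000
YEAR_NS = 365 * 24 * 3600 * 10**9

hwm = update_hwm(0, YEAR_NS, window_start_ns=0, batch_length_ns=BATCH_NS)
hwm_after_late_msg = update_hwm(hwm, 500_000_000, 0, BATCH_NS)
```

A timestamp one year ahead moves the HWM only to 3 s past the window start, so each subsequent close lets the window catch up instead of spinning through empty closures.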

Scenarios

Happy path — 14 Hz slot-gate close

Steady stream; every batch closes exactly on the last slot.

[Timeline: det 14 Hz; axis 0 to 3 s.]

Missing pulse — still closes

Pulse 7 never arrives. Pulse 14 lands in last slot → close (count-based would hang).

[Timeline: det 14 Hz; slot 6 missing; close at slot 13; axis 0 to 1 s.]

Split messages — no premature close

Duplicate timestamp at slot 2 doesn't inflate slot count; last-slot check is what matters.

[Timeline: det, partial batch with a duplicate timestamp; max_slot = 5, last = 13, so the gate is unsatisfied (waiting); axis 0 to 1 s.]

Overflow — future pulse re-routed

Early arrival past the last slot is queued and routed into the next batch after close. Only gridded gated streams enter _overflow; ungridded streams (non-gated or sub-Hz gated) past window.end take the separate hold-back path (see below).

[Timeline: det 14 Hz; overflow pulse stashed, then routed into the next window; axis 0 to 2 s.]

Multi-stream — waits for all gated streams

Batch closes only when every gated stream has filled its last slot.

[Timeline: det 14 Hz reaches slot 13 ✓, mon 5 Hz reaches slot 4 ✓; close once both are satisfied; axis 0 to 1 s.]
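The all-streams condition can be sketched like this. The StreamState record and its fields are hypothetical bookkeeping, and the sketch assumes that with no gated streams at all the slot gate never fires, leaving closure to the timeout.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class StreamState:
    """Hypothetical per-stream bookkeeping for the active batch."""

    gated: bool
    slots_per_batch: int = 0
    max_slot: Optional[int] = None  # highest slot filled so far


def all_gates_satisfied(streams: Dict[str, StreamState]) -> bool:
    gated = [s for s in streams.values() if s.gated]
    # Non-gated streams are ignored; every gated one must reach its last slot.
    return bool(gated) and all(
        s.max_slot == s.slots_per_batch - 1 for s in gated
    )


streams = {
    "det": StreamState(gated=True, slots_per_batch=14, max_slot=13),
    "mon": StreamState(gated=True, slots_per_batch=5, max_slot=3),
    "log": StreamState(gated=False),
}
before = all_gates_satisfied(streams)  # mon still waiting on slot 4
streams["mon"].max_slot = 4
after = all_gates_satisfied(streams)
```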

Phase offset — grid aligns to pulses, not window

Stream starts 0.04 s into the window. PulseGrid origin picks a real pulse; all 14 slots fit.

[Timeline: det 14 Hz; markers for window start and grid origin; axis 0 to 1 s.]

Gap recovery — window jumps past silence

No messages for 5 batches. Instead of emitting 5 empty placeholders, window advances to where traffic resumes.

[Timeline: det 14 Hz; silence, window jumps; axis marks 0, 1 s, 6 s, 7 s.]
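One way to picture the jump is the sketch below. The function name is hypothetical, and the sketch assumes the new window start stays on a whole multiple of batch_length from the previous window end; the real gap-advance may align differently.

```python
def next_window_start(prev_end_ns, next_msg_ns, batch_length_ns):
    """Skip whole empty windows so the next window contains next_msg_ns."""
    skipped = max(0, (next_msg_ns - prev_end_ns) // batch_length_ns)
    return prev_end_ns + skipped * batch_length_ns


BATCH_NS = 1_000_000_000

# Previous window ended at 1 s; traffic resumes at 6.2 s.
resumed = next_window_start(1_000_000_000, 6_200_000_000, BATCH_NS)

# No gap: the next message falls inside the very next window.
contiguous = next_window_start(1_000_000_000, 1_300_000_000, BATCH_NS)
```

With traffic resuming at 6.2 s, the window restarts at 6 s and the five silent windows in between are never emitted.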

Timeout fallback — HWM past threshold

Only partial traffic this window, but a later non-gated message advances HWM past 1.2·batch_length → close.

[Timeline: det and log; det incomplete (slot gate open); HWM > threshold 1.2 · batch_length; axis 0 to 1.5 s.]

Non-gated ride-along — clean closure with log traffic

Log messages bypass the slot grid — no rate estimation, no slot routing. Msgs with ts ≤ window.end join the active batch; msgs ahead of the window are held for the next one (same path sub-Hz gated streams take — see below). Det’s slot gate closes each batch normally.

[Timeline: det 14 Hz and log; log never gates, slot gate closes from det alone; axis 0 to 3 s.]

Sub-Hz stream — non-gating ride-along

0.5 Hz monitor: rate too low to build a grid. No slot routing, no overflow stashing; msgs with ts ≤ window.end join the active batch and msgs ahead of the window are held for the next one (same path as non-gated — see below). Det’s slot gate closes every batch.

[Timeline: det 14 Hz and mon 0.5 Hz; det slot gate closes every batch, mon has no grid; axis 0 to 4 s.]

Future ungridded — held for next window

Ungridded msg (non-gated, or sub-Hz gated) with ts > window.end sits in _future and re-routes at close, so a future-ts reading doesn’t mis-correlate with this batch’s detector data. Capped at 3·batch_length ahead: beyond that the ts is implausibly far and the msg falls through to the active batch — a bad epoch can’t cache messages indefinitely.

[Timeline: det and log; a log message with ts ≤ end joins the current batch; a later one arrives, is held, and is delivered on close; axis 0 to 2 s.]
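The routing decision for ungridded messages reduces to three cases, sketched below. The function name and string labels are hypothetical, and the sketch assumes the 3·batch_length cap is measured from window.end.

```python
def route_ungridded(ts_ns, window_end_ns, batch_length_ns, cap_factor=3):
    """Decide where an ungridded message goes: 'active' or 'future'."""
    if ts_ns <= window_end_ns:
        return "active"  # belongs to the current batch
    if ts_ns - window_end_ns <= cap_factor * batch_length_ns:
        return "future"  # held, re-routed when the batch closes
    # Implausibly far ahead: fall through so nothing is cached indefinitely.
    return "active"


BATCH_NS = 1_000_000_000
WINDOW_END_NS = 1_000_000_000

in_window = route_ungridded(800_000_000, WINDOW_END_NS, BATCH_NS)
slightly_ahead = route_ungridded(1_400_000_000, WINDOW_END_NS, BATCH_NS)
far_ahead = route_ungridded(10_000_000_000, WINDOW_END_NS, BATCH_NS)
```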

Broken stream — disjoint epoch rejected

Bifrost regression: monitor with ~43 year offset. Grid origin too far → refused; good stream closes as normal.

[Timeline: good and broken streams; broken ts = received_now − 43 years; non-gating, still delivered.]

HWM outlier clamp — single bad timestamp can't DoS

One message ~1 year ahead used to pin HWM → millions of empty closures. Clamped to start + 3·batch_length; self-heals.

[Timeline: log with ts ≈ now + 1 year; the naive HWM would land there; HWM cap = start + 3·batch.]

Stream lifecycle — join, absent, evict, rejoin

New stream gates after MIN_DIFFS_FOR_GATE convergence. Absent for 5 batches → evicted. Can re-join later.

[Timeline: det and mon; mon joins, gates, goes absent (counter++), is evicted after 5 absent batches, and re-joins later.]
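The absent counter and eviction can be sketched as follows. The class and method names are hypothetical, and the convergence check behind MIN_DIFFS_FOR_GATE is left out because its value is not given here; the sketch only covers the absent, evict, rejoin part of the lifecycle.

```python
class AbsenceTrackerSketch:
    """Illustrative per-stream absent counter with eviction."""

    def __init__(self, max_absent=5):
        self.max_absent = max_absent
        self._absent = {}  # stream name -> consecutive batches without data

    def on_batch_close(self, seen):
        """Update counters for one closed batch; return evicted streams."""
        evicted = set()
        for name in seen:
            self._absent[name] = 0  # join, rejoin, or reset
        for name in list(self._absent):
            if name not in seen:
                self._absent[name] += 1
                if self._absent[name] >= self.max_absent:
                    del self._absent[name]
                    evicted.add(name)
        return evicted

    def tracked(self):
        return set(self._absent)


tracker = AbsenceTrackerSketch()
tracker.on_batch_close({"det", "mon"})
# mon goes silent; det keeps delivering.
evictions = [tracker.on_batch_close({"det"}) for _ in range(5)]
tracker.on_batch_close({"det", "mon"})  # mon re-joins
```

The first four silent batches only bump the counter; the fifth evicts mon, after which it no longer blocks closure until it re-joins.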

Batch-length change — grid recomputes

set_batch_length(2s) applies at next close. Grid origin preserved; slots_per_batch updates (14 → 28).

[Timeline: det 14 Hz; slots=14 before set_batch_length(2.0), slots=28 after; axis marks 0, 1 s, 3 s.]
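A sketch of the recompute, assuming slots_per_batch is derived as round(batch_length / period) with a floor of 1. The GridSketch record and the regrid helper are hypothetical names, not the real API.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class GridSketch:
    """Illustrative grid record; field names follow the text."""

    origin_ns: int
    period_ns: int
    slots_per_batch: int


def regrid(grid, batch_length_ns):
    """Return a grid with the same origin and period, new slot count."""
    slots = max(1, round(batch_length_ns / grid.period_ns))
    return replace(grid, slots_per_batch=slots)


PERIOD_NS = 71_428_571  # roughly 14 Hz
one_second = regrid(GridSketch(1_000, PERIOD_NS, 1), 1_000_000_000)
two_seconds = regrid(one_second, 2_000_000_000)
```

Only slots_per_batch changes (14 to 28); origin and period carry over, so pulse indices computed before and after the change stay on the same grid.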

Jitter tolerance — period/2 slot width

Slots have a ±period/2 jitter budget via round(). With ~30 ms of jitter (42% of the 71.4 ms period at 14 Hz) batch continuity holds, with occasional ±1 slot drift.

[Timeline: det ~14 Hz; each pulse rounded to its nearest slot; axis 0 to 1 s.]

Invariants:
- no ungated stream blocks closure
- ungridded msgs (non-gated or sub-Hz gated) ahead of the window are held (up to 3·batch_length) and flow into the next batch; beyond that cap they fall through to the active batch
- gated stream must fill last slot OR be evicted/demoted
- new HWM clamped toward start + 3·batch_length (self-heals within a few closes)
- grid origin preserved across batch-length changes when plausible

Contract departures from SimpleMessageBatcher:
- long silences emit no placeholder batches (gap-advance jumps the window)
- slots_per_batch=1 edge case can emit an empty batch when a stray overflows; the message is delivered next window, never lost