
Channels and Backpressure

TopoExec channels are bounded runtime contracts for every edge. They define what is stored, when a reader can observe it, what happens under overload, and which metrics explain degradation.

Modes

| Mode | Behavior | Typical use |
| --- | --- | --- |
| latest | Keeps only the newest message. A newer publication overwrites the prior value and increments drop/overwrite health metrics. | Low-latency sensor/frame paths where stale work is worse than lost work. |
| queue / ring_buffer | Keeps messages in FIFO order up to capacity; with drop_oldest/overwrite, the oldest work is discarded when the channel is full. | Commands or event streams that should preserve order within a bound. |
| latched | Keeps the last message and lets late readers see it once. | Configuration or snapshot-like boundary values. |
| previous_tick | Publishes into a pending slot and exposes it only after advance_epoch(). Waiters are notified when the pending value becomes visible, not when it is first staged. | Feedback that must be delayed by one epoch. |
| barrier | Delivers only after capacity messages have been queued. | Small synchronization batches. |
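The difference between latest and a queue using drop_oldest can be sketched with a small in-memory model. This is a hypothetical illustration, not the TopoExec API: ChannelModel, Mode, publish(), and take() are invented names, and the sketch assumes that take() removes the stored value, so only values that were never consumed count as overwritten.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>

// Hypothetical model of two channel modes; not the TopoExec API.
enum class Mode { Latest, Queue };

struct ChannelModel {
  Mode mode;
  std::size_t capacity;             // assumed > 0; only the queue mode uses it
  std::deque<int> slots;            // values not yet consumed
  std::size_t overwrite_count = 0;  // stored work replaced or evicted

  ChannelModel(Mode m, std::size_t cap) : mode(m), capacity(cap) {}

  void publish(int value) {
    if (mode == Mode::Latest) {
      // latest: a newer value replaces any unconsumed older value.
      if (!slots.empty()) {
        slots.clear();
        ++overwrite_count;
      }
      slots.push_back(value);
      return;
    }
    // queue with drop_oldest: evict the oldest entry to make room.
    if (slots.size() == capacity) {
      slots.pop_front();
      ++overwrite_count;
    }
    slots.push_back(value);
  }

  // Assumption of this sketch: reading removes the value.
  std::optional<int> take() {
    if (slots.empty()) return std::nullopt;
    int v = slots.front();
    slots.pop_front();
    return v;
  }
};
```

Publishing 1, 2, 3 into a latest channel leaves only 3 and records two overwrites; the same sequence into a queue of capacity 2 keeps 2 and 3 in order and records one eviction.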

All modes have finite capacity, and capacity <= 0 is invalid.
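The two less familiar modes, previous_tick and barrier, can be modeled the same way. In this sketch PreviousTickSlot, BarrierChannel, and their methods are hypothetical names; advance_epoch() only mirrors the name used in the table, and its boolean result stands in for the point where waiters would be notified. The barrier constructor rejects capacity <= 0 to mirror the rule above.

```cpp
#include <cassert>
#include <optional>
#include <stdexcept>
#include <vector>

// Hypothetical previous_tick slot: values are staged, then exposed by advance_epoch().
class PreviousTickSlot {
 public:
  void publish(int value) { pending_ = value; }  // staged, not yet visible
  std::optional<int> visible() const { return visible_; }
  // Returns true when a staged value became visible (where waiters would be woken).
  bool advance_epoch() {
    if (!pending_) return false;
    visible_ = pending_;
    pending_.reset();
    return true;
  }

 private:
  std::optional<int> pending_;
  std::optional<int> visible_;
};

// Hypothetical barrier: nothing is delivered until `capacity` messages are queued.
class BarrierChannel {
 public:
  explicit BarrierChannel(int capacity) : capacity_(capacity) {
    if (capacity <= 0) throw std::invalid_argument("capacity must be positive");
  }
  // Returns the full batch once the barrier fills, otherwise an empty vector.
  std::vector<int> publish(int value) {
    queued_.push_back(value);
    if (static_cast<int>(queued_.size()) < capacity_) return {};
    std::vector<int> batch;
    batch.swap(queued_);  // hand out the batch and reset the barrier
    return batch;
  }

 private:
  int capacity_;
  std::vector<int> queued_;
};
```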

Overflow and health accounting

Overflow policy is explicit:

  • overwrite / drop_oldest: accept the new publication and replace older stored work where needed.
  • drop_newest: reject the new publication and preserve existing queued work.
  • reject / fail_fast: reject capacity overflow; fail_fast uses the channel-capacity error path.
  • block: in the current non-blocking runtime, reports a would-block outcome to the producer instead of waiting. It does not block an event-loop thread.
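One way to picture these policies is a single push function that reports what happened to the new publication. The enums and push() below are hypothetical; overwrite is modeled as identical to drop_oldest for a queue, and block simply returns a would-block outcome rather than waiting.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical policy and outcome names; not the TopoExec API.
enum class Overflow { DropOldest, DropNewest, Reject, FailFast, Block };
enum class Outcome { Accepted, AcceptedEvictedOldest, DroppedNewest, Rejected, CapacityError, WouldBlock };

// Applies one overflow policy when publishing into a bounded queue.
Outcome push(std::deque<int>& q, std::size_t capacity, Overflow policy, int value) {
  if (q.size() < capacity) {
    q.push_back(value);
    return Outcome::Accepted;
  }
  switch (policy) {
    case Overflow::DropOldest:  // also stands in for `overwrite` in this sketch
      q.pop_front();
      q.push_back(value);
      return Outcome::AcceptedEvictedOldest;
    case Overflow::DropNewest: return Outcome::DroppedNewest;  // existing work preserved
    case Overflow::Reject:     return Outcome::Rejected;
    case Overflow::FailFast:   return Outcome::CapacityError;  // error path
    case Overflow::Block:      return Outcome::WouldBlock;     // never waits
  }
  return Outcome::Rejected;  // unreachable; keeps compilers quiet
}
```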

When one GraphContext::publish() call fans out to several channels, or when a staged publication batch is committed, rejections that can be detected up front (preflightable reject paths) are all-or-nothing: before making any payload visible, the runtime validates the target channels, the payload copy policy, and capacity rejection. A rejected commit still records reject/drop health and metric evidence on the rejecting channel.
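The all-or-nothing behavior amounts to a two-phase commit: check every target first, then write to all of them. This sketch assumes a reject-on-full policy for every target and records a rejection on each channel that fails the preflight; Target and publish_all() are hypothetical names, and whether the real runtime stops at the first rejecting channel is not specified here.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical target channel with a reject-on-full policy.
struct Target {
  std::size_t capacity;
  std::deque<int> items;
  std::size_t reject_count = 0;
  bool has_room() const { return items.size() < capacity; }
};

// Phase 1 preflights every target; phase 2 commits only if all passed.
bool publish_all(const std::vector<Target*>& targets, int payload) {
  bool ok = true;
  for (Target* t : targets) {
    if (!t->has_room()) {
      ++t->reject_count;  // evidence recorded on the rejecting channel
      ok = false;
    }
  }
  if (!ok) return false;  // nothing was made visible anywhere
  for (Target* t : targets) t->items.push_back(payload);
  return true;
}
```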

Channel metrics distinguish common causes:

  • runtime.channel.drop_count: messages that did not enter a consumable path or expired before delivery.
  • runtime.channel.overwrite_count: stored work overwritten or oldest queued work discarded to accept newer work.
  • runtime.channel.reject_count: publications rejected by capacity policy, including would-block results from the block policy.
  • runtime.channel.stale_drop_count: messages dropped because lifespan_ms expired before delivery.
  • runtime.channel.deadline_miss_count: delivered messages older than deadline_ms.
  • runtime.channel.health_event_count: aggregate health/degradation events from overwrite, reject, stale, or deadline paths.
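A hypothetical way to keep causes separate is to route each degradation path to its own counter while every path also bumps the aggregate health counter. ChannelCounters, Path, and record() are illustrative names. Only stale expiry is mapped into drop_count here, because the definition above explicitly includes messages that expired before delivery; the sketch does not try to model whether other paths also contribute to drop_count.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-channel counters echoing the runtime.channel.* names above.
struct ChannelCounters {
  std::uint64_t drop_count = 0;
  std::uint64_t overwrite_count = 0;
  std::uint64_t reject_count = 0;
  std::uint64_t stale_drop_count = 0;
  std::uint64_t deadline_miss_count = 0;
  std::uint64_t health_event_count = 0;
};

enum class Path { Overwrite, Reject, Stale, DeadlineMiss };

// Each cause lands in its own counter; all of them count as a health event.
void record(ChannelCounters& c, Path p) {
  switch (p) {
    case Path::Overwrite: ++c.overwrite_count; break;
    case Path::Reject:    ++c.reject_count; break;
    case Path::Stale:  // expired before delivery: also a real drop
      ++c.stale_drop_count;
      ++c.drop_count;
      break;
    case Path::DeadlineMiss: ++c.deadline_miss_count; break;  // still delivered
  }
  ++c.health_event_count;
}
```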

RuntimeRunnerResult::channel_drop_count is the aggregate count of real drops. Use channel_overwrite_count, channel_reject_count, channel_stale_drop_count, and channel_deadline_miss_count, or the per-channel runtime.channel.* metrics, when explaining overwrite, rejection, stale expiry, or deadline degradation separately.

RuntimeChannelMetrics::degradation_reason stores the latest human-readable reason for a degradation path.

When runtime health events are enabled, channel overflow, stale drop, deadline miss, and queue high-watermark conditions are also recorded as bounded observer events. Each event carries channel_id/edge_id, overflow policy, reason, depth, capacity, and endpoint attributes. policy.emit_health_events: false suppresses these channel events for one edge without changing channel behavior or degradation counters.
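The separation between counters and events can be sketched as follows: the degradation counter always increments, while the per-edge flag only gates the observer event. HealthEventLog, on_overflow(), and the evict-oldest bound on the event buffer are assumptions for illustration; the text above only says the events are bounded, not how.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>

struct HealthEvent {
  std::string channel_id;
  std::string reason;
  std::size_t depth;
  std::size_t capacity;
};

// Hypothetical bounded observer buffer: keeps at most max_events, discarding the oldest.
class HealthEventLog {
 public:
  explicit HealthEventLog(std::size_t max_events) : max_events_(max_events) {}
  void push(HealthEvent e) {
    if (max_events_ == 0) return;
    if (events_.size() == max_events_) events_.pop_front();
    events_.push_back(std::move(e));
  }
  const std::deque<HealthEvent>& events() const { return events_; }

 private:
  std::size_t max_events_;
  std::deque<HealthEvent> events_;
};

// The counter always moves; emit_health_events only gates the observer event.
void on_overflow(std::uint64_t& degradation_counter, HealthEventLog* log, bool emit_health_events,
                 const std::string& channel_id, std::size_t depth, std::size_t capacity) {
  ++degradation_counter;
  if (log != nullptr && emit_health_events) {
    log->push({channel_id, "overflow", depth, capacity});
  }
}
```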

Read semantics

Low-level readers expose the same semantics used by the runtime trigger engine:

  • peek_latest_for_component_port() returns the latest visible value without marking it delivered.
  • snapshot_for_component_port() copies the currently visible latest/queued messages without consuming them.
  • drain_for_component_port(..., max_batch) consumes a bounded batch and preserves remaining queued messages for later drains.
  • consume_for_component() drains all visible inputs for the component.
  • drain_for_reader(channel_id, reader_id, max_batch) is the explicit per-reader queue API.
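The difference between peeking, snapshotting, and draining can be shown on a plain single-reader queue. QueueReader and its methods are hypothetical stand-ins for the *_for_component_port() helpers; they model only the consume/no-consume distinction, not cursors or trigger integration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <vector>

// Hypothetical single-reader queue illustrating three read styles.
class QueueReader {
 public:
  void publish(int v) { q_.push_back(v); }

  // peek-style: newest visible value, nothing consumed.
  std::optional<int> peek_latest() const {
    if (q_.empty()) return std::nullopt;
    return q_.back();
  }

  // snapshot-style: copy everything visible, nothing consumed.
  std::vector<int> snapshot() const { return std::vector<int>(q_.begin(), q_.end()); }

  // drain-style: consume at most max_batch from the front; the rest stays queued.
  std::vector<int> drain(std::size_t max_batch) {
    const auto n = static_cast<std::ptrdiff_t>(std::min(max_batch, q_.size()));
    std::vector<int> out(q_.begin(), q_.begin() + n);
    q_.erase(q_.begin(), q_.begin() + n);
    return out;
  }

 private:
  std::deque<int> q_;
};
```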

For latest-style channels, component-port drains use the component.port cursor while component-wide drains use the component cursor. Embedders that need one stable low-level cursor should use the explicit reader APIs instead of mixing both helper families on the same channel.
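The reason not to mix helper families is that each cursor tracks delivery independently, so the same latest value can be delivered once per cursor. In this hypothetical sketch, LatestWithCursors and the cursor names "planner.input" and "planner" are invented; the two cursors stand in for the component.port and component cursors.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Hypothetical latest-style channel that tracks delivery per named cursor.
class LatestWithCursors {
 public:
  void publish(int v) {
    value_ = v;
    ++version_;
  }
  // Returns the value only if this cursor has not yet seen the current version.
  std::optional<int> drain(const std::string& cursor) {
    std::uint64_t& seen = seen_[cursor];  // new cursors start at version 0
    if (version_ == 0 || seen == version_) return std::nullopt;
    seen = version_;
    return value_;
  }

 private:
  int value_ = 0;
  std::uint64_t version_ = 0;
  std::map<std::string, std::uint64_t> seen_;
};
```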

Runtime channel messages carry invocation metadata (correlation_id, causation_id, epoch_id, transaction_id, source endpoint, and trigger kind) alongside payload and timing fields. This metadata flows through immediate, delay/state/async, task-completion, and CompositeLoop external commits; metrics avoid using it as default labels.

readers: single is the default. Queue-style single-reader drains remove delivered messages from the channel. readers: multi / readers: multiple keeps bounded queue storage and tracks a per-reader sequence cursor so each explicit reader can observe each retained queued message once. Because retained history is still bounded by channel capacity, a slow reader can miss messages dropped by overflow.
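Per-reader cursors over bounded history can be sketched with a sequence number per message and one cursor per reader. MultiReaderQueue is hypothetical and its drain_for_reader() only echoes the name of the API listed earlier; eviction is modeled as drop_oldest, and a reader's cursor is assumed to start at sequence 0 on first use.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical multi-reader queue: bounded history plus one cursor per reader.
class MultiReaderQueue {
 public:
  explicit MultiReaderQueue(std::size_t capacity) : capacity_(capacity) {}

  void publish(int v) {
    if (history_.size() == capacity_) history_.pop_front();  // drop_oldest eviction
    history_.push_back({next_seq_++, v});
  }

  // Each reader sees each retained message at most once.
  std::vector<int> drain_for_reader(const std::string& reader, std::size_t max_batch) {
    std::uint64_t& cursor = cursors_[reader];  // next sequence this reader wants
    std::vector<int> out;
    for (const auto& [seq, value] : history_) {
      if (out.size() == max_batch) break;
      if (seq < cursor) continue;  // already seen by this reader
      out.push_back(value);
      cursor = seq + 1;
    }
    return out;
  }

 private:
  std::size_t capacity_;
  std::uint64_t next_seq_ = 0;
  std::deque<std::pair<std::uint64_t, int>> history_;
  std::map<std::string, std::uint64_t> cursors_;
};
```

In the usage below, the slow reader registers early but does not read again until after message 10 has been evicted, so it never observes 10.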

move_only payloads require readers: single; use shared_view or copy for multi-reader paths.

topoexec graph plan --format json and topoexec graph explain --format json expose each edge's readers, copy_policy, capacity, overflow, and slow_reader_drop_risk fields. The risk flag is true for multi-reader edges whose overflow policy can evict older retained history (drop_oldest or overwrite). topoexec graph lint reports slow_reader_drop_risk for those valid-but-risky edges and reports invalid_move_only_multireader when move_only is combined with non-single readers.
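The two lint findings can be expressed as a small predicate over an edge description. EdgeSpec and lint_edge() are hypothetical; the string values echo the readers, copy_policy, and overflow fields named above, but this is not how topoexec graph lint is implemented.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical edge description; field names echo the plan/explain JSON fields above.
struct EdgeSpec {
  std::string readers;      // "single", "multi", or "multiple"
  std::string copy_policy;  // e.g. "move_only", "shared_view", "copy"
  std::string overflow;     // e.g. "drop_oldest", "overwrite", "reject"
};

std::vector<std::string> lint_edge(const EdgeSpec& e) {
  std::vector<std::string> findings;
  const bool multi = e.readers != "single";
  // Invalid: a moved payload cannot be handed to more than one reader.
  if (e.copy_policy == "move_only" && multi) findings.push_back("invalid_move_only_multireader");
  // Valid but risky: eviction can remove history a slow reader has not seen.
  if (multi && (e.overflow == "drop_oldest" || e.overflow == "overwrite")) {
    findings.push_back("slow_reader_drop_risk");
  }
  return findings;
}
```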

Lifespan and deadline

  • lifespan_ms drops stale messages before delivery or snapshot and increments stale_drop_count plus health_event_count.
  • deadline_ms still delivers the message, marks RuntimeChannelMessage::deadline_missed, increments deadline_miss_count, and records message age metrics.

The message received_at timestamp remains the publication/enqueue timestamp. Trigger policies such as batch windows depend on that invariant; delivery latency is reported separately through metrics.
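The lifespan/deadline split can be sketched as a delivery filter: lifespan removes the message, deadline only flags it, and received_at is never rewritten. Message, AgeCounters, and deliver() are hypothetical, and treating an age exactly equal to the limit as still acceptable (a strict > comparison) is an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>

using Clock = std::chrono::steady_clock;
using Ms = std::chrono::milliseconds;

// Hypothetical message shape; received_at is the publication/enqueue time.
struct Message {
  int payload;
  Clock::time_point received_at;  // never rewritten on delivery
  bool deadline_missed = false;
};

struct AgeCounters {
  std::uint64_t stale_drop_count = 0;
  std::uint64_t deadline_miss_count = 0;
};

// lifespan drops the message; deadline delivers it but marks it late.
std::optional<Message> deliver(Message m, Clock::time_point now, std::optional<Ms> lifespan,
                               std::optional<Ms> deadline, AgeCounters& c) {
  const auto age = std::chrono::duration_cast<Ms>(now - m.received_at);
  if (lifespan && age > *lifespan) {
    ++c.stale_drop_count;
    return std::nullopt;
  }
  if (deadline && age > *deadline) {
    m.deadline_missed = true;
    ++c.deadline_miss_count;
  }
  return m;  // age would be reported as a separate latency metric
}
```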