feat(message_bus): add QUIC, TCP-TLS, WS, WSS transports for SDK clients #3192

Open

hubcio wants to merge 9 commits into master from feat/message-bus-multi-transport

Conversation

hubcio (Contributor) commented Apr 28, 2026

The replica plane stays plain TCP forever: VSR FIFO + view-change timing,
fd-delegation, and writev batching all rely on plaintext between trusted
replicas. The SDK-client plane gains four transports alongside TCP:
  • QUIC: shard-0 terminal (compio-quic CID demux), 1 bidi stream per
    peer, 0-RTT off + listener defense-in-depth reject.
  • TCP-TLS: TLS 1.3 via rustls, no client auth, 0-RTT off, compio-tls
    behind unified TransportConn::run with bounded close_grace shutdown.
  • WS: compio-ws over plaintext TCP; pre-upgrade fd cross-shard
    handover keeps fd-delegation on plain TCP only.
  • WSS: WebSocketStream over TlsStream; both handshakes run on the
    per-connection install task.

Shared across both planes:

  • TransportListener / TransportConn trait family; WebSocketConfig.
  • close_grace threaded through MessageBusConfig and applied uniformly
    across TCP-TLS, WS, WSS; bounded safe shutdown (no select! over
    stream.shutdown); single-task pump per WS/WSS using compio-ws
    cancel-safe read.
  • Bus auth stays thin: both planes connect unauthenticated; server-ng
    gates via LOGIN_USER / LOGIN_WITH_PAT and a future LOGIN_REPLICA.
    Ping announces replica_id only; no subprotocol, no ALPN, no MAC.
  • Per-connection metadata flows via IggyMessageBus::client_meta;
    ShardFramePayload setup variants carry ClientConnMeta end to end.
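
A minimal sketch of what the shared trait family could look like; only the two trait names come from this description, and every signature below is an assumption, not the PR's actual API:

```rust
use std::io;
use std::net::SocketAddr;

// Hypothetical shape of the TransportListener / TransportConn family
// shared by QUIC, TCP-TLS, WS, and WSS. Signatures are assumptions.
trait TransportListener {
    type Conn: TransportConn;

    /// Accept the next inbound connection. Handshakes (TLS, WS upgrade)
    /// run later on the per-connection install task, not in this loop.
    async fn accept(&mut self) -> io::Result<(Self::Conn, SocketAddr)>;
}

trait TransportConn {
    /// Unified per-connection pump: owns both directions and applies
    /// the bounded close_grace shutdown on exit.
    async fn run(self) -> io::Result<()>;
}
```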

codecov bot commented Apr 28, 2026

Codecov Report

❌ Patch coverage is 87.21609% with 394 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.35%. Comparing base (611fca0) to head (0ef2afc).

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| core/message_bus/src/replica/io.rs | 82.91% | 41 Missing and 7 partials ⚠️ |
| core/message_bus/src/transports/wss.rs | 86.47% | 38 Missing and 5 partials ⚠️ |
| core/configs/src/server_ng_config/websocket.rs | 0.00% | 34 Missing ⚠️ |
| core/message_bus/src/transports/ws.rs | 87.08% | 28 Missing and 3 partials ⚠️ |
| core/message_bus/src/transports/quic.rs | 91.66% | 22 Missing and 5 partials ⚠️ |
| core/configs/src/server_ng_config/displays.rs | 0.00% | 24 Missing ⚠️ |
| core/message_bus/src/installer/replica.rs | 85.44% | 19 Missing and 4 partials ⚠️ |
| core/message_bus/src/transports/tcp_tls.rs | 94.93% | 15 Missing and 7 partials ⚠️ |
| core/message_bus/src/installer/mod.rs | 52.63% | 18 Missing ⚠️ |
| core/shard/src/coordinator.rs | 76.62% | 18 Missing ⚠️ |

... and 18 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3192      +/-   ##
============================================
+ Coverage     74.10%   74.35%   +0.25%     
  Complexity      943      943              
============================================
  Files          1159     1177      +18     
  Lines        102033   104519    +2486     
  Branches      79083    81586    +2503     
============================================
+ Hits          75607    77715    +2108     
- Misses        23765    24061     +296     
- Partials       2661     2743      +82     
| Component | Coverage Δ |
| --- | --- |
| Rust Core | 75.64% <87.21%> (+0.31%) ⬆️ |
| Java SDK | 60.14% <ø> (ø) |
| C# SDK | 69.07% <ø> (-0.31%) ⬇️ |
| Python SDK | 81.43% <ø> (ø) |
| Node SDK | 91.53% <ø> (ø) |
| Go SDK | 39.43% <ø> (ø) |
| Files with missing lines | Coverage Δ |
| --- | --- |
| core/binary_protocol/src/consensus/iobuf.rs | 35.20% <100.00%> (+2.37%) ⬆️ |
| core/configs/src/server_ng_config/defaults.rs | 100.00% <100.00%> (ø) |
| core/configs/src/server_ng_config/server_ng.rs | 40.98% <ø> (-2.77%) ⬇️ |
| core/message_bus/src/connector.rs | 94.31% <100.00%> (+1.46%) ⬆️ |
| core/message_bus/src/installer/conn_info.rs | 100.00% <100.00%> (ø) |
| core/message_bus/src/installer/quic.rs | 100.00% <100.00%> (ø) |
| core/message_bus/src/installer/ws.rs | 100.00% <100.00%> (ø) |
| core/message_bus/src/lifecycle/shutdown.rs | 99.09% <100.00%> (+0.54%) ⬆️ |
| core/message_bus/src/socket_opts.rs | 100.00% <ø> (ø) |
| core/server-ng/src/session_manager.rs | 90.14% <ø> (-0.50%) ⬇️ |

... and 30 more

... and 28 files with indirect coverage changes


atharvalade (Contributor) left a comment

Found these during the first round of review; I'll continue reviewing later. Overall this looks good.

if body.len() < iggy_binary_protocol::HEADER_SIZE {
return Err(FrameDecodeError::BadHeader);
}
let total_size = u32::from_le_bytes(
framing.rs has a const _: () = { assert!(offset_of!(GenericHeader, size) == 48) } guard but this function duplicates the same 48..52 magic without one. If GenericHeader layout ever shifts, TCP/QUIC get a compile error while WS/WSS silently read the wrong bytes.
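
The guard being asked for is a one-liner. A sketch, with a simplified stand-in layout for GenericHeader (the real field set is not shown in this thread):

```rust
use std::mem::offset_of;

// Stand-in for the real GenericHeader; the field layout here is an
// assumption for illustration only.
#[repr(C)]
struct GenericHeader {
    _prefix: [u8; 48],
    size: u32,
}

// Compile-time layout guard: if `size` ever moves away from bytes
// 48..52, this fails to build instead of letting the WS/WSS decoder
// silently read the wrong bytes.
const _: () = assert!(offset_of!(GenericHeader, size) == 48);

fn main() {
    let body = [0u8; 52];
    // The duplicated 48..52 magic the guard would protect:
    let total_size = u32::from_le_bytes(body[48..52].try_into().unwrap());
    assert_eq!(total_size, 0);
}
```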

in_tx,
rx,
shutdown,
label,
run_pump drops max_message_size into .. and decode_consensus_frame hardcodes framing::MAX_MESSAGE_SIZE. TCP and QUIC paths honor the per-bus config value, so an operator who lowers max_message_size gets enforcement on TCP/QUIC but not WS/WSS.
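
A sketch of the fix direction, with assumed names (the real decode_consensus_frame signature is not shown in this thread): thread the per-bus limit through as a parameter instead of reading the crate const.

```rust
#[derive(Debug)]
enum FrameDecodeError {
    BadHeader,
    TooLarge { size: usize, limit: usize },
}

// Hypothetical stand-in for iggy_binary_protocol::HEADER_SIZE.
const HEADER_SIZE: usize = 56;

fn decode_consensus_frame(
    body: &[u8],
    max_message_size: usize, // from MessageBusConfig, not a crate const
) -> Result<usize, FrameDecodeError> {
    if body.len() < HEADER_SIZE {
        return Err(FrameDecodeError::BadHeader);
    }
    let total_size =
        u32::from_le_bytes(body[48..52].try_into().unwrap()) as usize;
    if total_size > max_message_size {
        // WS/WSS now enforce the same operator-configured bound
        // that the TCP and QUIC paths already honor.
        return Err(FrameDecodeError::TooLarge {
            size: total_size,
            limit: max_message_size,
        });
    }
    Ok(total_size)
}
```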

Comment thread on core/server-ng/src/dedup.rs (outdated):
.per_client
.entry(client)
.or_insert_with(|| PerClient::with_capacity(self.per_client_capacity));
if state.find(request).is_some() {
lookup treats TTL-expired Done entries as Fresh, but mark_in_flight calls find() without a TTL check. A client retrying after TTL expiry sees Fresh from lookup, then gets false from mark_in_flight because the physical slot still exists. The retry is silently dropped.
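
One way to make the two paths agree, sketched against hypothetical types inferred from the quoted snippet: fold the TTL check into a shared presence test that physically evicts expired slots.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical slot state; the real PerClient ring is not shown here.
enum Slot {
    InFlight,
    Done { at: Instant },
}

struct PerClient {
    ttl: Duration,
    slots: HashMap<u64, Slot>,
}

impl PerClient {
    /// TTL-aware presence check: an expired Done slot is evicted and
    /// reported absent, so lookup and mark_in_flight cannot disagree.
    fn find_live(&mut self, request: u64, now: Instant) -> bool {
        let expired = matches!(
            self.slots.get(&request),
            Some(Slot::Done { at }) if now.duration_since(*at) >= self.ttl
        );
        if expired {
            self.slots.remove(&request);
            return false;
        }
        self.slots.contains_key(&request)
    }

    /// Returns false only for a genuine in-TTL duplicate; a retry after
    /// TTL expiry is accepted instead of being silently dropped.
    fn mark_in_flight(&mut self, request: u64, now: Instant) -> bool {
        if self.find_live(request, now) {
            return false;
        }
        self.slots.insert(request, Slot::InFlight);
        true
    }
}
```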

let (server_out, server_in, server_shutdown, server_handle) = drive(server_conn);
let (client_out, client_in, client_shutdown, client_handle) = drive(client_conn);

client_out
TCP-TLS's drive_close calls tls.shutdown(), which sends close_notify, but WSS just does ws.close() + drop. The peer's rustls sees an unexpected EOF on the record layer, which can trigger false-positive alerts in TLS-aware load balancers or WAFs sitting in front.
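
For reference, the record-layer rule illustrated with rustls's synchronous API (deliberately not the PR's compio path): queue close_notify and flush it before dropping the session.

```rust
use std::io::Write;
use std::net::TcpStream;

// Dropping a TLS stream without this sequence is what the peer's
// rustls reports as an unexpected EOF on the record layer.
fn close_tls_cleanly(
    conn: &mut rustls::ClientConnection,
    sock: &mut TcpStream,
) -> std::io::Result<()> {
    // Queue the close_notify alert...
    conn.send_close_notify();
    // ...and push the remaining ciphertext onto the wire.
    while conn.wants_write() {
        conn.write_tls(sock)?;
    }
    sock.flush()
}
```

The WSS analogue would be the async equivalent, bounded by close_grace, rather than a bare ws.close() + drop.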

hubcio force-pushed the feat/message-bus-multi-transport branch from c732238 to 54b5b8d on April 30, 2026 at 07:15
The message_bus client plane only spoke plaintext TCP, so SDK clients
behind TLS-only middleboxes or browser bridges had no way in, and the
bus could not interop with the rest of the server's transports.

Adds four new client listeners (QUIC, TCP-TLS, WS, WSS), each with a
bind/listen path, a per-connection installer, and a cancel-safe pump.
Every transport caps inbound peers with a bounded handshake_grace
(default 10s) so slowloris peers cannot pin install slots; WSS shares
one wall-clock budget across the TLS and WS handshakes. The QUIC
accept loop spawns per-incoming so a slow handshake no longer wedges
the listener. The replica listener gets the same bound: each accepted
stream runs its handshake on a spawned task under
compio::time::timeout, mirroring the client-plane defense, and
in-flight handshake handles are reaped opportunistically so the
handle vector stays bounded for the listener's lifetime.

MessageBusConfig now consumes the [message_bus] block of the server-ng
config schema directly via From<&ServerNgConfig>. IggyDuration ->
Duration, IggyByteSize -> usize, and the WS frame schema ->
tungstenite::WebSocketConfig conversions happen at the boot boundary
so hot paths read pre-converted fields. WS frame-layer tunables
(max_message_size, max_frame_size, write_buffer_size,
accept_unmasked_frames) live under [message_bus] so SDK-client burst
characteristics are tuned independently of the legacy [websocket]
listener. The change forks tcp/websocket/quic into server_ng_config so
server-ng can evolve its TLS/WS/QUIC surface without disturbing the
legacy server.
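
A minimal sketch of that boot-boundary conversion, with hypothetical field names standing in for the real schema:

```rust
use std::time::Duration;

// Hypothetical slices of the two config types; the field names are
// assumptions illustrating the conversion described above.
struct IggyDuration(Duration);
struct IggyByteSize(u64);

struct ServerNgConfig {
    close_grace: IggyDuration,
    max_message_size: IggyByteSize,
}

/// Runtime config: only pre-converted, hot-path-friendly types.
struct MessageBusConfig {
    close_grace: Duration,
    max_message_size: usize,
}

impl From<&ServerNgConfig> for MessageBusConfig {
    fn from(cfg: &ServerNgConfig) -> Self {
        // IggyDuration -> Duration and IggyByteSize -> usize happen
        // exactly once, at boot; the pumps never see the schema types.
        Self {
            close_grace: cfg.close_grace.0,
            max_message_size: cfg.max_message_size.0 as usize,
        }
    }
}
```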

Hardening that came up while writing the new paths:

- TLS reads are cancel-safe. The previous pump ran framing's
  read_message inside select_biased!; when the mailbox arm won
  mid-frame, the future dropped its scratch buffer while rustls had
  already advanced its plaintext queue past those bytes, and the
  next iteration parsed garbage as a fresh header. Replaced with a
  resumable read_step that issues exactly one tls.read.await per
  call, with the framing accumulator owned by a TlsPumpState on the
  pump's stack frame and shared via SharedAcc
  (Rc<UnsafeCell<Owned<MESSAGE_ALIGN>>>; !Send + !Sync; single
  shard, single mut-borrower invariant enforced by debug_asserts).
  The WS analogue is upstream-blocked on compio_ws 0.4 split() and
  is documented as a known limitation.
- compio::runtime::spawn silently swallows panics, which left a
  panicking on_message / on_request handler with an orphan registry
  slot. Replica + TCP dispatch tasks install scopeguards that
  synchronously evict the registry slot, fire
  notify_connection_lost (replica), and remove the client_meta
  (TCP). Async drain on clean exit observes the slot gone and
  becomes a no-op.
- Writer-task panic on TCP / QUIC previously left the reader parked
  on a live socket. Each writer task now installs a scopeguard that
  triggers the per-connection Shutdown on exit; the watchdog
  observes the conn-side fire, calls libc::shutdown(SHUT_RD), and
  the reader unparks. Two regression tests cover the eviction chain
  via a synthetic WriterPanicConn.
- TCP-TLS, WS, and WSS writers batch by draining the mailbox up to
  max_batch via try_recv after the first message, write each frame,
  then emit a single flush (sketched after this list). Mirrors the
  plain-TCP writev pattern and avoids paying one TLS record + one
  flush per queued frame on a sustained burst.
- QUIC writer uses send.write_all(frozen) instead of send.write,
  so frame boundaries are not corrupted on backpressure. Drops the
  per-frame flush: quinn-proto coalesces STREAM frames on its own,
  and the explicit flush was serializing every send.
- Cluster coordinator snapshot rebroadcasts Clear frames for
  forgotten replica slots via a 3-state MappingSlot enum, so a
  peer that missed a clear-frame on a full inbox eventually
  re-syncs.
- Per-connection FusedShutdown folds bus-token and conn-token into
  one channel, dropping the bridge task that used to run per
  install.
- send_to_* fast path no longer clones the Frozen payload on the
  registry-hit branch; Bytes::from_owner removes a per-frame Vec
  alloc and memcpy on WS/WSS outbound.
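
A sketch of the drain-then-single-flush writer shape from the batching bullet above, with a synchronous std::sync::mpsc standing in for the async bus mailbox; all names are assumptions:

```rust
use std::io::{self, Write};
use std::sync::mpsc::Receiver;

// Block on the first frame, opportunistically drain up to max_batch
// more that are already queued, then pay for exactly one flush
// (one TLS record boundary / syscall) per burst.
fn writer_pump<W: Write>(
    rx: &Receiver<Vec<u8>>,
    sink: &mut W,
    max_batch: usize,
) -> io::Result<()> {
    while let Ok(first) = rx.recv() {
        sink.write_all(&first)?;
        for _ in 1..max_batch {
            match rx.try_recv() {
                // try_recv never blocks: it only picks up frames that
                // arrived behind the first one.
                Ok(frame) => sink.write_all(&frame)?,
                Err(_) => break,
            }
        }
        sink.flush()?;
    }
    Ok(())
}
```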

Hygiene: tungstenite unified at 0.28 across the workspace (was 0.28
and 0.29 side by side); AcceptedQuicConn newtype owns the QUIC
accept callback shape so compio-quic types stay private; WebSocketConfig
is re-exported as message_bus::WebSocketConfig so callers do not need
a direct compio_ws dep; install_client_<transport> /
install_replica_tcp[_fd] renamed for symmetry; ShardFramePayload,
SendError, and the ClientConnMeta / conn_info types are
#[non_exhaustive].
hubcio force-pushed the feat/message-bus-multi-transport branch from 75ab18b to c9f5aeb on April 30, 2026 at 13:07
hubcio added 8 commits on April 30, 2026 at 16:06
…section

c9f5aeb removed the BLAKE3-keyed MAC + per-peer nonce ring that
gated inbound replica handshakes (~900 LOC under auth/) without
mention in the commit body. The promised LOGIN_REPLICA mitigation
does not yet exist anywhere in the tree. Drop a TODO at the listener
auth rustdoc so the gap is visible until the replacement lands.
Four rustdoc blocks claimed TCP_NODELAY + SO_KEEPALIVE apply to
accepted client sockets, but socket_opts::apply_nodelay_for_connection
sets only TCP_NODELAY. Replica<->replica liveness is observed by VSR
heartbeats and SDK clients manage their own keepalive policy at the
application layer, so omission is intentional, not a bug. Strip the
SO_KEEPALIVE claim and reference the schema rationale at
configs/src/server_ng_config/message_bus.rs:49-52 instead.
Reuse a single Vec<BusMessage> across iterations of the per-connection
pump in tcp_tls, ws, and wss. Each PumpAction::Send arm previously
allocated a fresh batch with capacity max_batch, then consumed it in a
for-in loop and dropped the allocation on every drain. Mirror the
plain-tcp writer (which already hoists via mem::take) by allocating
once outside the loop and clearing via drain(..). Eliminates one
heap allocation per drain on TLS-family transports, no semantic
change.
The TODO at the replica listener auth section and the SO_KEEPALIVE
clarifications in the four client-install paths cited paths with line
ranges and a session-local finding label. Both rot on the next
refactor and mean nothing to a future reader. Replace with
intra-doc links to crate::socket_opts and a TODO(hubcio): owner
prefix; fold the rationale into prose so the comment stands alone.
drive_close ran tls.flush() to completion before entering
compio::time::timeout(close_grace, tls.shutdown()), so a peer that
refused to drain ciphertext could stretch the close phase past the
configured grace. Wrap flush + shutdown in one timeout so a stalled
flush counts against the same wall-clock budget as the shutdown
step. Mirrors the pattern already used by the wss close path.
Hoisting the writer batch outside the pump loop in tcp_tls/ws/wss
trips clippy::iter_with_drain (nursery, warn-by-default for the
crate). Its suggestion to switch to into_iter() would move the Vec
out and defeat the hoist, since the allocation must survive the
iteration to be reused. Allow the lint at each call site with a
brief note on why drain(..) is the right call here.
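
Taken together, the two commits above describe this shape; a sketch with stand-in types (BusMessage and the fill/write steps are assumptions):

```rust
struct BusMessage(Vec<u8>);

fn pump_sends(incoming_batches: Vec<Vec<BusMessage>>) {
    // Allocate once, outside the loop; the capacity survives every drain.
    let mut batch: Vec<BusMessage> = Vec::with_capacity(64);
    for incoming in incoming_batches {
        batch.extend(incoming); // the PumpAction::Send arm fills the batch
        // drain(..) empties the Vec but keeps its allocation, which is
        // the whole point of the hoist; clippy's into_iter() suggestion
        // would move the Vec out and force a fresh allocation next turn.
        #[allow(clippy::iter_with_drain)]
        for msg in batch.drain(..) {
            let _ = msg; // write each frame here
        }
    }
}
```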
Three small doc fixes surfaced in review:

- transports/mod.rs: rename "Batch atomicity" to "Batch ordering" and
  spell out that writev is not atomic; the invariant is FIFO + tear
  down on short/failed write. Mirror the same wording in the
  TransportConn::run trait doc.
- transports/quic.rs: rustdoc reader_task with the cancel-safety
  hazard (select! drops a parked framing::read_message and loses
  bytes already pulled into the in-flight Owned).
- transports/ws.rs: comment at the WS-payload copy now states why
  MESSAGE_ALIGN exists at all (io_uring O_DIRECT alignment in the
  storage path) instead of leaving the reader to infer it from the
  Message<GenericHeader> invariant alone.

No behaviour change.
The bus's QUIC transport_config used to hardcode max_idle_timeout,
keep_alive_interval, send/receive windows, and stream caps, while
[`server_ng_config::QuicConfig`] carried the matching schema fields
unconsumed.

Wire those knobs through:

- New `QuicTuning` runtime substruct on `MessageBusConfig`,
  pre-converted from `cfg.quic` in `From<&ServerNgConfig>` (mirrors
  the existing pattern for the WebSocket frame-layer tunables).
- New `transports::quic::transport_config_from(&QuicTuning)` that
  applies the operator-tunable knobs (windows, idle timeout,
  keep-alive, initial MTU, datagram send buffer) and clamps the
  architectural invariants (max_concurrent_uni_streams = 0, CUBIC
  congestion). Replaces `default_transport_config()`.
- `server_config_with_cert` now takes `&QuicTuning`. The replica
  bootstrap and the integration test thread it from
  `bus.config().quic` / `QuicTuning::default()` respectively.
- `core/server-ng/config.toml` `[quic]` defaults retuned to match the
  bus's previous hardcoded values (1 bidi stream, 64 MiB windows,
  30 s idle, 10 s keep-alive). The legacy server's QUIC defaults
  stay where they are; this section is server-ng only.

Zero `Duration` for `keep_alive_interval` / `max_idle_timeout` is
treated as "disabled" so the conversion never feeds quinn an
`IdleTimeout(0 ms)` that would tear every connection down.
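
A sketch of the zero-means-disabled conversion, assuming a quinn-style TransportConfig (compio-quic builds on quinn-proto); the QuicTuning fields are inferred from the commit text:

```rust
use std::{sync::Arc, time::Duration};
use quinn::{congestion::CubicConfig, IdleTimeout, TransportConfig, VarInt};

// Hypothetical runtime substruct, pre-converted at boot.
struct QuicTuning {
    max_idle_timeout: Duration,    // zero means disabled
    keep_alive_interval: Duration, // zero means disabled
    receive_window: u32,
    send_window: u64,
}

fn transport_config_from(t: &QuicTuning) -> TransportConfig {
    let mut cfg = TransportConfig::default();
    // Never feed quinn an IdleTimeout of 0 ms, which would tear every
    // connection down immediately; zero maps to None (disabled).
    cfg.max_idle_timeout(if t.max_idle_timeout.is_zero() {
        None
    } else {
        IdleTimeout::try_from(t.max_idle_timeout).ok()
    });
    cfg.keep_alive_interval(
        (!t.keep_alive_interval.is_zero()).then_some(t.keep_alive_interval),
    );
    // Operator-tunable windows...
    cfg.receive_window(VarInt::from_u32(t.receive_window));
    cfg.send_window(t.send_window);
    // ...and the clamped architectural invariants: no uni streams,
    // CUBIC congestion control.
    cfg.max_concurrent_uni_streams(VarInt::from_u32(0));
    cfg.congestion_controller_factory(Arc::new(CubicConfig::default()));
    cfg
}
```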