feat(message_bus): add QUIC, TCP-TLS, WS, WSS transports for SDK clients (#3192)
Codecov Report

```
@@             Coverage Diff              @@
##            master    #3192       +/-  ##
============================================
+ Coverage    74.10%   74.35%    +0.25%
  Complexity     943      943
============================================
  Files         1159     1177       +18
  Lines       102033   104519     +2486
  Branches     79083    81586     +2503
============================================
+ Hits         75607    77715     +2108
- Misses       23765    24061      +296
- Partials      2661     2743       +82
```
atharvalade left a comment:
Found these during the first round of review... I'll continue reviewing later. Overall seems good.
```rust
if body.len() < iggy_binary_protocol::HEADER_SIZE {
    return Err(FrameDecodeError::BadHeader);
}
let total_size = u32::from_le_bytes(
```
framing.rs has a `const _: () = { assert!(offset_of!(GenericHeader, size) == 48) };` guard, but this function duplicates the same 48..52 magic without one. If the GenericHeader layout ever shifts, TCP/QUIC get a compile error while WS/WSS silently read the wrong bytes.
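A minimal sketch of the guard pattern being asked for: the header struct and its field layout here are hypothetical stand-ins, not the real `GenericHeader`, but they show how the decoder can share the compile-time offset instead of re-hardcoding `48..52`.

```rust
use core::mem::offset_of;

// Hypothetical stand-in for the wire header; the field layout and the
// `size` offset (48) are assumptions for illustration only.
#[repr(C)]
struct GenericHeader {
    correlation_id: [u8; 16],
    reserved: [u8; 32],
    size: u32,
    kind: u32,
}

// Compile-time guard: if the layout ever shifts, the build fails instead
// of a decoder silently reading the wrong bytes at runtime.
const _: () = assert!(offset_of!(GenericHeader, size) == 48);

// Share the offset with the decoder instead of duplicating the magic range.
const SIZE_OFFSET: usize = offset_of!(GenericHeader, size);

fn total_size(body: &[u8]) -> Option<u32> {
    let bytes = body.get(SIZE_OFFSET..SIZE_OFFSET + 4)?;
    Some(u32::from_le_bytes(bytes.try_into().ok()?))
}
```

Any transport that reads the size field through `SIZE_OFFSET` then breaks at compile time together with the TCP/QUIC paths.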
```rust
in_tx,
rx,
shutdown,
label,
```
run_pump drops max_message_size into `..`, and decode_consensus_frame hardcodes framing::MAX_MESSAGE_SIZE. The TCP and QUIC paths honor the per-bus config value, so an operator who lowers max_message_size gets enforcement on TCP/QUIC but not on WS/WSS.
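A sketch of the fix being suggested: thread the per-bus limit into the decoder as a parameter instead of reading a crate-level const. All names here are illustrative, not the real message_bus API.

```rust
// Hypothetical header size; the real value lives in iggy_binary_protocol.
const HEADER_SIZE: usize = 8;

#[derive(Debug, PartialEq)]
enum FrameDecodeError {
    BadHeader,
    TooLarge { size: usize, max: usize },
}

// The cap comes from config on every call site, so lowering
// max_message_size is enforced on WS/WSS exactly as on TCP/QUIC.
fn decode_len(body: &[u8], max_message_size: usize) -> Result<usize, FrameDecodeError> {
    if body.len() < HEADER_SIZE {
        return Err(FrameDecodeError::BadHeader);
    }
    let size = u32::from_le_bytes(body[0..4].try_into().unwrap()) as usize;
    if size > max_message_size {
        return Err(FrameDecodeError::TooLarge { size, max: max_message_size });
    }
    Ok(size)
}
```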
```rust
.per_client
.entry(client)
.or_insert_with(|| PerClient::with_capacity(self.per_client_capacity));
if state.find(request).is_some() {
```
lookup treats TTL-expired Done entries as Fresh, but mark_in_flight calls find() without a TTL check. A client retrying after TTL expiry sees Fresh from lookup, then gets false from mark_in_flight because the physical slot still exists; the retry is silently dropped.
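One way to close the gap, sketched with assumed types (the real PerClient state is richer): both paths apply the same TTL rule, with mark_in_flight physically evicting an expired Done slot before the occupancy check.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical dedupe slot; InFlight entries never expire, Done entries do.
enum Slot {
    InFlight,
    Done { at: Instant },
}

struct Dedupe {
    ttl: Duration,
    slots: HashMap<u64, Slot>,
}

impl Dedupe {
    fn expired(&self, slot: &Slot, now: Instant) -> bool {
        matches!(slot, Slot::Done { at } if now.duration_since(*at) >= self.ttl)
    }

    /// Returns true if the request was admitted as a fresh in-flight entry.
    fn mark_in_flight(&mut self, req: u64, now: Instant) -> bool {
        // Same TTL rule as lookup: evict the expired slot first, so a retry
        // after expiry is admitted instead of silently dropped.
        if matches!(self.slots.get(&req), Some(s) if self.expired(s, now)) {
            self.slots.remove(&req);
        }
        match self.slots.get(&req) {
            Some(_) => false,
            None => {
                self.slots.insert(req, Slot::InFlight);
                true
            }
        }
    }

    fn mark_done(&mut self, req: u64, now: Instant) {
        self.slots.insert(req, Slot::Done { at: now });
    }
}
```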
```rust
let (server_out, server_in, server_shutdown, server_handle) = drive(server_conn);
let (client_out, client_in, client_shutdown, client_handle) = drive(client_conn);

client_out
```
TCP-TLS's drive_close calls tls.shutdown(), which sends close_notify, but WSS just does ws.close() + drop. The peer's rustls then sees an unexpected EOF on the record layer, which can trigger false-positive alerts in TLS-aware load balancers or WAFs sitting in front.
Force-pushed c732238 to 54b5b8d.
The message_bus client plane only spoke plaintext TCP, so SDK clients behind TLS-only middleboxes or browser bridges had no way in, and the bus could not interop with the rest of the server's transports. This adds four new client listeners (QUIC, TCP-TLS, WS, WSS), each with a bind/listen path, a per-connection installer, and a cancel-safe pump. Every transport caps inbound peers with a bounded handshake_grace (default 10 s) so slowloris peers cannot pin install slots; WSS shares one wall-clock budget across the TLS and WS handshakes. The QUIC accept loop spawns per incoming connection so a slow handshake no longer wedges the listener.

The replica listener gets the same bound: each accepted stream runs its handshake on a spawned task under compio::time::timeout, mirroring the client-plane defense, and in-flight handshake handles are reaped opportunistically so the handle vector stays bounded for the listener's lifetime.

MessageBusConfig now consumes the [message_bus] block of the server-ng config schema directly via From<&ServerNgConfig>. The IggyDuration -> Duration, IggyByteSize -> usize, and WS frame schema -> tungstenite::WebSocketConfig conversions happen at the boot boundary so hot paths read pre-converted fields. WS frame-layer tunables (max_message_size, max_frame_size, write_buffer_size, accept_unmasked_frames) live under [message_bus] so SDK-client burst characteristics can be tuned independently of the legacy [websocket] listener. Forks tcp/websocket/quic into server_ng_config so server-ng can evolve its TLS/WS/QUIC surface without disturbing the legacy server.

Hardening that came up while writing the new paths:

- TLS reads are cancel-safe. The previous pump ran framing's read_message inside select_biased!; when the mailbox arm won mid-frame, the future dropped its scratch buffer while rustls had already advanced its plaintext queue past those bytes, and the next iteration parsed garbage as a fresh header. Replaced with a resumable read_step that issues exactly one tls.read.await per call, with the framing accumulator owned by a TlsPumpState on the pump's stack frame and shared via SharedAcc (Rc<UnsafeCell<Owned<MESSAGE_ALIGN>>>; !Send + !Sync; single shard, single mut-borrower invariant enforced by debug_asserts). The WS analogue is upstream-blocked on compio_ws 0.4 split() and is documented as a known limitation.
- compio::runtime::spawn silently swallows panics, which left a panicking on_message / on_request handler with an orphan registry slot. Replica and TCP dispatch tasks now install scopeguards that synchronously evict the registry slot, fire notify_connection_lost (replica), and remove the client_meta (TCP). The async drain on clean exit observes the slot gone and becomes a no-op.
- A writer-task panic on TCP / QUIC previously left the reader parked on a live socket. Each writer task now installs a scopeguard that triggers the per-connection Shutdown on exit; the watchdog observes the conn-side fire, calls libc::shutdown(SHUT_RD), and the reader unparks. Two regression tests cover the eviction chain via a synthetic WriterPanicConn.
- TCP-TLS, WS, and WSS writers batch by draining the mailbox up to max_batch via try_recv after the first message, writing each frame, then emitting a single flush. This mirrors the plain-TCP writev pattern and avoids paying one TLS record plus one flush per queued frame on a sustained burst.
- The QUIC writer uses send.write_all(frozen) instead of send.write, so frame boundaries are not corrupted on backpressure. The per-frame flush is dropped: quinn-proto coalesces STREAM frames on its own, and the explicit flush was serializing every send.
- The cluster coordinator snapshot rebroadcasts Clear frames for forgotten replica slots via a 3-state MappingSlot enum, so a peer that missed a clear-frame on a full inbox eventually re-syncs.
- A per-connection FusedShutdown folds the bus-token and conn-token into one channel, dropping the bridge task that used to run per install.
- The send_to_* fast path no longer clones the Frozen payload on the registry-hit branch; Bytes::from_owner removes a per-frame Vec alloc and memcpy on WS/WSS outbound.

Hygiene: tungstenite unified at 0.28 across the workspace (was 0.28 and 0.29 side by side); an AcceptedQuicConn newtype owns the QUIC accept callback shape so compio-quic types stay private; WebSocketConfig is re-exported as message_bus::WebSocketConfig so callers do not need a direct compio_ws dep; install_client_<transport> / install_replica_tcp[_fd] renamed for symmetry; ShardFramePayload, SendError, and the ClientConnMeta / conn_info types are #[non_exhaustive].
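The mailbox-drain batching described above can be sketched with a plain std channel (the real pumps use the bus's async mailbox; `max_batch` and the frame type here are illustrative):

```rust
use std::sync::mpsc::Receiver;

/// Collect up to `max_batch` frames: the first message already received
/// blockingly, plus whatever is immediately available via try_recv. The
/// caller then writes every frame and pays a single flush for the batch.
fn drain_batch(rx: &Receiver<Vec<u8>>, first: Vec<u8>, max_batch: usize) -> Vec<Vec<u8>> {
    let mut batch = Vec::with_capacity(max_batch);
    batch.push(first);
    while batch.len() < max_batch {
        match rx.try_recv() {
            Ok(frame) => batch.push(frame),
            Err(_) => break, // mailbox momentarily empty: ship what we have
        }
    }
    batch
}
```

On a sustained burst this amortizes the per-flush cost (one TLS record boundary per batch rather than per frame), while an idle mailbox still ships the single frame immediately.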
Force-pushed 75ab18b to c9f5aeb.
c9f5aeb removed the BLAKE3-keyed MAC + per-peer nonce ring that gated inbound replica handshakes (~900 LOC under auth/) without mention in the commit body. The promised LOGIN_REPLICA mitigation does not yet exist anywhere in the tree. Drop a TODO at the listener auth rustdoc so the gap stays visible until the replacement lands.
Four rustdoc blocks claimed TCP_NODELAY + SO_KEEPALIVE apply to accepted client sockets, but socket_opts::apply_nodelay_for_connection sets only TCP_NODELAY. Replica<->replica liveness is observed by VSR heartbeats and SDK clients manage their own keepalive policy at the application layer, so omission is intentional, not a bug. Strip the SO_KEEPALIVE claim and reference the schema rationale at configs/src/server_ng_config/message_bus.rs:49-52 instead.
Reuse a single Vec<BusMessage> across iterations of the per-connection pump in tcp_tls, ws, and wss. Each PumpAction::Send arm previously allocated a fresh batch with capacity max_batch, then consumed it in a for-in loop and dropped the allocation on every drain. Mirror the plain-tcp writer (which already hoists via mem::take) by allocating once outside the loop and clearing via drain(..). Eliminates one heap allocation per drain on TLS-family transports, no semantic change.
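A minimal sketch of the hoist, with assumed message and write-step shapes (the real change lives in the tcp_tls/ws/wss pump loops):

```rust
// Hypothetical pump shape: one Vec allocated outside the loop and reused
// across drains via drain(..), instead of Vec::with_capacity per iteration.
fn run_drains(batches: &[&[u32]], max_batch: usize) -> Vec<Vec<u32>> {
    let mut out = Vec::new();
    // Hoisted once; the allocation survives every iteration below.
    let mut batch: Vec<u32> = Vec::with_capacity(max_batch);
    for incoming in batches {
        batch.extend_from_slice(incoming);
        let mut written = Vec::new();
        // drain(..) empties the Vec but keeps its capacity alive, which is
        // why clippy's into_iter() suggestion would defeat the hoist.
        #[allow(clippy::iter_with_drain)]
        for msg in batch.drain(..) {
            written.push(msg); // stand-in for writing one frame
        }
        out.push(written);
        debug_assert!(batch.is_empty() && batch.capacity() >= max_batch);
    }
    out
}
```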
The TODO at the replica listener auth section and the SO_KEEPALIVE clarifications in the four client-install paths cited paths with line ranges and a session-local finding label. Both rot on the next refactor and mean nothing to a future reader. Replace with intra-doc links to crate::socket_opts and a TODO(hubcio): owner prefix; fold the rationale into prose so the comment stands alone.
drive_close ran tls.flush() to completion before entering compio::time::timeout(close_grace, tls.shutdown()), so a peer that refused to drain ciphertext could stretch the close phase past the configured grace. Wrap flush + shutdown in one timeout so a stalled flush counts against the same wall-clock budget as the shutdown step. Mirrors the pattern already used by the wss close path.
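The shared-budget idea can be sketched without an async runtime: every close step is charged against one deadline computed up front, so a stalled first step consumes the grace of the later ones. `close_grace` and the per-step costs here are illustrative; `sleep` stands in for `flush().await` / `shutdown().await`.

```rust
use std::time::{Duration, Instant};

/// Run close-phase steps under a single wall-clock budget; returns how many
/// steps were started before the deadline passed. A step that overruns does
/// not grant the next step a fresh grace, mirroring timeout(close_grace,
/// flush + shutdown) instead of timing only the shutdown.
fn close_with_budget(close_grace: Duration, steps: &[Duration]) -> usize {
    let deadline = Instant::now() + close_grace;
    let mut completed = 0;
    for cost in steps {
        if Instant::now() >= deadline {
            break; // an earlier stalled step already consumed the grace
        }
        std::thread::sleep(*cost);
        completed += 1;
    }
    completed
}
```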
Hoisting the writer batch outside the pump loop in tcp_tls/ws/wss trips clippy::iter_with_drain (nursery, warn-by-default for the crate). Its suggestion to switch to into_iter() would move the Vec out and defeat the hoist, since the allocation must survive the iteration to be reused. Allow the lint at each call site with a brief note on why drain(..) is the right call here.
Three small doc fixes surfaced in review:

- transports/mod.rs: rename "Batch atomicity" to "Batch ordering" and spell out that writev is not atomic; the invariant is FIFO + tear-down on a short or failed write. Mirror the same wording in the TransportConn::run trait doc.
- transports/quic.rs: rustdoc reader_task with the cancel-safety hazard (select! drops a parked framing::read_message and loses bytes already pulled into the in-flight Owned).
- transports/ws.rs: the comment at the WS-payload copy now states why MESSAGE_ALIGN exists at all (io_uring O_DIRECT alignment in the storage path) instead of leaving the reader to infer it from the Message<GenericHeader> invariant alone.

No behaviour change.
The bus's QUIC transport_config used to hardcode max_idle_timeout, keep_alive_interval, send/receive windows, and stream caps, while [`server_ng_config::QuicConfig`] carried the matching schema fields unconsumed. Wire those knobs through:

- New `QuicTuning` runtime substruct on `MessageBusConfig`, pre-converted from `cfg.quic` in `From<&ServerNgConfig>` (mirrors the existing pattern for the WebSocket frame-layer tunables).
- New `transports::quic::transport_config_from(&QuicTuning)` that applies the operator-tunable knobs (windows, idle timeout, keep-alive, initial MTU, datagram send buffer) and clamps the architectural invariants (max_concurrent_uni_streams = 0, CUBIC congestion). Replaces `default_transport_config()`.
- `server_config_with_cert` now takes `&QuicTuning`. The replica bootstrap and the integration test thread it from `bus.config().quic` / `QuicTuning::default()` respectively.
- `core/server-ng/config.toml` `[quic]` defaults retuned to match the bus's previous hardcoded values (1 bidi stream, 64 MiB windows, 30 s idle, 10 s keep-alive). The legacy server's QUIC defaults stay where they are; this section is server-ng only.

A zero `Duration` for `keep_alive_interval` / `max_idle_timeout` is treated as "disabled", so the conversion never feeds quinn an `IdleTimeout(0 ms)` that would tear every connection down.
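The zero-means-disabled rule can be sketched as a small conversion helper. The `QuicTuning` fields below are taken from the description above, but the struct shape is illustrative and the quinn wiring is omitted so the sketch stays self-contained; the returned `Option`s would feed quinn's `TransportConfig` setters.

```rust
use std::time::Duration;

/// Map a schema duration to the transport layer's optional form: zero means
/// "operator disabled this knob", so the conversion never produces a 0 ms
/// idle timeout that would tear every connection down.
fn opt_duration(d: Duration) -> Option<Duration> {
    if d.is_zero() { None } else { Some(d) }
}

// Illustrative runtime substruct; only the two duration knobs are shown.
struct QuicTuning {
    max_idle_timeout: Duration,
    keep_alive_interval: Duration,
}

// Boot-boundary conversion: hot paths and the transport-config builder
// read pre-converted Options instead of re-checking for zero.
fn effective(t: &QuicTuning) -> (Option<Duration>, Option<Duration>) {
    (opt_duration(t.max_idle_timeout), opt_duration(t.keep_alive_interval))
}
```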
Replica plane stays TCP forever: VSR FIFO + view-change timing, fd-delegation, and writev batching all rely on plaintext between trusted replicas. The SDK-client plane gains four transports alongside TCP:

- peer, 0-RTT off + listener defense-in-depth reject.
- unified TransportConn::run with bounded close_grace shutdown.
- handover keeps fd-delegation on plain TCP only.
- per-connection install task.

Shared: TransportListener / TransportConn trait family; WebSocketConfig across TCP-TLS, WS, WSS; bounded safe-shutdown (no select! over stream.shutdown); single-task pump per WS/WSS using compio-ws cancel-safe read. Bus auth stays thin: both planes connect unauthenticated; server-ng gates via LOGIN_USER / LOGIN_WITH_PAT and the future LOGIN_REPLICA. Ping announces replica_id only; no subprotocol, no ALPN, no MAC. Per-connection metadata flows via IggyMessageBus::client_meta; ShardFramePayload setup variants carry ClientConnMeta end to end.