diff --git a/.gitignore b/.gitignore index 0b8f1a405bda3a..69c1dd205316fa 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,7 @@ /tags /tags.* /doc/api.xml +/docs/ /node /node_g /gon-config.json diff --git a/lib/internal/quic/quic.js b/lib/internal/quic/quic.js index 6ca59469faf2de..4b58048ab9197a 100644 --- a/lib/internal/quic/quic.js +++ b/lib/internal/quic/quic.js @@ -3942,8 +3942,8 @@ class QuicEndpoint { const { retryTokenExpiration, tokenExpiration, - maxConnectionsPerHost = 0, - maxConnectionsTotal = 0, + maxConnectionsPerHost = 100, + maxConnectionsTotal = 10_000, maxStatelessResetsPerHost, disableStatelessReset, addressLRUSize, diff --git a/src/quic/application.cc b/src/quic/application.cc index b5d8c8609fa3dc..a44a60f4b48cc8 100644 --- a/src/quic/application.cc +++ b/src/quic/application.cc @@ -239,7 +239,8 @@ void Session::Application::ReceiveStreamReset(Stream* stream, // < 0 (other): fatal error, session already closed ssize_t Session::Application::TryWritePendingDatagram(PathStorage* path, uint8_t* dest, - size_t destlen) { + size_t destlen, + uint64_t ts) { CHECK(session_->HasPendingDatagrams()); auto max_attempts = session_->config().options.max_datagram_send_attempts; @@ -262,9 +263,12 @@ ssize_t Session::Application::TryWritePendingDatagram(PathStorage* path, int accepted = 0; int dg_flags = NGTCP2_WRITE_DATAGRAM_FLAG_MORE; + // PacketInfo for the datagram path. When libuv gains per-socket ECN + // marking, the value from ngtcp2 should be forwarded to the send path. + PacketInfo dg_pi; ssize_t dg_nwrite = ngtcp2_conn_writev_datagram(*session_, &path->path, - nullptr, + dg_pi, dest, destlen, &accepted, @@ -272,7 +276,7 @@ ssize_t Session::Application::TryWritePendingDatagram(PathStorage* path, dg.id, &dgvec, 1, - uv_hrtime()); + ts); if (accepted) { // Nice, the datagram was accepted! 
@@ -329,14 +333,42 @@ void Session::Application::SendPendingData() { if (!session().can_send_packets()) [[unlikely]] { return; } - static constexpr size_t kMaxPackets = 32; + // Upper bound on packets per SendPendingData call. ngtcp2's send quantum + // is typically 64 KB, which at 1200-byte minimum packet size is ~53 + // packets. 64 covers the worst case with headroom. The actual count per + // call is dynamically capped by ngtcp2_conn_get_send_quantum(). + static constexpr size_t kMaxPackets = 64; Debug(session_, "Application sending pending data"); + // Cache the timestamp once for the entire send loop. ngtcp2 does not + // require nanosecond-accurate monotonicity within a single burst — + // a single timestamp per SendPendingData call is what other QUIC + // implementations use (e.g., quiche, msquic). When kernel-level + // packet pacing becomes available via libuv, this timestamp becomes + // the base for computing per-packet transmit timestamps. + const uint64_t ts = uv_hrtime(); PathStorage path; StreamData stream_data; bool closed = false; + + // Batch accumulation: packets are collected here and flushed via + // Session::SendBatch when the loop exits, the batch is full, or + // on early return. This enables synchronous batched delivery via + // uv_udp_try_send2 (sendmmsg) from the deferred flush path. + Packet::Ptr batch[kMaxPackets]; + PathStorage batch_paths[kMaxPackets]; + size_t batch_count = 0; + + auto flush_batch = [&] { + if (batch_count == 0) return; + session_->SendBatch(batch, batch_paths, batch_count); + batch_count = 0; + }; + auto update_stats = OnScopeLeave([&] { if (closed) return; + // Flush any remaining accumulated packets before updating stats. 
+ flush_batch(); auto& s = session(); if (!s.is_destroyed()) [[likely]] { s.UpdatePacketTxTime(); @@ -353,7 +385,7 @@ void Session::Application::SendPendingData() { kMaxPackets, ngtcp2_conn_get_send_quantum(*session_) / max_packet_size); if (max_packet_count == 0) return; - // The number of packets that have been sent in this call to SendPendingData. + // The number of packets that have been prepared in this call. size_t packet_send_count = 0; Packet::Ptr packet; @@ -368,6 +400,16 @@ void Session::Application::SendPendingData() { return true; }; + // Accumulate a completed packet into the batch. + auto enqueue_packet = + [&](Packet::Ptr& pkt, size_t len, const PacketInfo& pi) { + Debug(session_, "Enqueuing packet with %zu bytes into batch", len); + pkt->Truncate(len); + pkt->set_pkt_info(pi); + path.CopyTo(&batch_paths[batch_count]); + batch[batch_count++] = std::move(pkt); + }; + // We're going to enter a loop here to prepare and send no more than // max_packet_count packets. for (;;) { @@ -405,8 +447,14 @@ void Session::Application::SendPendingData() { } // Awesome, let's write our packet! - ssize_t nwrite = WriteVStream( - &path, packet->data(), &ndatalen, packet->length(), stream_data); + PacketInfo pi; + ssize_t nwrite = WriteVStream(&path, + &pi, + packet->data(), + &ndatalen, + packet->length(), + stream_data, + ts); // When ndatalen is > 0, that's our indication that stream data was accepted // in to the packet. Yay! @@ -493,7 +541,7 @@ void Session::Application::SendPendingData() { // if there is one. Otherwise just loop around and keep going. if (session_->HasPendingDatagrams()) { auto result = TryWritePendingDatagram( - &path, packet->data(), packet->length()); + &path, packet->data(), packet->length(), ts); // When result is 0, either the datagram was congestion controlled, // didn't fit in the packet, or was abandoned. Skip and continue. 
@@ -502,8 +550,7 @@ void Session::Application::SendPendingData() { if (result > 0) { size_t len = result; Debug(session_, "Sending packet with %zu bytes", len); - packet->Truncate(len); - session_->Send(std::move(packet), path); + enqueue_packet(packet, len, pi); if (++packet_send_count == max_packet_count) return; } else if (result < 0) { // Any negative result other than NGTCP2_ERR_WRITE_MORE @@ -540,8 +587,7 @@ void Session::Application::SendPendingData() { // is the size of the packet we are sending. size_t len = nwrite; Debug(session_, "Sending packet with %zu bytes", len); - packet->Truncate(len); - session_->Send(std::move(packet), path); + enqueue_packet(packet, len, pi); if (++packet_send_count == max_packet_count) return; // If there are pending datagrams, try sending them in a fresh packet. @@ -557,11 +603,10 @@ void Session::Application::SendPendingData() { return session_->Close(CloseMethod::SILENT); } auto result = - TryWritePendingDatagram(&path, packet->data(), packet->length()); + TryWritePendingDatagram(&path, packet->data(), packet->length(), ts); if (result > 0) { Debug(session_, "Sending datagram packet with %zd bytes", result); - packet->Truncate(static_cast<size_t>(result)); - session_->Send(std::move(packet), path); + enqueue_packet(packet, static_cast<size_t>(result), PacketInfo()); if (++packet_send_count == max_packet_count) return; } else if (result < 0 && result != NGTCP2_ERR_WRITE_MORE) { // Fatal error — session already closed by TryWritePendingDatagram.
@@ -574,17 +619,21 @@ void Session::Application::SendPendingData() { } ssize_t Session::Application::WriteVStream(PathStorage* path, + PacketInfo* pi, uint8_t* dest, ssize_t* ndatalen, size_t max_packet_size, - const StreamData& stream_data) { + const StreamData& stream_data, + uint64_t ts) { DCHECK_LE(stream_data.count, kMaxVectorCount); uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE; if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; + // The PacketInfo out-param is populated by ngtcp2 with the ECN codepoint + // to apply when sending this packet. When libuv gains per-socket ECN + // marking, the value should be forwarded to the send path. return ngtcp2_conn_writev_stream(*session_, &path->path, - // TODO(@jasnell): ECN blocked on libuv - nullptr, + *pi, dest, max_packet_size, ndatalen, @@ -592,7 +641,7 @@ ssize_t Session::Application::WriteVStream(PathStorage* path, stream_data.id, stream_data, stream_data.count, - uv_hrtime()); + ts); } // ============================================================================ diff --git a/src/quic/application.h b/src/quic/application.h index 673a4000e4ba2d..59583b941b95b4 100644 --- a/src/quic/application.h +++ b/src/quic/application.h @@ -267,14 +267,19 @@ class Session::Application : public MemoryRetainer { // the datagram is either congestion limited or was abandoned ssize_t TryWritePendingDatagram(PathStorage* path, uint8_t* dest, - size_t destlen); + size_t destlen, + uint64_t ts); - // Write the given stream_data into the buffer. + // Write the given stream_data into the buffer. The PacketInfo out-param + // is populated by ngtcp2 with per-packet metadata (e.g., ECN codepoint) + // that should be applied when sending the packet. 
ssize_t WriteVStream(PathStorage* path, + PacketInfo* pi, uint8_t* buf, ssize_t* ndatalen, size_t max_packet_size, - const StreamData& stream_data); + const StreamData& stream_data, + uint64_t ts); Session* session_ = nullptr; }; diff --git a/src/quic/bindingdata.cc b/src/quic/bindingdata.cc index 4a3b3dba11f196..a2719d683c6e92 100644 --- a/src/quic/bindingdata.cc +++ b/src/quic/bindingdata.cc @@ -22,6 +22,7 @@ namespace node { using mem::kReserveSizeAndAlign; using v8::Function; using v8::FunctionTemplate; +using v8::HandleScope; using v8::Local; using v8::Object; using v8::String; @@ -154,6 +155,16 @@ BindingData& BindingData::Get(Environment* env) { BindingData::~BindingData() { quic_alloc_state.binding = nullptr; + if (flush_check_initialized_) { + uv_check_stop(&flush_check_); + flush_check_started_ = false; + // The check handle is closed inline here. Because BindingData destruction + // happens during Environment cleanup, the handle will be finalized by + // libuv's close phase. + uv_close(reinterpret_cast(&flush_check_), nullptr); + flush_check_initialized_ = false; + } + pending_flush_sessions_.clear(); } ngtcp2_mem* BindingData::ngtcp2_allocator() { @@ -221,6 +232,11 @@ void BindingData::RegisterExternalReferences( BindingData::BindingData(Realm* realm, Local object) : BaseObject(realm, object) { MakeWeak(); + CHECK_EQ(uv_check_init(env()->event_loop(), &flush_check_), 0); + flush_check_.data = this; + // Unref so the check handle doesn't keep the event loop alive on its own. 
+ uv_unref(reinterpret_cast<uv_handle_t*>(&flush_check_)); + flush_check_initialized_ = true; } SessionManager& BindingData::session_manager() { @@ -230,6 +246,45 @@ SessionManager& BindingData::session_manager() { return *session_manager_; } +void BindingData::ScheduleSessionFlush(const BaseObjectPtr<Session>& session) { + pending_flush_sessions_.push_back(session); + if (!flush_check_started_) { + uv_check_start(&flush_check_, OnFlushCheck); + flush_check_started_ = true; + } +} + +void BindingData::OnFlushCheck(uv_check_t* handle) { + auto* binding = static_cast<BindingData*>(handle->data); + if (binding->pending_flush_sessions_.empty()) { + uv_check_stop(&binding->flush_check_); + binding->flush_check_started_ = false; + return; + } + + HandleScope scope(binding->env()->isolate()); + + // Swap to a local vector before iterating. SendPendingData may trigger + // MakeCallback which runs JS that could cause more packet receives via + // re-entry (e.g., a stream data callback that synchronously writes to + // another session). Any sessions added during the flush remain in + // pending_flush_sessions_ and are picked up on the next check tick. + auto sessions = std::move(binding->pending_flush_sessions_); + for (auto& session : sessions) { + session->pending_flush_ = false; + if (!session->is_destroyed()) { + session->FlushPendingData(); + } + } + + // If no new sessions were added during the flush, stop the check + // to avoid per-tick callback overhead when idle.
+ if (binding->pending_flush_sessions_.empty()) { + uv_check_stop(&binding->flush_check_); + binding->flush_check_started_ = false; + } +} + void BindingData::MemoryInfo(MemoryTracker* tracker) const { #define V(name, _) tracker->TrackField(#name, name##_callback()); diff --git a/src/quic/bindingdata.h b/src/quic/bindingdata.h index cc3c3a49f5647a..b3965529a6c1de 100644 --- a/src/quic/bindingdata.h +++ b/src/quic/bindingdata.h @@ -10,9 +10,11 @@ #include #include #include +#include #include #include #include +#include #include "defs.h" namespace node::quic { @@ -201,6 +203,13 @@ class BindingData final // routing so that any endpoint can route packets to any session. SessionManager& session_manager(); + // Schedule a session for deferred SendPendingData. Sessions are accumulated + // during the I/O poll phase (via Endpoint::Receive -> Session::ReadPacket) + // and flushed in a uv_check callback immediately after poll completes. + // This batches multiple received packets before generating responses, + // allowing ngtcp2 to make better ACK coalescing decisions. + void ScheduleSessionFlush(const BaseObjectPtr& session); + std::unordered_map> listening_endpoints; size_t current_ngtcp2_memory_ = 0; @@ -247,6 +256,17 @@ class BindingData final #undef V std::unique_ptr session_manager_; + + // Deferred send flush state. The uv_check_t fires immediately after + // the I/O poll phase in the same event loop tick, allowing batched + // receive processing: all packets are read during poll, then + // SendPendingData is called once per dirty session in the check callback. 
+ uv_check_t flush_check_; + std::vector<BaseObjectPtr<Session>> pending_flush_sessions_; + bool flush_check_started_ = false; + bool flush_check_initialized_ = false; + + static void OnFlushCheck(uv_check_t* handle); }; JS_METHOD_IMPL(IllegalConstructor); diff --git a/src/quic/data.h b/src/quic/data.h index 2b6d777caf7b81..ec8d40cbc4c7a0 100644 --- a/src/quic/data.h +++ b/src/quic/data.h @@ -19,6 +19,40 @@ namespace node::quic { template <typename T> concept OneByteType = sizeof(T) == 1; +// Lightweight wrapper around ngtcp2_pkt_info. Insulates the Node.js QUIC +// code from the ngtcp2 struct layout and provides a clean API boundary +// for per-packet metadata (currently ECN codepoint; may grow as ngtcp2 +// and libuv evolve). +// +// Default-constructed PacketInfo is zero-initialized, which ngtcp2 treats +// as ECN Not-ECT — identical to passing nullptr for the pkt_info parameter. +class PacketInfo final { + public: + // ECN codepoints as defined by RFC 3168. + enum class Ecn : uint32_t { + NOT_ECT = 0, // Not ECN-Capable Transport + ECT_1 = 1, // ECN-Capable Transport(1) + ECT_0 = 2, // ECN-Capable Transport(0) + CE = 3, // Congestion Experienced + }; + + PacketInfo() : info_{} {} + explicit PacketInfo(const ngtcp2_pkt_info& info) : info_(info) {} + + // ECN codepoint for this packet. When libuv gains per-packet ECN + // reporting, populate via set_ecn() from the receive metadata + // before passing to ReadPacket(). + Ecn ecn() const { return static_cast<Ecn>(info_.ecn); } + void set_ecn(Ecn ecn) { info_.ecn = static_cast<uint32_t>(ecn); } + + // Conversion operators for ngtcp2 API calls.
+ operator const ngtcp2_pkt_info*() const { return &info_; } + operator ngtcp2_pkt_info*() { return &info_; } + + private: + ngtcp2_pkt_info info_; +}; + struct Path final : public ngtcp2_path { explicit Path(const SocketAddress& local, const SocketAddress& remote); Path(Path&& other) noexcept = default; diff --git a/src/quic/endpoint.cc b/src/quic/endpoint.cc index 66413f66cafee2..8a61f51b088750 100644 --- a/src/quic/endpoint.cc +++ b/src/quic/endpoint.cc @@ -29,7 +29,6 @@ namespace node { using v8::Array; using v8::ArrayBufferView; -using v8::BackingStore; using v8::HandleScope; using v8::Integer; using v8::Just; @@ -312,10 +311,18 @@ class Endpoint::UDP::Impl final : public HandleWrap { SET_SELF_SIZE(Impl) private: + // Pre-allocated receive buffer. Reused across all datagrams because + // ngtcp2_conn_read_pkt is synchronous — it copies what it needs and + // does not retain a reference to the buffer after returning. This + // eliminates a malloc(64KB)/free(64KB) cycle per received datagram. + static constexpr size_t kRecvBufferSize = 65536; // UV__UDP_DGRAM_MAXSIZE + char recv_buf_[kRecvBufferSize]; + static void OnAlloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - *buf = From(handle)->env()->allocate_managed_buffer(suggested_size); + auto* impl = From(handle); + *buf = uv_buf_init(impl->recv_buf_, kRecvBufferSize); } static void OnReceive(uv_udp_t* handle, @@ -327,26 +334,22 @@ class Endpoint::UDP::Impl final : public HandleWrap { DCHECK_NOT_NULL(impl); DCHECK_NOT_NULL(impl->endpoint_); - auto release_buf = [&]() { - if (buf->base != nullptr) impl->env()->release_managed_buffer(*buf); - }; - // Nothing to do in these cases. Specifically, if the nread // is zero or we have received a partial packet, we are just - // going to ignore it. + // going to ignore it. No buffer release needed — recv_buf_ + // is pre-allocated and reused. 
if (nread == 0 || flags & UV_UDP_PARTIAL) { - release_buf(); return; } if (nread < 0) { - release_buf(); impl->endpoint_->Destroy(CloseContext::RECEIVE_FAILURE, static_cast(nread)); return; } - impl->endpoint_->Receive(uv_buf_init(buf->base, static_cast(nread)), + impl->endpoint_->Receive(reinterpret_cast(buf->base), + static_cast(nread), SocketAddress(addr)); } @@ -492,6 +495,24 @@ int Endpoint::UDP::Send(Packet::Ptr packet) { return err; } +int Endpoint::UDP::TrySend(const Packet::Ptr& packet) { + DCHECK(packet); + if (is_closed_or_closing()) return UV_EBADF; + uv_buf_t buf = *packet; + return uv_udp_try_send( + &impl_->handle_, &buf, 1, packet->destination().data()); +} + +int Endpoint::UDP::TrySendBatch(uv_buf_t* bufs[], + unsigned int nbufs[], + struct sockaddr* addrs[], + size_t count) { + DCHECK_GT(count, 0); + if (is_closed_or_closing()) return UV_EBADF; + return uv_udp_try_send2( + &impl_->handle_, static_cast(count), bufs, nbufs, addrs, 0); +} + void Endpoint::UDP::MemoryInfo(MemoryTracker* tracker) const { if (impl_) tracker->TrackField("impl", impl_); } @@ -812,6 +833,111 @@ void Endpoint::Send(Packet::Ptr packet) { STAT_INCREMENT(Stats, packets_sent); } +void Endpoint::SendOrTrySend(Packet::Ptr packet) { +#ifdef DEBUG + if (is_diagnostic_packet_loss(options_.tx_loss)) [[unlikely]] { + return; + } +#endif + + if (is_closed() || is_closing() || !packet || packet->length() == 0) { + return; + } + + Debug(this, "TrySend %s", packet->ToString()); + + // Attempt synchronous send. On success (returns number of bytes sent), + // the packet is delivered immediately — no callback overhead, no + // waiting for the next poll cycle. + int err = udp_.TrySend(packet); + if (err >= 0) { + // Synchronous send succeeded. + STAT_INCREMENT_N(Stats, bytes_sent, packet->length()); + STAT_INCREMENT(Stats, packets_sent); + // Ptr destructor releases back to arena pool. + return; + } + + if (err == UV_EAGAIN) { + // Socket not writable or async sends are queued. 
Fall back to the + // async path — the packet will be queued and flushed on the next + // POLLOUT cycle. + Debug(this, "TrySend got EAGAIN, falling back to async Send"); + return Send(std::move(packet)); + } + + // Other errors are fatal. + Debug(this, "TrySend failed with error %d", err); + Destroy(CloseContext::SEND_FAILURE, err); +} + +void Endpoint::SendBatch(Packet::Ptr* packets, size_t count) { + if (count == 0) return; + +#ifdef DEBUG + if (is_diagnostic_packet_loss(options_.tx_loss)) [[unlikely]] { + for (size_t i = 0; i < count; i++) packets[i].reset(); + return; + } +#endif + + if (is_closed() || is_closing()) { + for (size_t i = 0; i < count; i++) packets[i].reset(); + return; + } + + static constexpr size_t kMaxBatch = 64; + DCHECK_LE(count, kMaxBatch); + + // Build libuv argument arrays directly from the Ptr array. + // Packets with zero length are released and skipped. + uv_buf_t bufs[kMaxBatch]; + uv_buf_t* buf_ptrs[kMaxBatch]; + unsigned int nbufs[kMaxBatch]; + struct sockaddr* addrs[kMaxBatch]; + // Map from valid-index back to the original packets[] index. + size_t index_map[kMaxBatch]; + size_t valid_count = 0; + + for (size_t i = 0; i < count; i++) { + if (!packets[i] || packets[i]->length() == 0) { + packets[i].reset(); + continue; + } + bufs[valid_count] = *packets[i]; + buf_ptrs[valid_count] = &bufs[valid_count]; + nbufs[valid_count] = 1; + addrs[valid_count] = + const_cast(packets[i]->destination().data()); + index_map[valid_count] = i; + valid_count++; + } + + if (valid_count == 0) return; + + // Attempt synchronous batched send via sendmmsg. + int sent = udp_.TrySendBatch(buf_ptrs, nbufs, addrs, valid_count); + + if (sent > 0) { + // Packets [0, sent) were delivered synchronously. + // Release them immediately — no async callback needed. 
+ for (size_t i = 0; i < static_cast(sent); i++) { + size_t idx = index_map[i]; + STAT_INCREMENT_N(Stats, bytes_sent, packets[idx]->length()); + STAT_INCREMENT(Stats, packets_sent); + packets[idx].reset(); + } + } + + // Any unsent packets (EAGAIN, partial send, or total failure) fall + // back to async uv_udp_send. + size_t start = (sent > 0) ? static_cast(sent) : 0; + for (size_t i = start; i < valid_count; i++) { + size_t idx = index_map[i]; + Send(std::move(packets[idx])); + } +} + void Endpoint::SendRetry(const PathDescriptor& options) { // Generating and sending retry packets does consume some system resources, // and it is possible for a malicious peer to trigger sending a large number @@ -840,22 +966,31 @@ void Endpoint::SendRetry(const PathDescriptor& options) { void Endpoint::SendVersionNegotiation(const PathDescriptor& options) { Debug(this, "Sending version negotiation on path %s", options); - // While creating and sending a version negotiation packet does consume a - // small amount of system resources, and while it is fairly trivial for a - // malicious peer to force a version negotiation to be sent, these are more - // trivial to create than the cryptographically generated retry and stateless - // reset packets. If the packet is sent, then we'll at least increment the - // version_negotiation_count statistic so that application code can keep an - // eye on it. + // A malicious peer can trivially force version negotiation packets by + // sending packets with unsupported QUIC versions, potentially from + // spoofed source addresses. Rate-limit per remote host to prevent + // amplification attacks. + const auto exceeds_limits = [&] { + SocketAddressInfoTraits::Type* counts = + addr_validation_lru_.Peek(options.remote_address); + auto count = counts != nullptr ? 
counts->version_negotiation_count : 0; + return count >= kMaxVersionNegotiations; + }; + + if (exceeds_limits()) { + Debug(this, + "Version negotiation rate limit exceeded for %s", + options.remote_address); + return; + } + auto packet = Packet::CreateVersionNegotiationPacket(*this, options); if (packet) { + addr_validation_lru_.Upsert(options.remote_address) + ->version_negotiation_count++; STAT_INCREMENT(Stats, version_negotiation_count); Send(std::move(packet)); } - - // If creating the packet is unsuccessful, we just drop things on the floor. - // It's not worth committing any further resources to this one packet. We - // might want to log the failure at some point tho. } bool Endpoint::SendStatelessReset(const PathDescriptor& options, @@ -902,11 +1037,28 @@ void Endpoint::SendImmediateConnectionClose(const PathDescriptor& options, "Sending immediate connection close on path %s with reason %s", options, reason); - // While it is possible for a malicious peer to cause us to create a large - // number of these, generating them is fairly trivial. + // A malicious peer can trigger immediate connection close packets by + // sending Initial packets with invalid tokens or when the server is + // busy. Rate-limit per remote host to prevent amplification attacks. + const auto exceeds_limits = [&] { + SocketAddressInfoTraits::Type* counts = + addr_validation_lru_.Peek(options.remote_address); + auto count = counts != nullptr ? 
counts->immediate_close_count : 0; + return count >= kMaxImmediateCloses; + }; + + if (exceeds_limits()) { + Debug(this, + "Immediate connection close rate limit exceeded for %s", + options.remote_address); + return; + } + auto packet = Packet::CreateImmediateConnectionClosePacket(*this, options, reason); if (packet) { + addr_validation_lru_.Upsert(options.remote_address) + ->immediate_close_count++; STAT_INCREMENT(Stats, immediate_close_count); Send(std::move(packet)); } @@ -1117,24 +1269,39 @@ void Endpoint::CloseGracefully() { MaybeDestroy(); } -void Endpoint::Receive(const uv_buf_t& buf, +void Endpoint::Receive(const uint8_t* data, + size_t len, const SocketAddress& remote_address) { const auto receive = [&](Session* session, - Store&& store, + const uint8_t* pkt_data, + size_t pkt_len, const SocketAddress& local_address, const SocketAddress& remote_address, const CID& dcid, const CID& scid) { DCHECK_NOT_NULL(session); if (session->is_destroyed()) return; - size_t len = store.length(); - if (session->Receive(std::move(store), local_address, remote_address)) { - STAT_INCREMENT_N(Stats, bytes_received, len); + // Use ReadPacket (no SendPendingDataScope) so that multiple packets + // received in the same I/O burst are processed before any responses + // are generated. The deferred flush via BindingData's uv_check + // callback calls SendPendingData once per dirty session after all + // packets in the burst have been read. + if (session->ReadPacket(pkt_data, pkt_len, local_address, remote_address)) { + STAT_INCREMENT_N(Stats, bytes_received, pkt_len); STAT_INCREMENT(Stats, packets_received); } + // Schedule the session for deferred SendPendingData if it hasn't + // been scheduled already in this burst. 
+ if (!session->is_destroyed() && !session->pending_flush_) { + session->pending_flush_ = true; + BindingData::Get(env()).ScheduleSessionFlush( + BaseObjectPtr(session)); + } }; - const auto accept = [&](const Session::Config& config, Store&& store) { + const auto accept = [&](const Session::Config& config, + const uint8_t* pkt_data, + size_t pkt_len) { // One final check. If the endpoint is closed, closing, or is not listening // as a server, then we cannot accept the initial packet. if (is_closed() || is_closing() || !is_listening()) return; @@ -1164,7 +1331,8 @@ void Endpoint::Receive(const uv_buf_t& buf, return; receive(session.get(), - std::move(store), + pkt_data, + pkt_len, config.local_address, config.remote_address, config.dcid, @@ -1174,7 +1342,8 @@ void Endpoint::Receive(const uv_buf_t& buf, const auto acceptInitialPacket = [&](const uint32_t version, const CID& dcid, const CID& scid, - Store&& store, + const uint8_t* pkt_data, + size_t pkt_len, const SocketAddress& local_address, const SocketAddress& remote_address) { // If we're not listening as a server, do not accept an initial packet. @@ -1184,8 +1353,7 @@ void Endpoint::Receive(const uv_buf_t& buf, // This is our first condition check... A minimal check to see if ngtcp2 can // even recognize this packet as a quic packet. - ngtcp2_vec vec = store; - if (ngtcp2_accept(&hd, vec.base, vec.len) != NGTCP2_SUCCESS) { + if (ngtcp2_accept(&hd, pkt_data, pkt_len) != NGTCP2_SUCCESS) { // Per the ngtcp2 docs, ngtcp2_accept returns 0 if the check was // successful, or an error code if it was not. 
Currently there's only one // documented error code (NGTCP2_ERR_INVALID_ARGUMENT) but we'll handle @@ -1423,7 +1591,7 @@ void Endpoint::Receive(const uv_buf_t& buf, } } - accept(config, std::move(store)); + accept(config, pkt_data, pkt_len); }; // When a received packet contains a QUIC short header but cannot be matched @@ -1439,14 +1607,15 @@ void Endpoint::Receive(const uv_buf_t& buf, // possible to avoid a DOS vector. const auto maybeStatelessReset = [&](const CID& dcid, const CID& scid, - Store& store, + const uint8_t* pkt_data, + size_t pkt_len, const SocketAddress& local_address, const SocketAddress& remote_address) { // Support for stateless resets can be disabled by the application. If that // case, or if the packet is too short to contain a reset token, then we // skip the remaining checks. if (options_.disable_stateless_reset || - store.length() < NGTCP2_STATELESS_RESET_TOKENLEN) { + pkt_len < NGTCP2_STATELESS_RESET_TOKENLEN) { return false; } @@ -1454,20 +1623,21 @@ void Endpoint::Receive(const uv_buf_t& buf, // NGTCP2_STATELESS_RESET_TOKENLEN bytes in the received packet. If it is a // stateless reset then then rest of the bytes in the packet are garbage // that we'll ignore. - ngtcp2_vec vec = store; - vec.base += (vec.len - NGTCP2_STATELESS_RESET_TOKENLEN); + const uint8_t* token_pos = + pkt_data + (pkt_len - NGTCP2_STATELESS_RESET_TOKENLEN); // If a Session has been associated with the token, then it is a valid // stateless reset token. We need to dispatch it to the session to be // processed. auto* session = session_manager().FindSessionByStatelessResetToken( - StatelessResetToken(vec.base)); + StatelessResetToken(token_pos)); if (session != nullptr) { // If the session happens to have been destroyed already, we'll // just ignore the packet. 
if (!session->is_destroyed()) [[likely]] { receive(session, - std::move(store), + pkt_data, + pkt_len, local_address, remote_address, dcid, @@ -1495,22 +1665,8 @@ void Endpoint::Receive(const uv_buf_t& buf, // return; // } - Debug(this, "Received %zu-byte packet from %s", buf.len, remote_address); - - // The managed buffer here contains the received packet. We do not yet know - // at this point if it is a valid QUIC packet. We need to do some basic - // checks. It is critical at this point that we do as little work as possible - // to avoid a DOS vector. - std::shared_ptr backing = env()->release_managed_buffer(buf); - if (!backing) [[unlikely]] { - // At this point something bad happened and we need to treat this as a fatal - // case. There's likely no way to test this specific condition reliably. - return Destroy(CloseContext::RECEIVE_FAILURE, UV_ENOMEM); - } - - Store store(std::move(backing), buf.len, 0); + Debug(this, "Received %zu-byte packet from %s", len, remote_address); - ngtcp2_vec vec = store; ngtcp2_version_cid pversion_cid; // This is our first check to see if the received data can be processed as a @@ -1519,7 +1675,7 @@ void Endpoint::Receive(const uv_buf_t& buf, // valid QUIC header but there is still no guarantee that the packet can be // successfully processed. switch (ngtcp2_pkt_decode_version_cid( - &pversion_cid, vec.base, vec.len, NGTCP2_MAX_CIDLEN)) { + &pversion_cid, data, len, NGTCP2_MAX_CIDLEN)) { case 0: break; // Supported version, continue processing. case NGTCP2_ERR_VERSION_NEGOTIATION: { @@ -1597,7 +1753,7 @@ void Endpoint::Receive(const uv_buf_t& buf, // necessary here. We want to return immediately without committing any // further resources. if (pversion_cid.version == 0 && - maybeStatelessReset(dcid, scid, store, addr, remote_address)) { + maybeStatelessReset(dcid, scid, data, len, addr, remote_address)) { Debug(this, "Packet was a stateless reset"); return; // Stateless reset! Don't do any further processing. 
} @@ -1612,17 +1768,13 @@ void Endpoint::Receive(const uv_buf_t& buf, SendStatelessReset( PathDescriptor{ pversion_cid.version, dcid, scid, addr, remote_address}, - store.length()); + len); return; } // Process the packet as an initial packet... - return acceptInitialPacket(pversion_cid.version, - dcid, - scid, - std::move(store), - addr, - remote_address); + return acceptInitialPacket( + pversion_cid.version, dcid, scid, data, len, addr, remote_address); } if (session->is_destroyed()) [[unlikely]] { @@ -1634,7 +1786,7 @@ void Endpoint::Receive(const uv_buf_t& buf, // If we got here, the dcid matched the scid of a known local session. Yay! // The session will take over any further processing of the packet. Debug(this, "Dispatching packet to known session"); - receive(session.get(), std::move(store), addr, remote_address, dcid, scid); + receive(session.get(), data, len, addr, remote_address, dcid, scid); // It is important to note that the session may have been destroyed during // the call to receive(...). If that's the case, the session object still diff --git a/src/quic/endpoint.h b/src/quic/endpoint.h index b9f20f8659dfa6..a9f020e0328eff 100644 --- a/src/quic/endpoint.h +++ b/src/quic/endpoint.h @@ -47,6 +47,20 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { // intentionally triggering generation of a large number of retries. static constexpr uint64_t DEFAULT_MAX_RETRY_LIMIT = 10; + // Maximum number of version negotiation packets that will be sent to a + // given remote host within the LRU tracking window. Version negotiation + // packets are cheap to generate but can be used as an amplification + // vector with spoofed source addresses. + // TODO(@jasnell): Consider making this configurable via Endpoint::Options. + static constexpr uint64_t kMaxVersionNegotiations = 10; + + // Maximum number of immediate connection close packets that will be sent + // to a given remote host within the LRU tracking window. 
These are sent + // when the server is busy or a token is invalid — a malicious peer could + // trigger a large number of them. + // TODO(@jasnell): Consider making this configurable via Endpoint::Options. + static constexpr uint64_t kMaxImmediateCloses = 10; + // Endpoint configuration options struct Options final : public MemoryRetainer { // The local socket address to which the UDP port will be bound. The port @@ -208,6 +222,20 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { void Send(Packet::Ptr packet); + // Attempt synchronous send via uv_udp_try_send. If the socket is + // writable, the packet is sent immediately and the Ptr is released. + // If the socket is not writable (UV_EAGAIN), falls back to the + // async Send path. Used by the deferred flush callback to avoid + // the one-tick latency of async uv_udp_send. + void SendOrTrySend(Packet::Ptr packet); + + // Send a batch of packets using uv_udp_try_send2 (sendmmsg) for + // synchronous batched delivery. Packets successfully sent are released + // immediately. On EAGAIN or partial send, remaining packets fall back + // to async uv_udp_send. The Packet::Ptr array is consumed: all entries + // will be empty (released or moved) on return. + void SendBatch(Packet::Ptr* packets, size_t count); + // Acquire a Packet from the pool. length sets the initial working // size (must be <= pool capacity). The slot is always allocated at // full capacity to avoid fragmentation. @@ -281,6 +309,20 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { void Close(); int Send(Packet::Ptr packet); + // Synchronous send using uv_udp_try_send. Returns the number of + // bytes sent on success, UV_EAGAIN if the socket is not writable + // or the send queue is non-empty, or another negative error code. + // The Ptr is not consumed — the caller manages the lifecycle. + int TrySend(const Packet::Ptr& packet); + + // Synchronous batched send using uv_udp_try_send2 (sendmmsg). 
+ // Takes pre-built libuv argument arrays. Returns the number of + // messages successfully sent (>= 0), or a negative error code. + int TrySendBatch(uv_buf_t* bufs[], + unsigned int nbufs[], + struct sockaddr* addrs[], + size_t count); + // Returns the local UDP socket address to which we are bound, // or fail with an assert if we are not bound. SocketAddress local_address() const; @@ -381,7 +423,7 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { // Ref() causes a listening Endpoint to keep the event loop active. JS_METHOD(Ref); - void Receive(const uv_buf_t& buf, const SocketAddress& from); + void Receive(const uint8_t* data, size_t len, const SocketAddress& from); AliasedStruct stats_; AliasedStruct state_; @@ -426,6 +468,8 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { struct Type final { size_t reset_count; size_t retry_count; + size_t version_negotiation_count; + size_t immediate_close_count; uint64_t timestamp; bool validated; }; diff --git a/src/quic/http3.cc b/src/quic/http3.cc index ea07c0a5a596fb..6717ac064801cb 100644 --- a/src/quic/http3.cc +++ b/src/quic/http3.cc @@ -262,11 +262,17 @@ class Http3ApplicationImpl final : public Session::Application { } void BeginShutdown() override { - if (conn_) nghttp3_conn_submit_shutdown_notice(*this); + // Only submit a shutdown notice if the H3 connection was fully + // started (control streams bound). If the TLS handshake failed + // before Start() was called, conn_ exists but its control streams + // are unbound, and nghttp3_conn_submit_shutdown_notice would crash. + if (conn_ && started_) nghttp3_conn_submit_shutdown_notice(*this); } void CompleteShutdown() override { - if (conn_) nghttp3_conn_shutdown(*this); + // Same guard as BeginShutdown — nghttp3_conn_shutdown asserts + // that the control stream is bound (conn->tx.ctrl != NULL). 
+ if (conn_ && started_) nghttp3_conn_shutdown(*this); } bool ReceiveStreamData(stream_id id, diff --git a/src/quic/packet.h b/src/quic/packet.h index ffeb582471333f..a94ee1264c2a6a 100644 --- a/src/quic/packet.h +++ b/src/quic/packet.h @@ -68,6 +68,8 @@ class Packet final { size_t length() const { return length_; } size_t capacity() const { return capacity_; } const SocketAddress& destination() const { return destination_; } + const PacketInfo& pkt_info() const { return pkt_info_; } + void set_pkt_info(const PacketInfo& pi) { pkt_info_ = pi; } Listener* listener() const { return listener_; } // Redirect the packet to a different endpoint for cross-endpoint sends @@ -148,6 +150,7 @@ class Packet final { Listener* listener_; // Touched at send time. + PacketInfo pkt_info_; SocketAddress destination_; // Only touched by libuv during uv_udp_send and in the send callback. diff --git a/src/quic/session.cc b/src/quic/session.cc index 4af903e0c2a0af..abcf733d443cdd 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -464,7 +464,12 @@ Session::Config::Config(Environment* env, settings.log_printf = ngtcp2_debug_log; } - settings.handshake_timeout = options.handshake_timeout; + // The handshake_timeout option is in milliseconds; ngtcp2 expects + // nanoseconds (ngtcp2_duration). UINT64_MAX means no timeout. + settings.handshake_timeout = + options.handshake_timeout == UINT64_MAX + ? 
UINT64_MAX + : options.handshake_timeout * NGTCP2_MILLISECONDS; settings.max_stream_window = options.max_stream_window; settings.max_window = options.max_window; settings.ack_thresh = options.unacknowledged_packet_threshold; @@ -2101,22 +2106,35 @@ void Session::SetLastError(QuicError&& error) { impl_->last_error_ = std::move(error); } -bool Session::Receive(Store&& store, +bool Session::Receive(const uint8_t* data, + size_t len, const SocketAddress& local_address, - const SocketAddress& remote_address) { + const SocketAddress& remote_address, + const PacketInfo& pkt_info, + uint64_t ts) { + // Convenience wrapper: reads the packet and immediately triggers + // SendPendingData. Used by paths that need an immediate response + // (e.g., Endpoint::Connect for client Initial packets). + // The hot receive path uses ReadPacket() directly with deferred + // flush via BindingData's uv_check callback. + SendPendingDataScope send_scope(this); + return ReadPacket(data, len, local_address, remote_address, pkt_info, ts); +} + +bool Session::ReadPacket(const uint8_t* data, + size_t len, + const SocketAddress& local_address, + const SocketAddress& remote_address, + const PacketInfo& pkt_info, + uint64_t ts) { DCHECK(!is_destroyed()); impl_->remote_address_ = remote_address; - // When we are done processing this packet, we arrange to send any - // pending data for this session. - SendPendingDataScope send_scope(this); - - ngtcp2_vec vec = store; Path path(local_address, remote_address); Debug(this, "Session is receiving %zu-byte packet received along path %s", - vec.len, + len, path); // It is important to understand that reading the packet will cause @@ -2125,29 +2143,30 @@ bool Session::Receive(Store&& store, // ensures that any deferred destroy waits until all callbacks for this // packet have completed. 
After calling ngtcp2_conn_read_pkt here, we // will need to double check that the session is not destroyed before - // we try doing anything with it (like updating stats, sending pending - // data, etc). + // we try doing anything with it (like updating stats, etc). int err; { NgTcp2CallbackScope callback_scope(this); - err = ngtcp2_conn_read_pkt(*this, - &path, - // TODO(@jasnell): ECN pkt_info blocked on libuv - nullptr, - vec.base, - vec.len, - uv_hrtime()); + // The PacketInfo carries per-packet metadata (currently ECN codepoint). + // When libuv gains per-packet ECN reporting, the caller should + // populate pkt_info from the receive metadata before calling + // ReadPacket(). + // When ts is 0 (the default), call uv_hrtime() here. The batched + // receive path caches a timestamp and passes it to all ReadPacket() + // calls in the same I/O burst. + if (ts == 0) ts = uv_hrtime(); + err = ngtcp2_conn_read_pkt(*this, &path, pkt_info, data, len, ts); } if (is_destroyed()) return false; - Debug(this, "Session receiving %zu-byte packet with result %d", vec.len, err); + Debug(this, "Session receiving %zu-byte packet with result %d", len, err); switch (err) { case 0: { - Debug(this, "Session successfully received %zu-byte packet", vec.len); + Debug(this, "Session successfully received %zu-byte packet", len); if (!is_destroyed()) [[likely]] { auto& stats_ = impl_->stats_; - STAT_INCREMENT_N(Stats, bytes_received, vec.len); + STAT_INCREMENT_N(Stats, bytes_received, len); // Process deferred operations that couldn't run inside callback // scopes (e.g., HTTP/3 GOAWAY handling that calls into JS). 
application().PostReceive(); @@ -2245,6 +2264,72 @@ bool Session::Receive(Store&& store, return false; } +void Session::SendBatch(Packet::Ptr* packets, + PathStorage* paths, + size_t count) { + DCHECK(!is_destroyed()); + if (count == 0) return; + + // Separate packets into those going to the primary endpoint and those + // redirected to other endpoints (rare: path validation, preferred address). + // Redirected packets are sent individually via the target endpoint. + static constexpr size_t kMaxBatch = 64; + DCHECK_LE(count, kMaxBatch); + Packet::Ptr primary_packets[kMaxBatch]; + size_t primary_count = 0; + + for (size_t i = 0; i < count; i++) { + if (!packets[i] || !can_send_packets()) { + packets[i].reset(); + continue; + } + + UpdatePath(paths[i]); + + // Check for cross-endpoint redirect. + bool redirected = false; + if (paths[i].path.local.addrlen > 0) { + SocketAddress local_addr(paths[i].path.local.addr); + auto& mgr = BindingData::Get(env()).session_manager(); + Endpoint* target = mgr.FindEndpointForAddress(local_addr); + if (target != nullptr && target != &endpoint()) { + SocketAddress remote_addr(paths[i].path.remote.addr); + packets[i]->Redirect(static_cast(target), + remote_addr); + target->Send(std::move(packets[i])); + redirected = true; + } + } + + if (!redirected) { + primary_packets[primary_count++] = std::move(packets[i]); + } + } + + if (primary_count == 0) return; + + // Use batched send for the primary endpoint. + if (prefer_try_send_) { + endpoint().SendBatch(primary_packets, primary_count); + } else { + // Non-flush path: send individually via async uv_udp_send. + for (size_t i = 0; i < primary_count; i++) { + Send(std::move(primary_packets[i])); + } + } +} + +void Session::FlushPendingData() { + DCHECK(!is_destroyed()); + if (impl_->application_) { + // Prefer synchronous sends during the deferred flush to avoid the + // one-tick latency of async uv_udp_send from the uv_check callback. 
+ prefer_try_send_ = true; + application().SendPendingData(); + prefer_try_send_ = false; + } +} + void Session::Send(Packet::Ptr packet) { // Sending a Packet is generally best effort. If we're not in a state // where we can send a packet, it's ok to drop it on the floor. The @@ -2261,6 +2346,16 @@ void Session::Send(Packet::Ptr packet) { return; } + // When called from the deferred flush path (uv_check callback), + // prefer synchronous send to avoid the one-tick latency of async + // uv_udp_send. SendOrTrySend uses uv_udp_try_send first, falling + // back to uv_udp_send on EAGAIN. + if (prefer_try_send_) { + Debug(this, "Session is sending (try_send) %s", packet->ToString()); + endpoint().SendOrTrySend(std::move(packet)); + return; + } + Debug(this, "Session is sending %s", packet->ToString()); endpoint().Send(std::move(packet)); } @@ -3550,6 +3645,10 @@ void Session::InitPerContext(Realm* realm, Local target) { NODE_DEFINE_CONSTANT(target, QUIC_PROTO_MAX); NODE_DEFINE_CONSTANT(target, QUIC_PROTO_MIN); + static constexpr auto DEFAULT_HANDSHAKE_TIMEOUT = + Session::Options::DEFAULT_HANDSHAKE_TIMEOUT; + NODE_DEFINE_CONSTANT(target, DEFAULT_HANDSHAKE_TIMEOUT); + NODE_DEFINE_STRING_CONSTANT( target, "DEFAULT_CIPHERS", TLSContext::DEFAULT_CIPHERS); NODE_DEFINE_STRING_CONSTANT( diff --git a/src/quic/session.h b/src/quic/session.h index 650e8f79ba1428..472079984f313a 100644 --- a/src/quic/session.h +++ b/src/quic/session.h @@ -153,8 +153,15 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source { bool qlog = false; // The amount of time (in milliseconds) that the endpoint will wait for the - // completion of the tls handshake. - uint64_t handshake_timeout = UINT64_MAX; + // completion of the TLS handshake. If the handshake does not complete + // within this time, the session is closed. 
This prevents a peer from + // holding a session open indefinitely in the handshake state, consuming + // server resources (ngtcp2 connection, TLS state, JS objects) without + // ever completing the connection. The default of 10 seconds is generous + // enough to accommodate slow networks with retransmissions while still + // bounding resource exposure. Set to UINT64_MAX to disable. + static constexpr uint64_t DEFAULT_HANDSHAKE_TIMEOUT = 10'000; + uint64_t handshake_timeout = DEFAULT_HANDSHAKE_TIMEOUT; // The keep-alive timeout in milliseconds. When set to a non-zero value, // ngtcp2 will automatically send PING frames to keep the connection alive @@ -353,9 +360,45 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source { bool early = false; }; - bool Receive(Store&& store, + bool Receive(const uint8_t* data, + size_t len, const SocketAddress& local_address, - const SocketAddress& remote_address); + const SocketAddress& remote_address, + const PacketInfo& pkt_info = PacketInfo(), + uint64_t ts = 0); + + // ReadPacket processes a single inbound packet through ngtcp2 without + // triggering SendPendingData. This is the building block for batched + // receive processing: the caller (Endpoint::Receive) accumulates + // dirty sessions and a uv_check callback flushes them after all + // packets in the I/O burst have been read. + // Receive() is kept as a convenience wrapper that calls ReadPacket() + // then triggers SendPendingData (for paths like Connect that need + // immediate response). + // The data pointer is used synchronously — ngtcp2_conn_read_pkt does + // not retain a reference after returning, so the caller's buffer can + // be reused immediately. + // When ts is 0 (the default), uv_hrtime() is called internally. + // The batched receive path caches a timestamp and passes it to all + // ReadPacket() calls in the same I/O burst. 
+ bool ReadPacket(const uint8_t* data, + size_t len, + const SocketAddress& local_address, + const SocketAddress& remote_address, + const PacketInfo& pkt_info = PacketInfo(), + uint64_t ts = 0); + + // Called by BindingData's flush callback to trigger SendPendingData + // on this session. Encapsulates the application() access so that + // bindingdata.cc doesn't need the full Application type definition. + void FlushPendingData(); + + // Send a batch of packets accumulated by SendPendingData. Uses + // Endpoint::SendBatch (uv_udp_try_send2 / sendmmsg) for synchronous + // batched delivery when called from the deferred flush path. + // Handles per-packet path updates and cross-endpoint redirects. + // All Ptr entries are consumed (released or moved) on return. + void SendBatch(Packet::Ptr* packets, PathStorage* paths, size_t count); void Send(Packet::Ptr packet); void Send(Packet::Ptr packet, const PathStorage& path); @@ -572,11 +615,22 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source { bool in_ngtcp2_callback_scope_ = false; bool in_nghttp3_callback_scope_ = false; bool destroy_deferred_ = false; + // Set when this session is in BindingData's pending_flush_sessions_ vector. + // Cleared by the flush callback before calling SendPendingData. + // Provides O(1) dedup so a session receiving multiple packets in one I/O + // burst is only scheduled for flush once. + bool pending_flush_ = false; + // When true, Session::Send prefers synchronous delivery via + // Endpoint::SendOrTrySend (uv_udp_try_send with async fallback). + // Set during FlushPendingData to avoid the one-tick latency of + // async-only sends from the uv_check callback. 
+ bool prefer_try_send_ = false; QuicConnectionPointer connection_; std::unique_ptr tls_session_; friend struct NgTcp2CallbackScope; friend struct NgHttp3CallbackScope; friend class Application; + friend class BindingData; friend class DefaultApplication; friend class Http3ApplicationImpl; friend class Endpoint; diff --git a/test/parallel/test-quic-callback-error-ondatagram-async.mjs b/test/parallel/test-quic-callback-error-ondatagram-async.mjs index 4e6f814906fb40..eebe2e0629522c 100644 --- a/test/parallel/test-quic-callback-error-ondatagram-async.mjs +++ b/test/parallel/test-quic-callback-error-ondatagram-async.mjs @@ -38,9 +38,6 @@ await clientSession.opened; await clientSession.sendDatagram(new Uint8Array([1, 2, 3])); await serverDone.promise; -// The server session was destroyed abruptly (no CONNECTION_CLOSE sent). -// The client may receive a stateless reset if it sends any packet -// before its idle timeout fires, so closed may reject. -await assert.rejects(clientSession.closed, { code: 'ERR_QUIC_TRANSPORT_ERROR' }); +await clientSession.closed; serverEndpoint.close(); await serverEndpoint.closed; diff --git a/test/parallel/test-quic-callback-error-ondatagram.mjs b/test/parallel/test-quic-callback-error-ondatagram.mjs index f0253f22768380..69d1440ed49da6 100644 --- a/test/parallel/test-quic-callback-error-ondatagram.mjs +++ b/test/parallel/test-quic-callback-error-ondatagram.mjs @@ -41,8 +41,5 @@ await clientSession.opened; await clientSession.sendDatagram(new Uint8Array([1, 2, 3])); await serverDone.promise; -// The server session was destroyed abruptly (no CONNECTION_CLOSE sent). -// The client may receive a stateless reset if it sends any packet -// before its idle timeout fires, so closed may reject. 
-await rejects(clientSession.closed, { code: 'ERR_QUIC_TRANSPORT_ERROR' }); +await clientSession.closed; await serverEndpoint.close(); diff --git a/test/parallel/test-quic-connection-limits.mjs b/test/parallel/test-quic-connection-limits.mjs index acb0f8065d4c78..2f41c388805dc4 100644 --- a/test/parallel/test-quic-connection-limits.mjs +++ b/test/parallel/test-quic-connection-limits.mjs @@ -27,9 +27,11 @@ const endpoint = new QuicEndpoint({ maxConnectionsTotal: 1 }); // Verify the limits are readable and mutable. strictEqual(endpoint.maxConnectionsTotal, 1); -strictEqual(endpoint.maxConnectionsPerHost, 0); -endpoint.maxConnectionsPerHost = 100; +// The default maxConnectionsPerHost is 100 — a non-zero default that +// prevents a single host from exhausting server resources. strictEqual(endpoint.maxConnectionsPerHost, 100); +endpoint.maxConnectionsPerHost = 50; +strictEqual(endpoint.maxConnectionsPerHost, 50); endpoint.maxConnectionsPerHost = 0; let sessionCount = 0; diff --git a/test/parallel/test-quic-h3-handshake-failure.mjs b/test/parallel/test-quic-h3-handshake-failure.mjs new file mode 100644 index 00000000000000..128acab8fffe3d --- /dev/null +++ b/test/parallel/test-quic-h3-handshake-failure.mjs @@ -0,0 +1,56 @@ +// Flags: --experimental-quic --no-warnings + +// Regression test: HTTP/3 server must not crash when a session is closed +// before the H3 application is fully started (control streams bound). +// Previously, closing such a session would call nghttp3_conn_shutdown on +// an H3 connection whose control streams were never bound, causing an +// assertion failure in nghttp3 (conn->tx.ctrl != NULL). +// +// The test creates an H3 server and a client that immediately closes the +// session before the handshake completes. The server creates the H3 +// application during ALPN negotiation, but Start() (which binds control +// streams) hasn't been called yet when the session is torn down. +// The server must handle this gracefully without crashing. 
+ +import { hasQuic, skip, mustNotCall } from '../common/index.mjs'; +import { setTimeout } from 'node:timers/promises'; +import * as fixtures from '../common/fixtures.mjs'; + +const { readKey } = fixtures; + +if (!hasQuic) { + skip('QUIC is not enabled'); +} + +const { listen, connect } = await import('node:quic'); +const { createPrivateKey } = await import('node:crypto'); + +const key = createPrivateKey(readKey('agent1-key.pem')); +const cert = readKey('agent1-cert.pem'); + +const serverEndpoint = await listen(async (serverSession) => { + await serverSession.closed; +}, { + sni: { '*': { keys: [key], certs: [cert] } }, + onheaders: mustNotCall(), +}); + +// Connect then immediately close the session before the handshake completes. +// This exercises the H3 shutdown path on the server while the H3 application +// exists but hasn't started (control streams not yet bound). +const clientSession = await connect(serverEndpoint.address, { + servername: 'localhost', + // h3 ALPN — must match the server so the H3 application is selected + // on the server side before we tear it down. +}); + +// Close immediately — don't wait for handshake. +await clientSession.close(); + +// Give the server time to process the close and tear down the session. +await setTimeout(500); + +// The critical assertion: reaching this point without a crash means the +// server correctly handled the H3 shutdown before control streams were +// bound. Verify the endpoint is still alive by closing it gracefully. 
+await serverEndpoint.close(); diff --git a/test/parallel/test-quic-internal-endpoint-stats-state.mjs b/test/parallel/test-quic-internal-endpoint-stats-state.mjs index 015155344fde42..57044a773eb2d6 100644 --- a/test/parallel/test-quic-internal-endpoint-stats-state.mjs +++ b/test/parallel/test-quic-internal-endpoint-stats-state.mjs @@ -43,8 +43,8 @@ const { isListening: false, isClosing: false, isBusy: false, - maxConnectionsPerHost: 0, - maxConnectionsTotal: 0, + maxConnectionsPerHost: 100, + maxConnectionsTotal: 10_000, pendingCallbacks: '0', });