Skip to content
Open
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
/tags
/tags.*
/doc/api.xml
/docs/
/node
/node_g
/gon-config.json
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/quic/quic.js
Original file line number Diff line number Diff line change
Expand Up @@ -3942,8 +3942,8 @@ class QuicEndpoint {
const {
retryTokenExpiration,
tokenExpiration,
maxConnectionsPerHost = 0,
maxConnectionsTotal = 0,
maxConnectionsPerHost = 100,
maxConnectionsTotal = 10_000,
maxStatelessResetsPerHost,
disableStatelessReset,
addressLRUSize,
Expand Down
87 changes: 68 additions & 19 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ void Session::Application::ReceiveStreamReset(Stream* stream,
// < 0 (other): fatal error, session already closed
ssize_t Session::Application::TryWritePendingDatagram(PathStorage* path,
uint8_t* dest,
size_t destlen) {
size_t destlen,
uint64_t ts) {
CHECK(session_->HasPendingDatagrams());
auto max_attempts = session_->config().options.max_datagram_send_attempts;

Expand All @@ -262,17 +263,20 @@ ssize_t Session::Application::TryWritePendingDatagram(PathStorage* path,
int accepted = 0;
int dg_flags = NGTCP2_WRITE_DATAGRAM_FLAG_MORE;

// PacketInfo for the datagram path. When libuv gains per-socket ECN
// marking, the value from ngtcp2 should be forwarded to the send path.
PacketInfo dg_pi;
ssize_t dg_nwrite = ngtcp2_conn_writev_datagram(*session_,
&path->path,
nullptr,
dg_pi,
dest,
destlen,
&accepted,
dg_flags,
dg.id,
&dgvec,
1,
uv_hrtime());
ts);

if (accepted) {
// Nice, the datagram was accepted!
Expand Down Expand Up @@ -329,14 +333,42 @@ void Session::Application::SendPendingData() {
if (!session().can_send_packets()) [[unlikely]] {
return;
}
static constexpr size_t kMaxPackets = 32;
// Upper bound on packets per SendPendingData call. ngtcp2's send quantum
// is typically 64 KB, which at 1200-byte minimum packet size is ~53
// packets. 64 covers the worst case with headroom. The actual count per
// call is dynamically capped by ngtcp2_conn_get_send_quantum().
static constexpr size_t kMaxPackets = 64;
Debug(session_, "Application sending pending data");
// Cache the timestamp once for the entire send loop. ngtcp2 does not
// require nanosecond-accurate monotonicity within a single burst —
// a single timestamp per SendPendingData call is what other QUIC
// implementations use (e.g., quiche, msquic). When kernel-level
// packet pacing becomes available via libuv, this timestamp becomes
// the base for computing per-packet transmit timestamps.
const uint64_t ts = uv_hrtime();
PathStorage path;
StreamData stream_data;

bool closed = false;

// Batch accumulation: packets are collected here and flushed via
// Session::SendBatch when the loop exits, the batch is full, or
// on early return. This enables synchronous batched delivery via
// uv_udp_try_send2 (sendmmsg) from the deferred flush path.
Packet::Ptr batch[kMaxPackets];
PathStorage batch_paths[kMaxPackets];
size_t batch_count = 0;

auto flush_batch = [&] {
if (batch_count == 0) return;
session_->SendBatch(batch, batch_paths, batch_count);
batch_count = 0;
};

auto update_stats = OnScopeLeave([&] {
if (closed) return;
// Flush any remaining accumulated packets before updating stats.
flush_batch();
auto& s = session();
if (!s.is_destroyed()) [[likely]] {
s.UpdatePacketTxTime();
Expand All @@ -353,7 +385,7 @@ void Session::Application::SendPendingData() {
kMaxPackets, ngtcp2_conn_get_send_quantum(*session_) / max_packet_size);
if (max_packet_count == 0) return;

// The number of packets that have been sent in this call to SendPendingData.
// The number of packets that have been prepared in this call.
size_t packet_send_count = 0;

Packet::Ptr packet;
Expand All @@ -368,6 +400,16 @@ void Session::Application::SendPendingData() {
return true;
};

// Accumulate a completed packet into the batch.
auto enqueue_packet =
[&](Packet::Ptr& pkt, size_t len, const PacketInfo& pi) {
Debug(session_, "Enqueuing packet with %zu bytes into batch", len);
pkt->Truncate(len);
pkt->set_pkt_info(pi);
path.CopyTo(&batch_paths[batch_count]);
batch[batch_count++] = std::move(pkt);
};

// We're going to enter a loop here to prepare and send no more than
// max_packet_count packets.
for (;;) {
Expand Down Expand Up @@ -405,8 +447,14 @@ void Session::Application::SendPendingData() {
}

// Awesome, let's write our packet!
ssize_t nwrite = WriteVStream(
&path, packet->data(), &ndatalen, packet->length(), stream_data);
PacketInfo pi;
ssize_t nwrite = WriteVStream(&path,
&pi,
packet->data(),
&ndatalen,
packet->length(),
stream_data,
ts);

// When ndatalen is > 0, that's our indication that stream data was accepted
// in to the packet. Yay!
Expand Down Expand Up @@ -493,7 +541,7 @@ void Session::Application::SendPendingData() {
// if there is one. Otherwise just loop around and keep going.
if (session_->HasPendingDatagrams()) {
auto result = TryWritePendingDatagram(
&path, packet->data(), packet->length());
&path, packet->data(), packet->length(), ts);
// When result is 0, either the datagram was congestion controlled,
// didn't fit in the packet, or was abandoned. Skip and continue.

Expand All @@ -502,8 +550,7 @@ void Session::Application::SendPendingData() {
if (result > 0) {
size_t len = result;
Debug(session_, "Sending packet with %zu bytes", len);
packet->Truncate(len);
session_->Send(std::move(packet), path);
enqueue_packet(packet, len, pi);
if (++packet_send_count == max_packet_count) return;
} else if (result < 0) {
// Any negative result other than NGTCP2_ERR_WRITE_MORE
Expand Down Expand Up @@ -540,8 +587,7 @@ void Session::Application::SendPendingData() {
// is the size of the packet we are sending.
size_t len = nwrite;
Debug(session_, "Sending packet with %zu bytes", len);
packet->Truncate(len);
session_->Send(std::move(packet), path);
enqueue_packet(packet, len, pi);
if (++packet_send_count == max_packet_count) return;

// If there are pending datagrams, try sending them in a fresh packet.
Expand All @@ -557,11 +603,10 @@ void Session::Application::SendPendingData() {
return session_->Close(CloseMethod::SILENT);
}
auto result =
TryWritePendingDatagram(&path, packet->data(), packet->length());
TryWritePendingDatagram(&path, packet->data(), packet->length(), ts);
if (result > 0) {
Debug(session_, "Sending datagram packet with %zd bytes", result);
packet->Truncate(static_cast<size_t>(result));
session_->Send(std::move(packet), path);
enqueue_packet(packet, static_cast<size_t>(result), PacketInfo());
if (++packet_send_count == max_packet_count) return;
} else if (result < 0 && result != NGTCP2_ERR_WRITE_MORE) {
// Fatal error — session already closed by TryWritePendingDatagram.
Expand All @@ -574,25 +619,29 @@ void Session::Application::SendPendingData() {
}

ssize_t Session::Application::WriteVStream(PathStorage* path,
PacketInfo* pi,
uint8_t* dest,
ssize_t* ndatalen,
size_t max_packet_size,
const StreamData& stream_data) {
const StreamData& stream_data,
uint64_t ts) {
DCHECK_LE(stream_data.count, kMaxVectorCount);
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
// The PacketInfo out-param is populated by ngtcp2 with the ECN codepoint
// to apply when sending this packet. When libuv gains per-socket ECN
// marking, the value should be forwarded to the send path.
return ngtcp2_conn_writev_stream(*session_,
&path->path,
// TODO(@jasnell): ECN blocked on libuv
nullptr,
*pi,
dest,
max_packet_size,
ndatalen,
flags,
stream_data.id,
stream_data,
stream_data.count,
uv_hrtime());
ts);
}

// ============================================================================
Expand Down
11 changes: 8 additions & 3 deletions src/quic/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,19 @@ class Session::Application : public MemoryRetainer {
// the datagram is either congestion limited or was abandoned
ssize_t TryWritePendingDatagram(PathStorage* path,
uint8_t* dest,
size_t destlen);
size_t destlen,
uint64_t ts);

// Write the given stream_data into the buffer.
// Write the given stream_data into the buffer. The PacketInfo out-param
// is populated by ngtcp2 with per-packet metadata (e.g., ECN codepoint)
// that should be applied when sending the packet.
ssize_t WriteVStream(PathStorage* path,
PacketInfo* pi,
uint8_t* buf,
ssize_t* ndatalen,
size_t max_packet_size,
const StreamData& stream_data);
const StreamData& stream_data,
uint64_t ts);

Session* session_ = nullptr;
};
Expand Down
55 changes: 55 additions & 0 deletions src/quic/bindingdata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace node {
using mem::kReserveSizeAndAlign;
using v8::Function;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Local;
using v8::Object;
using v8::String;
Expand Down Expand Up @@ -154,6 +155,16 @@ BindingData& BindingData::Get(Environment* env) {

BindingData::~BindingData() {
quic_alloc_state.binding = nullptr;
if (flush_check_initialized_) {
uv_check_stop(&flush_check_);
flush_check_started_ = false;
// The check handle is closed inline here. Because BindingData destruction
// happens during Environment cleanup, the handle will be finalized by
// libuv's close phase.
uv_close(reinterpret_cast<uv_handle_t*>(&flush_check_), nullptr);
flush_check_initialized_ = false;
}
pending_flush_sessions_.clear();
}

ngtcp2_mem* BindingData::ngtcp2_allocator() {
Expand Down Expand Up @@ -221,6 +232,11 @@ void BindingData::RegisterExternalReferences(
BindingData::BindingData(Realm* realm, Local<Object> object)
: BaseObject(realm, object) {
MakeWeak();
CHECK_EQ(uv_check_init(env()->event_loop(), &flush_check_), 0);
flush_check_.data = this;
// Unref so the check handle doesn't keep the event loop alive on its own.
uv_unref(reinterpret_cast<uv_handle_t*>(&flush_check_));
flush_check_initialized_ = true;
}

SessionManager& BindingData::session_manager() {
Expand All @@ -230,6 +246,45 @@ SessionManager& BindingData::session_manager() {
return *session_manager_;
}

void BindingData::ScheduleSessionFlush(const BaseObjectPtr<Session>& session) {
pending_flush_sessions_.push_back(session);
if (!flush_check_started_) {
uv_check_start(&flush_check_, OnFlushCheck);
flush_check_started_ = true;
}
}

void BindingData::OnFlushCheck(uv_check_t* handle) {
auto* binding = static_cast<BindingData*>(handle->data);
if (binding->pending_flush_sessions_.empty()) {
uv_check_stop(&binding->flush_check_);
binding->flush_check_started_ = false;
return;
}

HandleScope scope(binding->env()->isolate());

// Swap to a local vector before iterating. SendPendingData may trigger
// MakeCallback which runs JS that could cause more packet receives via
// re-entry (e.g., a stream data callback that synchronously writes to
// another session). Any sessions added during the flush remain in
// pending_flush_sessions_ and are picked up on the next check tick.
auto sessions = std::move(binding->pending_flush_sessions_);
for (auto& session : sessions) {
session->pending_flush_ = false;
if (!session->is_destroyed()) {
session->FlushPendingData();
}
}

// If no new sessions were added during the flush, stop the check
// to avoid per-tick callback overhead when idle.
if (binding->pending_flush_sessions_.empty()) {
uv_check_stop(&binding->flush_check_);
binding->flush_check_started_ = false;
}
}

void BindingData::MemoryInfo(MemoryTracker* tracker) const {
#define V(name, _) tracker->TrackField(#name, name##_callback());

Expand Down
20 changes: 20 additions & 0 deletions src/quic/bindingdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
#include <ngtcp2/ngtcp2_crypto.h>
#include <node.h>
#include <node_mem.h>
#include <uv.h>
#include <v8.h>
#include <memory>
#include <unordered_map>
#include <vector>
#include "defs.h"

namespace node::quic {
Expand Down Expand Up @@ -201,6 +203,13 @@ class BindingData final
// routing so that any endpoint can route packets to any session.
SessionManager& session_manager();

// Schedule a session for deferred SendPendingData. Sessions are accumulated
// during the I/O poll phase (via Endpoint::Receive -> Session::ReadPacket)
// and flushed in a uv_check callback immediately after poll completes.
// This batches multiple received packets before generating responses,
// allowing ngtcp2 to make better ACK coalescing decisions.
void ScheduleSessionFlush(const BaseObjectPtr<Session>& session);

std::unordered_map<Endpoint*, BaseObjectPtr<BaseObject>> listening_endpoints;

size_t current_ngtcp2_memory_ = 0;
Expand Down Expand Up @@ -247,6 +256,17 @@ class BindingData final
#undef V

std::unique_ptr<SessionManager> session_manager_;

// Deferred send flush state. The uv_check_t fires immediately after
// the I/O poll phase in the same event loop tick, allowing batched
// receive processing: all packets are read during poll, then
// SendPendingData is called once per dirty session in the check callback.
uv_check_t flush_check_;
std::vector<BaseObjectPtr<Session>> pending_flush_sessions_;
bool flush_check_started_ = false;
bool flush_check_initialized_ = false;

static void OnFlushCheck(uv_check_t* handle);
};

JS_METHOD_IMPL(IllegalConstructor);
Expand Down
Loading
Loading