[TransferEngine] Add congestion control plugin interface and AIMD implementation#2489
[TransferEngine] Add congestion control plugin interface and AIMD implementation#2489stmatengss wants to merge 1 commit into
Conversation
…lementation Add a pluggable congestion control framework to the TENT runtime, enabling per-device flow control and congestion awareness for multi-NIC deployments. Plugin Interface (tent/include/tent/runtime/congestion_control.h): - CongestionControlPlugin abstract base class with 3 hook points: * onAdmit() — admission control before scheduling transfers * onCompletion() — feedback on successful transfer completion * onCongestionSignal() — reaction to timeout/ECN signals - Context structs: AdmitContext, AdmitDecision, CompletionEvent, CongestionSignal - NoOp default implementation (zero-overhead when no policy is active) AIMD Plugin (tent/include/tent/runtime/aimd_congestion_control.h): - Additive Increase / Multiplicative Decrease per-device window management - Lock-free atomic state for hot-path performance (<100ns) - Configurable parameters: initial/cap/floor window, AI increment, MD factor Integration Points: - TransferEngineImpl: admission hook in submitTransfer(), plugin lifecycle - DeviceSelector / quota: completion callback in release() - Workers: congestion signal emission on CQ timeout Tests: - 13 GTest unit tests for AIMD plugin covering window growth, shrink, bounds, multi-device isolation Documentation: - Detailed design doc (Chinese): docs/source/design/tent/congestion-control.md - Brief presentation doc: docs/source/design/tent/congestion-control-brief.md Motivation: Addresses deployment risks for HPN 7.0UL network architecture with L20D nodes: - Risk 3.3: M2N ASW buffer exhaustion — solved by per-destination inflight tracking - Risk 3.4: HPN 7.0UL uplink congestion — solved by topology-aware rate limiting
There was a problem hiding this comment.
Code Review
This pull request introduces a pluggable Congestion Control Plugin framework for the TENT Runtime, featuring an AIMD-based implementation and corresponding unit tests to mitigate network buffer exhaustion and uplink congestion. Feedback on the changes highlights critical issues: non-atomic read-modify-write operations on congestion windows that cause race conditions, a lack of propagation of the plugin to active transports and device selectors, an ineffective admission check that ignores deferral decisions and passes an invalid device ID, and hardcoded status and priority values in completion events.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| auto& w = window_bytes_[event.device_id]; | ||
| uint64_t cur = w.load(std::memory_order_relaxed); | ||
| uint64_t next = std::min(cur + config_.ai_increment_bytes, | ||
| config_.window_cap_bytes); | ||
| w.store(next, std::memory_order_relaxed); |
There was a problem hiding this comment.
The read-modify-write operation on window_bytes_ is not atomic. Since onCompletion can be called concurrently from multiple worker threads when transfers complete, this relaxed load followed by a relaxed store creates a race condition where window updates can be lost. Use compare_exchange_weak to perform the update atomically.
auto& w = window_bytes_[event.device_id];
uint64_t cur = w.load(std::memory_order_relaxed);
while (!w.compare_exchange_weak(
cur,
std::min(cur + config_.ai_increment_bytes, config_.window_cap_bytes),
std::memory_order_relaxed)) {
}| auto& w = window_bytes_[signal.device_id]; | ||
| uint64_t cur = w.load(std::memory_order_relaxed); | ||
| double factor = 1.0 - config_.md_factor * signal.severity; | ||
| uint64_t reduced = static_cast<uint64_t>(cur * std::max(factor, 0.0)); | ||
| uint64_t clamped = std::max(reduced, config_.window_floor_bytes); | ||
| w.store(clamped, std::memory_order_relaxed); |
There was a problem hiding this comment.
Similar to onCompletion, the read-modify-write operation on window_bytes_ in onCongestionSignal is not atomic and can suffer from lost updates under concurrent congestion signals. Use compare_exchange_weak to ensure the multiplicative decrease is applied atomically.
auto& w = window_bytes_[signal.device_id];
uint64_t cur = w.load(std::memory_order_relaxed);
double factor = 1.0 - config_.md_factor * signal.severity;
uint64_t next;
do {
uint64_t reduced = static_cast<uint64_t>(cur * std::max(factor, 0.0));
next = std::max(reduced, config_.window_floor_bytes);
} while (!w.compare_exchange_weak(cur, next, std::memory_order_relaxed));| cc_plugin_ = std::move(plugin); | ||
| LOG(INFO) << "Congestion control plugin set: " << cc_plugin_->getName(); |
There was a problem hiding this comment.
The congestion control plugin is set on TransferEngineImpl but is never propagated to the loaded transports or their DeviceSelectors. Because of this, DeviceSelector::cc_plugin_ remains nullptr, and no completion events or congestion signals will ever be delivered to the plugin. You need to propagate the plugin to the active transports so they can register it on their selectors.
| // Congestion control admission check | ||
| { | ||
| AdmitContext admit_ctx{ | ||
| merged_request, | ||
| 0, // TODO: track global_inflight_bytes | ||
| 0, // TODO: track per-device inflight from DeviceSelector | ||
| -1, // device_id not yet determined at this stage | ||
| merged_request.priority}; | ||
| auto decision = cc_plugin_->onAdmit(admit_ctx); | ||
| if (decision.device_mask_override != 0) { | ||
| // Apply congestion control device mask constraint | ||
| task.device_mask &= decision.device_mask_override; | ||
| } | ||
| } |
There was a problem hiding this comment.
There are two major issues with the admission control check here:
decision.admitis completely ignored. If the plugin decides to defer or reject the request (admit = false), the engine still proceeds with the transfer anyway.device_idis passed as-1because the device is not yet determined at this stage. However, the AIMD plugin (AimdCongestionControlPlugin::onAdmit) always returnstrue(admit) whendevice_id < 0. This means the AIMD congestion control is completely bypassed and ineffective.
To fix this, you need to integrate a deferral queue or callback mechanism to handle admit = false and ensure the candidate device is resolved before calling onAdmit.
| if (cc_plugin_) { | ||
| CompletionEvent event; | ||
| event.device_id = dev_id; | ||
| event.bytes = length; | ||
| event.latency_us = latency; | ||
| event.status = TransferStatusEnum::COMPLETED; | ||
| event.priority = PRIO_HIGH; // Default; actual priority not available here | ||
| cc_plugin_->onCompletion(event); | ||
| } |
There was a problem hiding this comment.
The CompletionEvent is constructed with hardcoded status = TransferStatusEnum::COMPLETED and priority = PRIO_HIGH. This prevents the congestion control plugin from distinguishing between different request priorities or reacting to failed transfers. Consider refactoring DeviceSelector::release to accept the actual status and priority from the transport layer.
|
@qoder review it |
|
Thanks for the congestion-control PR ping! I’ve gone through the design docs, AIMD plugin implementation, integration points, and tests. Here’s my review summary: Overall
AIMD implementation (
Plugin wiring / lifecycle
Admission path (
Completion and congestion signals
Docs & tests
Suggested next steps
Once those integration and concurrency issues are addressed, the plugin framework should be in good shape to control congestion in the intended M2N/HPN scenarios. 🚀 🤖 Generated by Qoder • View workflow run |
|
Do congestion control and QoS conflict with each other? Or can they complement each other? cc @alogfans |
They're complementary and operate at different layers — congestion control and QoS answer different questions:
So the natural composition is: QoS decides what gets dispatched and in what order; CC decides how fast each dispatched flow drives a given device. They don't fight as long as the layering is kept clean:
A useful framing: CC handles the network layer (rate vs. fabric capacity), QoS handles the application/SLO layer (which flows matter, in what order). They're orthogonal axes, and you generally want both. This is also why a deadline/SLO-aware admission policy (RFC #2519) sits cleanly on top — it consumes a per-transfer deadline to decide admission, while AIMD independently governs send rate; neither needs to know the other's internal state, they just share the admission boundary. Happy to help untangle the integration points if useful. |
They are not conflicted but should be considered together, and uses the uniform mechanism. |
Summary
Add a pluggable congestion control framework to the TENT runtime, enabling per-device flow control and congestion awareness for multi-NIC deployments.
Changes
Plugin Interface (
tent/include/tent/runtime/congestion_control.h)CongestionControlPluginabstract base class with 3 hook points:onAdmit()— admission control before scheduling transfersonCompletion()— feedback on successful transfer completiononCongestionSignal()— reaction to timeout/ECN signalsAdmitContext,AdmitDecision,CompletionEvent,CongestionSignalAIMD Plugin (
tent/include/tent/runtime/aimd_congestion_control.h)Integration Points
TransferEngineImpl: admission hook insubmitTransfer(), plugin lifecycleDeviceSelector: completion callback inrelease()Workers: congestion signal emission on CQ timeoutTests
Documentation
docs/source/design/tent/congestion-control.mddocs/source/design/tent/congestion-control-brief.mdMotivation
Addresses deployment risks for HPN 7.0UL network architecture with L20D nodes:
Testing