Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/45c6fa93-45cc-402a-869c-41dbcec2c799.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.RuntimeSupport",
"Type": "Patch",
"ChangelogMessages": [
"Fix thread pool starvation under multi-concurrency"
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ internal class Constants
// used if AWS_LAMBDA_MAX_CONCURRENCY environment variable is set.
internal const string ENVIRONMENT_VARIABLE_AWS_LAMBDA_DOTNET_PROCESSING_TASKS = "AWS_LAMBDA_DOTNET_PROCESSING_TASKS";

// .NET Lambda runtime specific environment variable used to override the minimum number of ThreadPool
// worker threads. When set to a positive integer, this value is used instead of the default
// (2 * processorCount). It is only applied in multi-concurrency mode and never lowers a minimum that
// is already higher. This allows customers to tune thread pool sizing for their specific workload.
internal const string ENVIRONMENT_VARIABLE_AWS_LAMBDA_DOTNET_MIN_THREADS = "AWS_LAMBDA_DOTNET_MIN_THREADS";

internal const string ENVIRONMENT_VARIABLE_DISABLE_HEAP_MEMORY_LIMIT = "AWS_LAMBDA_DOTNET_DISABLE_MEMORY_LIMIT_CHECK";

internal const string ENVIRONMENT_VARIABLE_AWS_LAMBDA_DOTNET_PREJIT = "AWS_LAMBDA_DOTNET_PREJIT";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ internal LambdaBootstrap(HttpClient httpClient, LambdaBootstrapHandler handler,
public async Task RunAsync(CancellationToken cancellationToken = default(CancellationToken))
{
AdjustMemorySettings();
AdjustThreadPoolSettings();

if (_configuration.IsCallPreJit)
{
Expand Down Expand Up @@ -629,6 +630,61 @@ private void AdjustMemorySettings()
}

#region IDisposable Support

/// <summary>
/// When running in multi-concurrency mode, pre-size the .NET ThreadPool to ensure there are enough
/// threads available for both handler execution and polling task continuations. Without this,
/// blocking handlers (Thread.Sleep, .Result, .Wait()) can exhaust the ThreadPool, preventing
/// polling tasks from cycling back to /next and causing Runtime.Unavailable errors from RAPID.
///
/// The default minimum is 2 * processorCount. Customers can override this via the
/// AWS_LAMBDA_DOTNET_MIN_THREADS environment variable; a value that is not a positive integer is
/// ignored (and logged) and the default is used instead. The requested minimum is capped at the
/// ThreadPool's maximum worker thread count because ThreadPool.SetMinThreads rejects larger values.
/// The minimum is only ever raised, never lowered. Failures are logged and never propagate.
/// </summary>
private void AdjustThreadPoolSettings()
{
    try
    {
        var maxConcurrency = Utils.GetMaxConcurrency(_environmentVariables);
        if (maxConcurrency <= 0)
        {
            // No valid max concurrency configured: leave the ThreadPool untouched.
            return;
        }

        // Default: modest bump to ensure polling task continuations have threads
        // available without pre-creating too many threads.
        var desiredMinThreads = 2 * Environment.ProcessorCount;

        // Check for customer override via environment variable.
        var overrideValue = _environmentVariables.GetEnvironmentVariable(Constants.ENVIRONMENT_VARIABLE_AWS_LAMBDA_DOTNET_MIN_THREADS);
        if (!string.IsNullOrEmpty(overrideValue))
        {
            // Environment variables are machine-readable, so parse them culture-independently.
            var isValidOverride = int.TryParse(
                overrideValue,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var parsedOverride) && parsedOverride > 0;

            if (isValidOverride)
            {
                desiredMinThreads = parsedOverride;
            }
            else
            {
                // Surface the misconfiguration instead of silently falling back to the default.
                _logger.LogInformation($"Ignoring invalid value '{overrideValue}' for {Constants.ENVIRONMENT_VARIABLE_AWS_LAMBDA_DOTNET_MIN_THREADS}; using default of {desiredMinThreads} minimum worker threads.");
            }
        }

        // SetMinThreads returns false and changes nothing when the requested minimum exceeds the
        // pool's maximum, so cap the request at the maximum rather than losing the adjustment.
        ThreadPool.GetMaxThreads(out int maxWorker, out _);
        if (desiredMinThreads > maxWorker)
        {
            _logger.LogInformation($"Requested ThreadPool minimum worker threads {desiredMinThreads} exceeds the maximum of {maxWorker}; using {maxWorker}.");
            desiredMinThreads = maxWorker;
        }

        ThreadPool.GetMinThreads(out int currentMinWorker, out int currentMinIO);

        // Only increase, never decrease — respect any higher value already set
        // (e.g., by the customer in their code).
        if (currentMinWorker >= desiredMinThreads)
        {
            return;
        }

        // The IO completion thread minimum is passed back unchanged; only worker threads are tuned.
        var success = ThreadPool.SetMinThreads(desiredMinThreads, currentMinIO);
        if (success)
        {
            _logger.LogInformation($"Adjusted ThreadPool minimum worker threads from {currentMinWorker} to {desiredMinThreads} for multi-concurrency mode (max concurrency: {maxConcurrency}).");
        }
        else
        {
            _logger.LogError(null, $"Failed to set ThreadPool minimum worker threads to {desiredMinThreads}.");
        }
    }
    catch (Exception ex)
    {
        // Thread pool tuning is best-effort: never let it prevent the runtime from starting.
        _logger.LogError(ex, "Failed to adjust ThreadPool settings for multi-concurrency mode.");
    }
}

private bool disposedValue = false; // To detect redundant calls

/// <summary>
Expand Down
22 changes: 21 additions & 1 deletion Libraries/src/Amazon.Lambda.RuntimeSupport/Helpers/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,33 @@ internal static int DetermineProcessingTaskCount(IEnvironmentVariables environme
}
else
{
processingTaskCount = Math.Max(2, processorCount);
// Use the max concurrency value as the default polling task count so there are
// enough polling tasks to fill all available concurrency slots. Fall back to
// the processor-based heuristic if the value cannot be parsed.
var maxConcurrency = GetMaxConcurrency(environmentVariables);
processingTaskCount = maxConcurrency > 0
? maxConcurrency
: Math.Max(2, processorCount);
}
}

return processingTaskCount;
}

/// <summary>
/// Parses the AWS_LAMBDA_MAX_CONCURRENCY environment variable as an integer.
/// Returns the parsed value if valid and greater than 0, otherwise returns 0.
/// The value is parsed with the invariant culture so the result does not depend on
/// the culture of the process.
/// </summary>
/// <param name="environmentVariables">Provider the environment variable is read from.</param>
/// <returns>The configured max concurrency, or 0 when the variable is unset, empty, not an integer, or not positive.</returns>
internal static int GetMaxConcurrency(IEnvironmentVariables environmentVariables)
{
    var rawValue = environmentVariables.GetEnvironmentVariable(Constants.ENVIRONMENT_VARIABLE_AWS_LAMBDA_MAX_CONCURRENCY);
    if (string.IsNullOrEmpty(rawValue))
    {
        return 0;
    }

    // Environment variables are machine-readable, so parse them culture-independently.
    var parsed = int.TryParse(
        rawValue,
        System.Globalization.NumberStyles.Integer,
        System.Globalization.CultureInfo.InvariantCulture,
        out var maxConcurrency);

    return parsed && maxConcurrency > 0 ? maxConcurrency : 0;
}

/// <summary>
/// Create an Action callback that can be used for setting the trace id on the AWS SDK for .NET if the SDK is present.
/// If the AWS .NET SDK is not found then null is returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,93 @@
Assert.Equal("end-request1,traceId-trace1", handlerEvents[3]);
}

/// <summary>
/// Bug condition exploration test: Demonstrates thread pool starvation when MC is enabled
/// with blocking handlers. This test encodes the EXPECTED behavior after the fix is applied.
/// On unfixed code, this test is EXPECTED TO FAIL with a timeout because:
/// - Only 2 polling tasks are created (Math.Max(2, processorCount))
/// - ThreadPool.MinThreads is constrained to 2 worker threads
/// - 10 blocking handlers (Thread.Sleep) exhaust the thread pool
/// - Polling task continuations cannot resume to call GetNextInvocationAsync
/// - Not all 10 invocations get dequeued within the timeout
///
/// The process-wide ThreadPool minimums are changed for the duration of the test and
/// restored in a finally block so other tests are not affected.
///
/// Validates: Requirements 1.1, 1.2, 1.3, 1.4
/// </summary>
[Fact]
public async Task ThreadPoolStarvation_BlockingHandlers_AllInvocationsDequeued()
{
    // Save original ThreadPool settings to restore after test
    ThreadPool.GetMinThreads(out int originalMinWorker, out int originalMinIO);

    try
    {
        // Constrain ThreadPool to simulate Lambda's default environment (low thread count)
        ThreadPool.SetMinThreads(2, 2);

        TestEnvironmentVariables environmentVariables = new TestEnvironmentVariables();
        environmentVariables.SetEnvironmentVariable(
            Amazon.Lambda.RuntimeSupport.Bootstrap.Constants.ENVIRONMENT_VARIABLE_AWS_LAMBDA_MAX_CONCURRENCY, "10");

        // Create 10 invocation events with blocking handlers
        var invocationEvents = new TestMultiConcurrencyRuntimeApiClient.InvocationEvent[10];
        for (int i = 0; i < 10; i++)
        {
            invocationEvents[i] = new TestMultiConcurrencyRuntimeApiClient.InvocationEvent
            {
                Headers = CreateDefaultHeaders($"request{i}", $"trace{i}"),
                // NOTE(review): presumably the first argument is StartSleep in milliseconds — confirm against SleepTimeEvent.
                FunctionInput = CreateFunctionInput(new SleepTimeEvent(3000, 0))
            };
        }

        var testRuntimeApiClient = new TestMultiConcurrencyRuntimeApiClient(environmentVariables, invocationEvents);

        // Handler that performs blocking work (Thread.Sleep) to exhaust the thread pool
        var handler = HandlerWrapper.GetHandlerWrapper((SleepTimeEvent sleepTime, ILambdaContext context) =>
        {
            // Blocking sleep to simulate CPU-bound or synchronous I/O work
            Thread.Sleep(sleepTime.StartSleep);
        }, _serializer).Handler;

        var lambdaBootstrap = new LambdaBootstrap(
            httpClient: null,
            handler: handler,
            initializer: null,
            ownsHttpClient: true,
            environmentVariables: environmentVariables);
        lambdaBootstrap.Client = testRuntimeApiClient;

        // Run with a 10-second timeout - if all 10 invocations are dequeued, the test passes.
        // The token source is disposed once the run loop has exited.
        using (var cts = new CancellationTokenSource())
        {
            cts.CancelAfter(TimeSpan.FromSeconds(10));

            try
            {
                await lambdaBootstrap.RunAsync(cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Expected when the cancellation token is triggered (timeout)
            }
        }

        // Assert that all 10 invocations were dequeued from the test client within the timeout.
        // On unfixed code, this will fail because thread pool starvation prevents polling tasks
        // from cycling back to GetNextInvocationAsync.
        Assert.Equal(10, testRuntimeApiClient.ProcessInvocationEvents.Count);
    }
    finally
    {
        // Restore original ThreadPool settings
        ThreadPool.SetMinThreads(originalMinWorker, originalMinIO);
    }
}

private Dictionary<string, IEnumerable<string>> CreateDefaultHeaders(string requestId, string traceId)
{
return new Dictionary<string, IEnumerable<string>>
Expand Down
Loading
Loading