From 0c27988962f95cefbb2d3e544fa74cd63c18a7ff Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Tue, 12 May 2026 17:53:28 +0000 Subject: [PATCH 01/10] fix: add retry logic for tunnel reconnection in jmp shell proxy When the tunnel between the local proxy and the jumpstarter-router drops during a jmp shell session, subsequent j commands would time out with SETTINGS frame timeout errors because: 1. The Dial call to the controller could fail with transient UNAVAILABLE errors, but there was no retry logic for these (only FAILED_PRECONDITION was retried). 2. The connect_router_stream could hang indefinitely trying to establish an HTTP/2 connection to an unreachable router endpoint, with no timeout on the channel readiness check. This commit fixes both issues: - Adds retry with exponential backoff for transient gRPC errors (UNAVAILABLE, RESOURCE_EXHAUSTED, ABORTED, INTERNAL) in the Dial call within handle_async. - Adds retry with exponential backoff for the router connection establishment, including re-dialing to get fresh router tokens when retrying. - Adds a channel_ready() timeout (10s) in connect_router_stream so that connections to unreachable routers fail fast instead of hanging on the HTTP/2 SETTINGS frame exchange. Fixes #638 Co-Authored-By: Claude Opus 4.6 --- .../jumpstarter/jumpstarter/client/lease.py | 99 +++++++++++++++---- .../jumpstarter/jumpstarter/common/streams.py | 17 +++- 2 files changed, 98 insertions(+), 18 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/client/lease.py b/python/packages/jumpstarter/jumpstarter/client/lease.py index 6c9a86391..75b6cc0a4 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease.py @@ -312,14 +312,25 @@ def __contextmanager__(self) -> Generator[Self]: with self.portal.wrap_async_context_manager(self) as value: yield value - async def handle_async(self, stream): + # gRPC status codes that indicate transient network failures worth retrying + _TRANSIENT_GRPC_CODES = frozenset({ + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.ABORTED, + grpc.StatusCode.INTERNAL, + }) + + async def handle_async(self, stream): # noqa: C901 logger.debug("Connecting to Lease with name %s", self.name) - # Retry Dial with exponential backoff for transient "exporter not ready" errors. - # This handles the race condition where the client acquires a lease before - # the exporter has transitioned to LEASE_READY status. + # Retry Dial and router connection with exponential backoff for transient + # errors. This handles: + # 1. The race condition where the client acquires a lease before the + # exporter has transitioned to LEASE_READY status (FAILED_PRECONDITION). + # 2. Transient network failures where the tunnel to the router drops and + # needs to be re-established (UNAVAILABLE, etc.). # Uses time-based retry bounded by dial_timeout instead of fixed retry count. base_delay = 0.3 - max_delay = 2.0 + max_delay = 5.0 deadline = time.monotonic() + self.dial_timeout attempt = 0 while True: @@ -327,8 +338,8 @@ async def handle_async(self, stream): response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name)) break except AioRpcError as e: + remaining = deadline - time.monotonic() if e.code() == grpc.StatusCode.FAILED_PRECONDITION and "not ready" in str(e.details()): - remaining = deadline - time.monotonic() if remaining <= 0: logger.debug( "Exporter not ready and dial timeout (%.1fs) exceeded after %d attempts", @@ -346,21 +357,24 @@ async def handle_async(self, stream): await sleep(delay) attempt += 1 continue - if e.code() == grpc.StatusCode.UNAVAILABLE: - remaining = deadline - time.monotonic() + # Retry on transient network errors (e.g. tunnel to router dropped) + if e.code() in self._TRANSIENT_GRPC_CODES: if remaining <= 0: logger.warning( - "Exporter unavailable and dial timeout (%.1fs) exceeded after %d attempts", - self.dial_timeout, + "Dial failed with transient error after %d attempts (%.1fs elapsed): %s", attempt + 1, + self.dial_timeout, + e.details(), ) - raise + return delay = min(base_delay * (2**attempt), max_delay, remaining) - logger.warning( - "Exporter unavailable, retrying Dial in %.1fs (attempt %d, %.1fs remaining)", + logger.info( + "Dial failed with %s, retrying in %.1fs (attempt %d, %.1fs remaining): %s", + e.code().name, delay, attempt + 1, remaining, + e.details(), ) await sleep(delay) attempt += 1 @@ -375,10 +389,61 @@ async def handle_async(self, stream): else: logger.warning("Connection to exporter lost: %s", e.details()) return - async with connect_router_stream( - response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options - ): - pass + + # Connect to the router with retry for transient failures. + # After a successful Dial, the router endpoint may still be temporarily + # unreachable (e.g. after a tunnel drop). Retry the connection to give + # the network time to recover. + remaining = deadline - time.monotonic() + router_attempt = 0 + while True: + try: + async with connect_router_stream( + response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options + ): + return + except AioRpcError as e: + remaining = deadline - time.monotonic() + if e.code() in self._TRANSIENT_GRPC_CODES and remaining > 0: + delay = min(base_delay * (2**router_attempt), max_delay, remaining) + logger.info( + "Router connection failed with %s, retrying in %.1fs (attempt %d, %.1fs remaining): %s", + e.code().name, + delay, + router_attempt + 1, + remaining, + e.details(), + ) + await sleep(delay) + router_attempt += 1 + # Re-dial to get a fresh router token since the old one may + # have expired during the retry window + try: + response = await self.controller.Dial( + jumpstarter_pb2.DialRequest(lease_name=self.name) + ) + except AioRpcError: + logger.debug("Re-dial failed during router retry, will retry from Dial") + continue + logger.warning("Router connection failed: %s (code=%s)", e.details(), e.code().name) + return + except OSError as e: + # OSError can occur when the router endpoint is unreachable + remaining = deadline - time.monotonic() + if remaining > 0: + delay = min(base_delay * (2**router_attempt), max_delay, remaining) + logger.info( + "Router connection failed with OSError, retrying in %.1fs (attempt %d, %.1fs remaining): %s", + delay, + router_attempt + 1, + remaining, + e, + ) + await sleep(delay) + router_attempt += 1 + continue + logger.warning("Router connection failed: %s", e) + return @asynccontextmanager async def serve_unix_async(self): diff --git a/python/packages/jumpstarter/jumpstarter/common/streams.py b/python/packages/jumpstarter/jumpstarter/common/streams.py index 8cdc02330..c84345853 100644 --- a/python/packages/jumpstarter/jumpstarter/common/streams.py +++ b/python/packages/jumpstarter/jumpstarter/common/streams.py @@ -1,3 +1,4 @@ +import asyncio from contextlib import asynccontextmanager from typing import Annotated, Literal, Union from uuid import UUID @@ -34,13 +35,27 @@ class StreamRequestMetadata(BaseModel): @asynccontextmanager -async def connect_router_stream(endpoint, token, stream, tls_config, grpc_options): +async def connect_router_stream(endpoint, token, stream, tls_config, grpc_options, channel_ready_timeout=10): credentials = grpc.composite_channel_credentials( await ssl_channel_credentials(endpoint, tls_config), grpc.access_token_call_credentials(token), ) async with aio_secure_channel(endpoint, credentials, grpc_options) as channel: + # Wait for the channel to be ready before starting the stream. + # Without this, a broken router connection would cause the gRPC + # stream to hang indefinitely waiting for the HTTP/2 SETTINGS frame, + # which manifests as a timeout for the j command on the Unix socket. + try: + await asyncio.wait_for(channel.channel_ready(), timeout=channel_ready_timeout) + except asyncio.TimeoutError: + raise grpc.aio.AioRpcError( + code=grpc.StatusCode.UNAVAILABLE, + initial_metadata=grpc.aio.Metadata(), + trailing_metadata=grpc.aio.Metadata(), + details=f"Timed out waiting for router channel to become ready ({channel_ready_timeout}s)", + debug_error_string=None, + ) from None router = router_pb2_grpc.RouterServiceStub(channel) context = router.Stream(metadata=()) async with RouterStream(context=context) as s: From 6e02c04fafd92b63480f7b2c3c0d18205d260e7e Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Tue, 12 May 2026 18:37:13 +0000 Subject: [PATCH 02/10] test: add tests for tunnel reconnection retry logic Add unit tests covering the new retry logic in handle_async (lease.py) and the channel_ready timeout in connect_router_stream (streams.py) to satisfy the 80% diff coverage requirement. Co-Authored-By: Claude Opus 4.6 --- .../jumpstarter/client/lease_test.py | 296 +++++++++++++++--- .../jumpstarter/common/streams_test.py | 88 ++++++ 2 files changed, 340 insertions(+), 44 deletions(-) create mode 100644 python/packages/jumpstarter/jumpstarter/common/streams_test.py diff --git a/python/packages/jumpstarter/jumpstarter/client/lease_test.py b/python/packages/jumpstarter/jumpstarter/client/lease_test.py index dffd6a288..e3121f5a9 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease_test.py @@ -1,6 +1,7 @@ import asyncio import logging import sys +from contextlib import asynccontextmanager from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, Mock, patch @@ -572,61 +573,268 @@ async def get_then_fail(): assert remain_arg == timedelta(0) -class TestHandleAsyncUnavailableRetry: - """Tests for Lease.handle_async UNAVAILABLE retry behavior.""" +def _make_aio_rpc_error(code, details="error"): + """Helper to construct an AioRpcError.""" + return AioRpcError( + code=code, + initial_metadata=grpc.aio.Metadata(), + trailing_metadata=grpc.aio.Metadata(), + details=details, + debug_error_string=None, + ) - def _make_lease_for_handle(self): - lease = object.__new__(Lease) - lease.name = "test-lease" - lease.dial_timeout = 5.0 - lease.lease_transferred = False - lease.tls_config = Mock() - lease.grpc_options = {} - lease.controller = Mock() - return lease + +def _make_lease_for_handle(): + """Create a minimal Lease for testing handle_async.""" + lease = object.__new__(Lease) + lease.name = "test-lease" + lease.dial_timeout = 5.0 + lease.tls_config = Mock() + lease.grpc_options = {} + lease.controller = Mock() + lease.lease_transferred = False + return lease + + +class TestHandleAsyncTransientDialRetry: + """Tests for transient gRPC error retry in handle_async Dial phase.""" + + @pytest.mark.anyio + async def test_dial_retries_on_unavailable_then_succeeds(self): + """Dial should retry on UNAVAILABLE and succeed on the next attempt.""" + lease = _make_lease_for_handle() + + dial_response = Mock(router_endpoint="ep", router_token="tok") + call_count = 0 + + async def dial_side_effect(req): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise _make_aio_rpc_error(grpc.StatusCode.UNAVAILABLE, "tunnel dropped") + return dial_response + + lease.controller.Dial = AsyncMock(side_effect=dial_side_effect) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + with patch("jumpstarter.client.lease.connect_router_stream") as mock_router: + + @asynccontextmanager + async def fake_router(*args, **kwargs): + yield + + mock_router.side_effect = fake_router + await lease.handle_async(Mock()) + + assert call_count == 2 + + @pytest.mark.anyio + async def test_dial_transient_error_returns_after_timeout(self): + """Dial should give up and return when dial_timeout is exceeded.""" + lease = _make_lease_for_handle() + lease.dial_timeout = 0.0 # already expired + + lease.controller.Dial = AsyncMock( + side_effect=_make_aio_rpc_error(grpc.StatusCode.UNAVAILABLE, "tunnel dropped"), + ) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + await lease.handle_async(Mock()) + + # Should return without raising + lease.controller.Dial.assert_called_once() + + @pytest.mark.anyio + @pytest.mark.parametrize( + "status_code", + [grpc.StatusCode.RESOURCE_EXHAUSTED, grpc.StatusCode.ABORTED, grpc.StatusCode.INTERNAL], + ids=["RESOURCE_EXHAUSTED", "ABORTED", "INTERNAL"], + ) + async def test_dial_retries_multiple_transient_codes(self, status_code): + """Dial should retry on RESOURCE_EXHAUSTED, ABORTED, INTERNAL.""" + lease = _make_lease_for_handle() + dial_response = Mock(router_endpoint="ep", router_token="tok") + call_count = 0 + + async def dial_side_effect(req): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise _make_aio_rpc_error(status_code, "transient") + return dial_response + + lease.controller.Dial = AsyncMock(side_effect=dial_side_effect) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + with patch("jumpstarter.client.lease.connect_router_stream") as mock_router: + + @asynccontextmanager + async def fake_router(*args, **kwargs): + yield + + mock_router.side_effect = fake_router + await lease.handle_async(Mock()) + + assert call_count == 2, f"Expected 2 calls for {status_code}, got {call_count}" + + +class TestHandleAsyncRouterRetry: + """Tests for router connection retry in handle_async.""" @pytest.mark.anyio - async def test_handle_async_retries_unavailable_then_succeeds(self): - """Dial returns UNAVAILABLE once then succeeds on retry.""" - lease = self._make_lease_for_handle() - dial_call_count = 0 + async def test_router_retries_on_transient_error_then_succeeds(self): + """Router connection should retry on transient error, re-dial, then succeed.""" + lease = _make_lease_for_handle() + dial_response = Mock(router_endpoint="ep", router_token="tok") + lease.controller.Dial = AsyncMock(return_value=dial_response) + + connect_count = 0 + + @asynccontextmanager + async def fake_router(*args, **kwargs): + nonlocal connect_count + connect_count += 1 + if connect_count == 1: + raise _make_aio_rpc_error(grpc.StatusCode.UNAVAILABLE, "router unreachable") + yield - async def mock_dial(request): - nonlocal dial_call_count - dial_call_count += 1 - if dial_call_count == 1: - raise MockAioRpcError(grpc.StatusCode.UNAVAILABLE, "temporarily unavailable") - return Mock(router_endpoint="endpoint", router_token="token") + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + with patch("jumpstarter.client.lease.connect_router_stream", side_effect=fake_router): + await lease.handle_async(Mock()) + + assert connect_count == 2 + # Dial called once for initial + once for re-dial + assert lease.controller.Dial.call_count == 2 + + @pytest.mark.anyio + async def test_router_non_transient_error_returns_immediately(self): + """Router connection should not retry on non-transient errors.""" + lease = _make_lease_for_handle() + dial_response = Mock(router_endpoint="ep", router_token="tok") + lease.controller.Dial = AsyncMock(return_value=dial_response) + + @asynccontextmanager + async def fail_router(*args, **kwargs): + raise _make_aio_rpc_error(grpc.StatusCode.PERMISSION_DENIED, "no access") + yield # pragma: no cover + + with patch("jumpstarter.client.lease.connect_router_stream", side_effect=fail_router): + await lease.handle_async(Mock()) + + # Only the initial Dial, no re-dial + assert lease.controller.Dial.call_count == 1 - lease.controller.Dial = mock_dial + @pytest.mark.anyio + async def test_router_transient_error_returns_after_timeout(self): + """Router should give up when dial_timeout is exceeded.""" + lease = _make_lease_for_handle() + lease.dial_timeout = 0.0 # already expired + dial_response = Mock(router_endpoint="ep", router_token="tok") + lease.controller.Dial = AsyncMock(return_value=dial_response) - with patch("jumpstarter.client.lease.connect_router_stream") as mock_connect: - mock_connect.return_value.__aenter__ = AsyncMock() - mock_connect.return_value.__aexit__ = AsyncMock(return_value=False) - stream = Mock() + @asynccontextmanager + async def fail_router(*args, **kwargs): + raise _make_aio_rpc_error(grpc.StatusCode.UNAVAILABLE, "unreachable") + yield # pragma: no cover - await lease.handle_async(stream) + with patch("jumpstarter.client.lease.connect_router_stream", side_effect=fail_router): + await lease.handle_async(Mock()) - assert dial_call_count == 2 - mock_connect.assert_called_once_with("endpoint", "token", stream, lease.tls_config, lease.grpc_options) + # Only one Dial (initial), no retry + assert lease.controller.Dial.call_count == 1 @pytest.mark.anyio - async def test_handle_async_unavailable_exceeds_dial_timeout(self): - """Dial returns UNAVAILABLE until dial_timeout is exceeded, then raises.""" - lease = self._make_lease_for_handle() - lease.dial_timeout = 0.5 - dial_call_count = 0 + async def test_router_redial_failure_is_swallowed(self): + """When re-dial fails during router retry, the error is logged and retry continues.""" + lease = _make_lease_for_handle() + dial_response = Mock(router_endpoint="ep", router_token="tok") + + dial_count = 0 + + async def dial_side_effect(req): + nonlocal dial_count + dial_count += 1 + if dial_count == 1: + return dial_response + if dial_count == 2: + # Re-dial fails + raise _make_aio_rpc_error(grpc.StatusCode.UNAVAILABLE, "re-dial failed") + return dial_response + + lease.controller.Dial = AsyncMock(side_effect=dial_side_effect) + + connect_count = 0 + + @asynccontextmanager + async def fake_router(*args, **kwargs): + nonlocal connect_count + connect_count += 1 + if connect_count <= 2: + raise _make_aio_rpc_error(grpc.StatusCode.UNAVAILABLE, "router fail") + yield - async def mock_dial(request): - nonlocal dial_call_count - dial_call_count += 1 - raise MockAioRpcError(grpc.StatusCode.UNAVAILABLE, "permanently unavailable") + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + with patch("jumpstarter.client.lease.connect_router_stream", side_effect=fake_router): + await lease.handle_async(Mock()) - lease.controller.Dial = mock_dial - stream = Mock() + # Should have retried: connect fails, re-dial fails, connect fails again, + # re-dial succeeds, third connect succeeds + assert connect_count == 3 + assert dial_count == 3 - with pytest.raises(AioRpcError) as exc_info: - await lease.handle_async(stream) + @pytest.mark.anyio + async def test_router_oserror_retries_then_succeeds(self): + """Router connection should retry on OSError, then succeed.""" + lease = _make_lease_for_handle() + dial_response = Mock(router_endpoint="ep", router_token="tok") + lease.controller.Dial = AsyncMock(return_value=dial_response) + + connect_count = 0 + + @asynccontextmanager + async def fake_router(*args, **kwargs): + nonlocal connect_count + connect_count += 1 + if connect_count == 1: + raise OSError("Connection refused") + yield - assert exc_info.value.code() == grpc.StatusCode.UNAVAILABLE - assert dial_call_count >= 2 + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + with patch("jumpstarter.client.lease.connect_router_stream", side_effect=fake_router): + await lease.handle_async(Mock()) + + assert connect_count == 2 + + @pytest.mark.anyio + async def test_router_oserror_returns_after_timeout(self): + """Router should give up on OSError when dial_timeout is exceeded.""" + lease = _make_lease_for_handle() + lease.dial_timeout = 0.0 # already expired + dial_response = Mock(router_endpoint="ep", router_token="tok") + lease.controller.Dial = AsyncMock(return_value=dial_response) + + @asynccontextmanager + async def fail_router(*args, **kwargs): + raise OSError("Connection refused") + yield # pragma: no cover + + with patch("jumpstarter.client.lease.connect_router_stream", side_effect=fail_router): + await lease.handle_async(Mock()) + + # Only the initial Dial, no retry + assert lease.controller.Dial.call_count == 1 + + +class TestTransientGrpcCodes: + """Tests for the _TRANSIENT_GRPC_CODES class attribute.""" + + def test_contains_expected_codes(self): + assert grpc.StatusCode.UNAVAILABLE in Lease._TRANSIENT_GRPC_CODES + assert grpc.StatusCode.RESOURCE_EXHAUSTED in Lease._TRANSIENT_GRPC_CODES + assert grpc.StatusCode.ABORTED in Lease._TRANSIENT_GRPC_CODES + assert grpc.StatusCode.INTERNAL in Lease._TRANSIENT_GRPC_CODES + + def test_does_not_contain_non_transient_codes(self): + assert grpc.StatusCode.PERMISSION_DENIED not in Lease._TRANSIENT_GRPC_CODES + assert grpc.StatusCode.NOT_FOUND not in Lease._TRANSIENT_GRPC_CODES + assert grpc.StatusCode.FAILED_PRECONDITION not in Lease._TRANSIENT_GRPC_CODES diff --git a/python/packages/jumpstarter/jumpstarter/common/streams_test.py b/python/packages/jumpstarter/jumpstarter/common/streams_test.py new file mode 100644 index 000000000..be89d5ee8 --- /dev/null +++ b/python/packages/jumpstarter/jumpstarter/common/streams_test.py @@ -0,0 +1,88 @@ +import asyncio +from contextlib import asynccontextmanager +from unittest.mock import AsyncMock, Mock, patch + +import grpc +import pytest +from grpc.aio import AioRpcError + +from jumpstarter.common.streams import connect_router_stream + + +class TestConnectRouterStreamChannelReady: + """Tests for the channel_ready timeout logic in connect_router_stream.""" + + @pytest.mark.anyio + async def test_raises_unavailable_on_channel_ready_timeout(self): + """When channel_ready() times out, an AioRpcError with UNAVAILABLE should be raised.""" + mock_channel = Mock() + + # Make channel_ready() return a coroutine that never completes + async def hang_forever(): + await asyncio.sleep(999) + + mock_channel.channel_ready = Mock(return_value=hang_forever()) + + @asynccontextmanager + async def fake_secure_channel(*args, **kwargs): + yield mock_channel + + with ( + patch("jumpstarter.common.streams.ssl_channel_credentials", new_callable=AsyncMock), + patch("jumpstarter.common.streams.aio_secure_channel", side_effect=fake_secure_channel), + patch("grpc.composite_channel_credentials", return_value=Mock()), + patch("grpc.access_token_call_credentials", return_value=Mock()), + ): + with pytest.raises(AioRpcError) as exc_info: + async with connect_router_stream( + "endpoint:443", "token", Mock(), Mock(), {}, channel_ready_timeout=0.01 + ): + pass # pragma: no cover + + assert exc_info.value.code() == grpc.StatusCode.UNAVAILABLE + assert "Timed out" in str(exc_info.value.details()) + + @pytest.mark.anyio + async def test_proceeds_when_channel_ready_succeeds(self): + """When channel_ready() succeeds quickly, the stream should be set up normally.""" + mock_channel = Mock() + + # channel_ready() resolves immediately + async def ready_immediately(): + pass + + mock_channel.channel_ready = Mock(return_value=ready_immediately()) + + mock_context = Mock() + + @asynccontextmanager + async def fake_secure_channel(*args, **kwargs): + yield mock_channel + + @asynccontextmanager + async def fake_router_stream(*args, **kwargs): + yield Mock() + + @asynccontextmanager + async def fake_forward(*args, **kwargs): + yield + + with ( + patch("jumpstarter.common.streams.ssl_channel_credentials", new_callable=AsyncMock), + patch("jumpstarter.common.streams.aio_secure_channel", side_effect=fake_secure_channel), + patch("grpc.composite_channel_credentials", return_value=Mock()), + patch("grpc.access_token_call_credentials", return_value=Mock()), + patch("jumpstarter.common.streams.router_pb2_grpc.RouterServiceStub") as mock_stub_cls, + patch("jumpstarter.common.streams.RouterStream", side_effect=fake_router_stream), + patch("jumpstarter.common.streams.forward_stream", side_effect=fake_forward), + ): + mock_stub = Mock() + mock_stub.Stream.return_value = mock_context + mock_stub_cls.return_value = mock_stub + + async with connect_router_stream( + "endpoint:443", "token", Mock(), Mock(), {}, channel_ready_timeout=5 + ): + pass # Successfully entered the context + + mock_channel.channel_ready.assert_called_once() From 35d939931eb10779da5455b38904941829b723ea Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Wed, 13 May 2026 15:36:46 +0000 Subject: [PATCH 03/10] refactor: unify Dial and router retry into a single loop Extract _dial_and_connect() to perform Dial + router connection as a single atomic operation. This eliminates the duplicated Dial call that was in the separate router retry block, addressing the code review feedback about entangled and repeated code. The single retry loop in handle_async now retries the full _dial_and_connect() on transient errors, which naturally handles both Dial failures and router connection failures with the same backoff logic and always gets a fresh router token on retry. Co-Authored-By: Claude Opus 4.6 --- .../jumpstarter/jumpstarter/client/lease.py | 74 ++++++------------- .../jumpstarter/client/lease_test.py | 66 ++++++++--------- 2 files changed, 56 insertions(+), 84 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/client/lease.py b/python/packages/jumpstarter/jumpstarter/client/lease.py index 75b6cc0a4..f174be46c 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease.py @@ -320,9 +320,21 @@ def __contextmanager__(self) -> Generator[Self]: grpc.StatusCode.INTERNAL, }) - async def handle_async(self, stream): # noqa: C901 + async def _dial_and_connect(self, stream): + """Dial the controller and connect to the router stream. + + Performs a single Dial + router connection attempt. Raises on failure + so the caller can decide whether to retry. + """ + response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name)) + async with connect_router_stream( + response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options + ): + pass + + async def handle_async(self, stream): logger.debug("Connecting to Lease with name %s", self.name) - # Retry Dial and router connection with exponential backoff for transient + # Retry Dial + router connection with exponential backoff for transient # errors. This handles: # 1. The race condition where the client acquires a lease before the # exporter has transitioned to LEASE_READY status (FAILED_PRECONDITION). @@ -335,8 +347,8 @@ async def handle_async(self, stream): # noqa: C901 attempt = 0 while True: try: - response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name)) - break + await self._dial_and_connect(stream) + return except AioRpcError as e: remaining = deadline - time.monotonic() if e.code() == grpc.StatusCode.FAILED_PRECONDITION and "not ready" in str(e.details()): @@ -349,7 +361,7 @@ async def handle_async(self, stream): # noqa: C901 raise delay = min(base_delay * (2**attempt), max_delay, remaining) logger.debug( - "Exporter not ready, retrying Dial in %.1fs (attempt %d, %.1fs remaining)", + "Exporter not ready, retrying in %.1fs (attempt %d, %.1fs remaining)", delay, attempt + 1, remaining, @@ -361,7 +373,7 @@ async def handle_async(self, stream): # noqa: C901 if e.code() in self._TRANSIENT_GRPC_CODES: if remaining <= 0: logger.warning( - "Dial failed with transient error after %d attempts (%.1fs elapsed): %s", + "Connection failed with transient error after %d attempts (%.1fs elapsed): %s", attempt + 1, self.dial_timeout, e.details(), @@ -369,7 +381,7 @@ async def handle_async(self, stream): # noqa: C901 return delay = min(base_delay * (2**attempt), max_delay, remaining) logger.info( - "Dial failed with %s, retrying in %.1fs (attempt %d, %.1fs remaining): %s", + "Connection failed with %s, retrying in %.1fs (attempt %d, %.1fs remaining): %s", e.code().name, delay, attempt + 1, @@ -389,60 +401,22 @@ async def handle_async(self, stream): # noqa: C901 else: logger.warning("Connection to exporter lost: %s", e.details()) return - - # Connect to the router with retry for transient failures. - # After a successful Dial, the router endpoint may still be temporarily - # unreachable (e.g. after a tunnel drop). Retry the connection to give - # the network time to recover. - remaining = deadline - time.monotonic() - router_attempt = 0 - while True: - try: - async with connect_router_stream( - response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options - ): - return - except AioRpcError as e: - remaining = deadline - time.monotonic() - if e.code() in self._TRANSIENT_GRPC_CODES and remaining > 0: - delay = min(base_delay * (2**router_attempt), max_delay, remaining) - logger.info( - "Router connection failed with %s, retrying in %.1fs (attempt %d, %.1fs remaining): %s", - e.code().name, - delay, - router_attempt + 1, - remaining, - e.details(), - ) - await sleep(delay) - router_attempt += 1 - # Re-dial to get a fresh router token since the old one may - # have expired during the retry window - try: - response = await self.controller.Dial( - jumpstarter_pb2.DialRequest(lease_name=self.name) - ) - except AioRpcError: - logger.debug("Re-dial failed during router retry, will retry from Dial") - continue - logger.warning("Router connection failed: %s (code=%s)", e.details(), e.code().name) - return except OSError as e: # OSError can occur when the router endpoint is unreachable remaining = deadline - time.monotonic() if remaining > 0: - delay = min(base_delay * (2**router_attempt), max_delay, remaining) + delay = min(base_delay * (2**attempt), max_delay, remaining) logger.info( - "Router connection failed with OSError, retrying in %.1fs (attempt %d, %.1fs remaining): %s", + "Connection failed with OSError, retrying in %.1fs (attempt %d, %.1fs remaining): %s", delay, - router_attempt + 1, + attempt + 1, remaining, e, ) await sleep(delay) - router_attempt += 1 + attempt += 1 continue - logger.warning("Router connection failed: %s", e) + logger.warning("Connection failed: %s", e) return @asynccontextmanager diff --git a/python/packages/jumpstarter/jumpstarter/client/lease_test.py b/python/packages/jumpstarter/jumpstarter/client/lease_test.py index e3121f5a9..03b1fbd48 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease_test.py @@ -596,12 +596,12 @@ def _make_lease_for_handle(): return lease -class TestHandleAsyncTransientDialRetry: - """Tests for transient gRPC error retry in handle_async Dial phase.""" +class TestHandleAsyncTransientRetry: + """Tests for transient gRPC error retry in handle_async (unified Dial + router loop).""" @pytest.mark.anyio - async def test_dial_retries_on_unavailable_then_succeeds(self): - """Dial should retry on UNAVAILABLE and succeed on the next attempt.""" + async def test_retries_on_dial_unavailable_then_succeeds(self): + """Should retry on UNAVAILABLE from Dial and succeed on the next attempt.""" lease = _make_lease_for_handle() dial_response = Mock(router_endpoint="ep", router_token="tok") @@ -629,8 +629,8 @@ async def fake_router(*args, **kwargs): assert call_count == 2 @pytest.mark.anyio - async def test_dial_transient_error_returns_after_timeout(self): - """Dial should give up and return when dial_timeout is exceeded.""" + async def test_transient_error_returns_after_timeout(self): + """Should give up and return when dial_timeout is exceeded.""" lease = _make_lease_for_handle() lease.dial_timeout = 0.0 # already expired @@ -650,8 +650,8 @@ async def test_dial_transient_error_returns_after_timeout(self): [grpc.StatusCode.RESOURCE_EXHAUSTED, grpc.StatusCode.ABORTED, grpc.StatusCode.INTERNAL], ids=["RESOURCE_EXHAUSTED", "ABORTED", "INTERNAL"], ) - async def test_dial_retries_multiple_transient_codes(self, status_code): - """Dial should retry on RESOURCE_EXHAUSTED, ABORTED, INTERNAL.""" + async def test_retries_multiple_transient_codes(self, status_code): + """Should retry on RESOURCE_EXHAUSTED, ABORTED, INTERNAL.""" lease = _make_lease_for_handle() dial_response = Mock(router_endpoint="ep", router_token="tok") call_count = 0 @@ -677,13 +677,9 @@ async def fake_router(*args, **kwargs): assert call_count == 2, f"Expected 2 calls for {status_code}, got {call_count}" - -class TestHandleAsyncRouterRetry: - """Tests for router connection retry in handle_async.""" - @pytest.mark.anyio - async def test_router_retries_on_transient_error_then_succeeds(self): - """Router connection should retry on transient error, re-dial, then succeed.""" + async def test_router_transient_error_retries_full_dial_and_connect(self): + """Router transient error should retry the full Dial + connect cycle.""" lease = _make_lease_for_handle() dial_response = Mock(router_endpoint="ep", router_token="tok") lease.controller.Dial = AsyncMock(return_value=dial_response) @@ -703,30 +699,30 @@ async def fake_router(*args, **kwargs): await lease.handle_async(Mock()) assert connect_count == 2 - # Dial called once for initial + once for re-dial + # Dial is called fresh each attempt (unified loop) assert lease.controller.Dial.call_count == 2 @pytest.mark.anyio - async def test_router_non_transient_error_returns_immediately(self): - """Router connection should not retry on non-transient errors.""" + async def test_non_transient_error_returns_immediately(self): + """Non-transient errors should not be retried.""" lease = _make_lease_for_handle() dial_response = Mock(router_endpoint="ep", router_token="tok") lease.controller.Dial = AsyncMock(return_value=dial_response) @asynccontextmanager async def fail_router(*args, **kwargs): - raise _make_aio_rpc_error(grpc.StatusCode.PERMISSION_DENIED, "no access") + raise _make_aio_rpc_error(grpc.StatusCode.NOT_FOUND, "not found") yield # pragma: no cover with patch("jumpstarter.client.lease.connect_router_stream", side_effect=fail_router): await lease.handle_async(Mock()) - # Only the initial Dial, no re-dial + # Only one Dial attempt, no retry assert lease.controller.Dial.call_count == 1 @pytest.mark.anyio - async def test_router_transient_error_returns_after_timeout(self): - """Router should give up when dial_timeout is exceeded.""" + async def test_transient_router_error_returns_after_timeout(self): + """Should give up when dial_timeout is exceeded during router retries.""" lease = _make_lease_for_handle() lease.dial_timeout = 0.0 # already expired dial_response = Mock(router_endpoint="ep", router_token="tok") @@ -744,8 +740,8 @@ async def fail_router(*args, **kwargs): assert lease.controller.Dial.call_count == 1 @pytest.mark.anyio - async def test_router_redial_failure_is_swallowed(self): - """When re-dial fails during router retry, the error is logged and retry continues.""" + async def test_dial_failure_on_retry_is_retried_again(self): + """When Dial fails with a transient error during retry, it should keep retrying.""" lease = _make_lease_for_handle() dial_response = Mock(router_endpoint="ep", router_token="tok") @@ -755,11 +751,10 @@ async def dial_side_effect(req): nonlocal dial_count dial_count += 1 if dial_count == 1: - return dial_response + return dial_response # first Dial succeeds, router will fail if dial_count == 2: - # Re-dial fails raise _make_aio_rpc_error(grpc.StatusCode.UNAVAILABLE, "re-dial failed") - return dial_response + return dial_response # third Dial succeeds lease.controller.Dial = AsyncMock(side_effect=dial_side_effect) @@ -769,7 +764,7 @@ async def dial_side_effect(req): async def fake_router(*args, **kwargs): nonlocal connect_count connect_count += 1 - if connect_count <= 2: + if connect_count == 1: raise _make_aio_rpc_error(grpc.StatusCode.UNAVAILABLE, "router fail") yield @@ -777,14 +772,15 @@ async def fake_router(*args, **kwargs): with patch("jumpstarter.client.lease.connect_router_stream", side_effect=fake_router): await lease.handle_async(Mock()) - # Should have retried: connect fails, re-dial fails, connect fails again, - # re-dial succeeds, third connect succeeds - assert connect_count == 3 + # Attempt 1: Dial OK -> router fails (UNAVAILABLE) + # Attempt 2: Dial fails (UNAVAILABLE) -> retried + # Attempt 3: Dial OK -> router OK assert dial_count == 3 + assert connect_count == 2 @pytest.mark.anyio - async def test_router_oserror_retries_then_succeeds(self): - """Router connection should retry on OSError, then succeed.""" + async def test_oserror_retries_then_succeeds(self): + """OSError from router should retry the full Dial + connect cycle.""" lease = _make_lease_for_handle() dial_response = Mock(router_endpoint="ep", router_token="tok") lease.controller.Dial = AsyncMock(return_value=dial_response) @@ -804,10 +800,12 @@ async def fake_router(*args, **kwargs): await lease.handle_async(Mock()) assert connect_count == 2 + # Dial called fresh each attempt + assert lease.controller.Dial.call_count == 2 @pytest.mark.anyio - async def test_router_oserror_returns_after_timeout(self): - """Router should give up on OSError when dial_timeout is exceeded.""" + async def test_oserror_returns_after_timeout(self): + """Should give up on OSError when dial_timeout is exceeded.""" lease = _make_lease_for_handle() lease.dial_timeout = 0.0 # already expired dial_response = Mock(router_endpoint="ep", router_token="tok") From 046961247dcc46a203e39ec7bef20de82be3c37c Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 28 May 2026 06:32:50 +0000 Subject: [PATCH 04/10] ci: retrigger CI for flaky hooks_test on macOS From 83e22a329bab9bfb40ab49989a62aa2883e546f2 Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 28 May 2026 08:33:09 +0000 Subject: [PATCH 05/10] fix: add UNKNOWN to transient gRPC codes for tunnel teardown retries Include grpc.StatusCode.UNKNOWN in _TRANSIENT_GRPC_CODES since tunnel teardowns (e.g. "watch channel closed") surface as UNKNOWN rather than UNAVAILABLE. The retry is still bounded by dial_timeout so this won't mask persistent server bugs. Co-Authored-By: Claude Opus 4.6 --- python/packages/jumpstarter/jumpstarter/client/lease.py | 5 ++++- python/packages/jumpstarter/jumpstarter/client/lease_test.py | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/client/lease.py b/python/packages/jumpstarter/jumpstarter/client/lease.py index f174be46c..0b12a1407 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease.py @@ -312,12 +312,15 @@ def __contextmanager__(self) -> Generator[Self]: with self.portal.wrap_async_context_manager(self) as value: yield value - # gRPC status codes that indicate transient network failures worth retrying + # gRPC status codes that indicate transient network failures worth retrying. + # UNKNOWN is included because tunnel teardowns (e.g. "watch channel closed") + # surface as UNKNOWN rather than UNAVAILABLE. _TRANSIENT_GRPC_CODES = frozenset({ grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.RESOURCE_EXHAUSTED, grpc.StatusCode.ABORTED, grpc.StatusCode.INTERNAL, + grpc.StatusCode.UNKNOWN, }) async def _dial_and_connect(self, stream): diff --git a/python/packages/jumpstarter/jumpstarter/client/lease_test.py b/python/packages/jumpstarter/jumpstarter/client/lease_test.py index 03b1fbd48..43946d1d0 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease_test.py @@ -647,8 +647,8 @@ async def test_transient_error_returns_after_timeout(self): @pytest.mark.anyio @pytest.mark.parametrize( "status_code", - [grpc.StatusCode.RESOURCE_EXHAUSTED, grpc.StatusCode.ABORTED, grpc.StatusCode.INTERNAL], - ids=["RESOURCE_EXHAUSTED", "ABORTED", "INTERNAL"], + [grpc.StatusCode.RESOURCE_EXHAUSTED, grpc.StatusCode.ABORTED, grpc.StatusCode.INTERNAL, grpc.StatusCode.UNKNOWN], + ids=["RESOURCE_EXHAUSTED", "ABORTED", "INTERNAL", "UNKNOWN"], ) async def test_retries_multiple_transient_codes(self, status_code): """Should retry on RESOURCE_EXHAUSTED, ABORTED, INTERNAL.""" @@ -831,6 +831,7 @@ def test_contains_expected_codes(self): assert grpc.StatusCode.RESOURCE_EXHAUSTED in Lease._TRANSIENT_GRPC_CODES assert grpc.StatusCode.ABORTED in Lease._TRANSIENT_GRPC_CODES assert grpc.StatusCode.INTERNAL in Lease._TRANSIENT_GRPC_CODES + assert grpc.StatusCode.UNKNOWN in Lease._TRANSIENT_GRPC_CODES def test_does_not_contain_non_transient_codes(self): assert grpc.StatusCode.PERMISSION_DENIED not in Lease._TRANSIENT_GRPC_CODES From fe49c94a13b00037be981fdcef06450e58aece3e Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 28 May 2026 09:34:21 +0000 Subject: [PATCH 06/10] fix: narrow UNKNOWN retry to specific tunnel teardown messages Remove StatusCode.UNKNOWN from the blanket _TRANSIENT_GRPC_CODES set to avoid masking unrelated server-side errors. Instead, add a targeted check via _TRANSIENT_UNKNOWN_MESSAGES that only retries UNKNOWN when the error details contain known transient messages like "watch channel closed". Also fixes lint E501 line-too-long in tests. Co-Authored-By: Claude Opus 4.6 --- .../jumpstarter/jumpstarter/client/lease.py | 19 ++++-- .../jumpstarter/client/lease_test.py | 64 ++++++++++++++++++- 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/client/lease.py b/python/packages/jumpstarter/jumpstarter/client/lease.py index 0b12a1407..000376876 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease.py @@ -313,16 +313,19 @@ def __contextmanager__(self) -> Generator[Self]: yield value # gRPC status codes that indicate transient network failures worth retrying. - # UNKNOWN is included because tunnel teardowns (e.g. "watch channel closed") - # surface as UNKNOWN rather than UNAVAILABLE. _TRANSIENT_GRPC_CODES = frozenset({ grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.RESOURCE_EXHAUSTED, grpc.StatusCode.ABORTED, grpc.StatusCode.INTERNAL, - grpc.StatusCode.UNKNOWN, }) + # UNKNOWN error messages that indicate transient tunnel teardowns. + # We don't blanket-retry all UNKNOWN errors (they could be permanent + # server bugs), but specific messages like "watch channel closed" are + # known to occur during tunnel reconnection. + _TRANSIENT_UNKNOWN_MESSAGES = ("watch channel closed",) + async def _dial_and_connect(self, stream): """Dial the controller and connect to the router stream. @@ -372,8 +375,14 @@ async def handle_async(self, stream): await sleep(delay) attempt += 1 continue - # Retry on transient network errors (e.g. tunnel to router dropped) - if e.code() in self._TRANSIENT_GRPC_CODES: + # Retry on transient network errors (e.g. tunnel to router dropped). + # Also retry UNKNOWN when the message matches a known transient + # tunnel teardown (e.g. "watch channel closed"). + is_transient = e.code() in self._TRANSIENT_GRPC_CODES or ( + e.code() == grpc.StatusCode.UNKNOWN + and any(msg in str(e.details()).lower() for msg in self._TRANSIENT_UNKNOWN_MESSAGES) + ) + if is_transient: if remaining <= 0: logger.warning( "Connection failed with transient error after %d attempts (%.1fs elapsed): %s", diff --git a/python/packages/jumpstarter/jumpstarter/client/lease_test.py b/python/packages/jumpstarter/jumpstarter/client/lease_test.py index 43946d1d0..cf3fbd266 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease_test.py @@ -647,8 +647,12 @@ async def test_transient_error_returns_after_timeout(self): @pytest.mark.anyio @pytest.mark.parametrize( "status_code", - [grpc.StatusCode.RESOURCE_EXHAUSTED, grpc.StatusCode.ABORTED, grpc.StatusCode.INTERNAL, grpc.StatusCode.UNKNOWN], - ids=["RESOURCE_EXHAUSTED", "ABORTED", "INTERNAL", "UNKNOWN"], + [ + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.ABORTED, + grpc.StatusCode.INTERNAL, + ], + ids=["RESOURCE_EXHAUSTED", "ABORTED", "INTERNAL"], ) async def test_retries_multiple_transient_codes(self, status_code): """Should retry on RESOURCE_EXHAUSTED, ABORTED, INTERNAL.""" @@ -677,6 +681,53 @@ async def fake_router(*args, **kwargs): assert call_count == 2, f"Expected 2 calls for {status_code}, got {call_count}" + @pytest.mark.anyio + async def test_retries_unknown_with_watch_channel_closed(self): + """Should retry UNKNOWN only when details contain 'watch channel closed'.""" + lease = _make_lease_for_handle() + dial_response = Mock(router_endpoint="ep", router_token="tok") + call_count = 0 + + async def dial_side_effect(req): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise _make_aio_rpc_error( + grpc.StatusCode.UNKNOWN, "watch channel closed" + ) + return dial_response + + lease.controller.Dial = AsyncMock(side_effect=dial_side_effect) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + with patch("jumpstarter.client.lease.connect_router_stream") as mock_router: + + @asynccontextmanager + async def fake_router(*args, **kwargs): + yield + + mock_router.side_effect = fake_router + await lease.handle_async(Mock()) + + assert call_count == 2 + + @pytest.mark.anyio + async def test_unknown_without_known_message_not_retried(self): + """UNKNOWN with an unrecognized message should NOT be retried.""" + lease = _make_lease_for_handle() + + lease.controller.Dial = AsyncMock( + side_effect=_make_aio_rpc_error( + grpc.StatusCode.UNKNOWN, "some unexpected server bug" + ), + ) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + await lease.handle_async(Mock()) + + # Should return after just one attempt (no retry) + lease.controller.Dial.assert_called_once() + @pytest.mark.anyio async def test_router_transient_error_retries_full_dial_and_connect(self): """Router transient error should retry the full Dial + connect cycle.""" @@ -831,9 +882,16 @@ def test_contains_expected_codes(self): assert grpc.StatusCode.RESOURCE_EXHAUSTED in Lease._TRANSIENT_GRPC_CODES assert grpc.StatusCode.ABORTED in Lease._TRANSIENT_GRPC_CODES assert grpc.StatusCode.INTERNAL in Lease._TRANSIENT_GRPC_CODES - assert grpc.StatusCode.UNKNOWN in Lease._TRANSIENT_GRPC_CODES + + def test_unknown_not_in_blanket_transient_codes(self): + """UNKNOWN is handled separately via _TRANSIENT_UNKNOWN_MESSAGES.""" + assert grpc.StatusCode.UNKNOWN not in Lease._TRANSIENT_GRPC_CODES def test_does_not_contain_non_transient_codes(self): assert grpc.StatusCode.PERMISSION_DENIED not in Lease._TRANSIENT_GRPC_CODES assert grpc.StatusCode.NOT_FOUND not in Lease._TRANSIENT_GRPC_CODES assert grpc.StatusCode.FAILED_PRECONDITION not in Lease._TRANSIENT_GRPC_CODES + + def test_transient_unknown_messages(self): + """Should contain the known tunnel teardown messages.""" + assert "watch channel closed" in Lease._TRANSIENT_UNKNOWN_MESSAGES From e854d59a8637cefb816893b946aea1156e28946f Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 28 May 2026 11:34:07 +0000 Subject: [PATCH 07/10] fix: address review feedback on retry logic readability and test coverage - Add inline comments explaining why transient-error and OSError paths return silently instead of raising (handle_async runs inside TemporaryUnixListener.serve's task group) - Add test_exponential_backoff_delay_values to verify sleep delays follow the expected pattern (0.3, 0.6, 1.2, 2.4, 4.8, 5.0) - Add type annotation for channel_ready_timeout parameter in connect_router_stream Co-Authored-By: Claude Opus 4.6 --- .../jumpstarter/jumpstarter/client/lease.py | 4 ++ .../jumpstarter/client/lease_test.py | 49 +++++++++++++++++++ .../jumpstarter/jumpstarter/common/streams.py | 2 +- 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/python/packages/jumpstarter/jumpstarter/client/lease.py b/python/packages/jumpstarter/jumpstarter/client/lease.py index 000376876..606d1aca2 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease.py @@ -390,6 +390,9 @@ async def handle_async(self, stream): self.dial_timeout, e.details(), ) + # Return instead of raising: handle_async runs inside + # TemporaryUnixListener.serve's task group, so an + # unhandled exception would crash the listener. return delay = min(base_delay * (2**attempt), max_delay, remaining) logger.info( @@ -429,6 +432,7 @@ async def handle_async(self, stream): attempt += 1 continue logger.warning("Connection failed: %s", e) + # Return instead of raising: see transient-error comment above. return @asynccontextmanager diff --git a/python/packages/jumpstarter/jumpstarter/client/lease_test.py b/python/packages/jumpstarter/jumpstarter/client/lease_test.py index cf3fbd266..12d93788e 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease_test.py @@ -873,6 +873,55 @@ async def fail_router(*args, **kwargs): # Only the initial Dial, no retry assert lease.controller.Dial.call_count == 1 + @pytest.mark.anyio + async def test_exponential_backoff_delay_values(self): + """Verify that sleep delays follow exponential backoff: 0.3, 0.6, 1.2, 2.4, 4.8, capped at 5.0.""" + lease = _make_lease_for_handle() + lease.dial_timeout = 60.0 # large timeout so remaining doesn't cap delays + + # Fail 6 times then succeed on the 7th attempt + total_failures = 6 + call_count = 0 + dial_response = Mock(router_endpoint="ep", router_token="tok") + + async def dial_side_effect(req): + nonlocal call_count + call_count += 1 + if call_count <= total_failures: + raise _make_aio_rpc_error( + grpc.StatusCode.UNAVAILABLE, "tunnel dropped" + ) + return dial_response + + lease.controller.Dial = AsyncMock(side_effect=dial_side_effect) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock) as mock_sleep: + with patch("jumpstarter.client.lease.connect_router_stream") as mock_router: + + @asynccontextmanager + async def fake_router(*args, **kwargs): + yield + + mock_router.side_effect = fake_router + await lease.handle_async(Mock()) + + assert call_count == total_failures + 1 + + # Verify exponential backoff: base_delay=0.3, max_delay=5.0 + # attempt 0: 0.3 * 2^0 = 0.3 + # attempt 1: 0.3 * 2^1 = 0.6 + # attempt 2: 0.3 * 2^2 = 1.2 + # attempt 3: 0.3 * 2^3 = 2.4 + # attempt 4: 0.3 * 2^4 = 4.8 + # attempt 5: min(0.3 * 2^5, 5.0) = min(9.6, 5.0) = 5.0 + expected_delays = [0.3, 0.6, 1.2, 2.4, 4.8, 5.0] + actual_delays = [call.args[0] for call in mock_sleep.call_args_list] + assert len(actual_delays) == len(expected_delays) + for actual, expected in zip(actual_delays, expected_delays): + assert actual == pytest.approx(expected), ( + f"Expected delay {expected}, got {actual}" + ) + class TestTransientGrpcCodes: """Tests for the _TRANSIENT_GRPC_CODES class attribute.""" diff --git a/python/packages/jumpstarter/jumpstarter/common/streams.py b/python/packages/jumpstarter/jumpstarter/common/streams.py index c84345853..19c0280ca 100644 --- a/python/packages/jumpstarter/jumpstarter/common/streams.py +++ b/python/packages/jumpstarter/jumpstarter/common/streams.py @@ -35,7 +35,7 @@ class StreamRequestMetadata(BaseModel): @asynccontextmanager -async def connect_router_stream(endpoint, token, stream, tls_config, grpc_options, channel_ready_timeout=10): +async def connect_router_stream(endpoint, token, stream, tls_config, grpc_options, channel_ready_timeout: float = 10): credentials = grpc.composite_channel_credentials( await ssl_channel_credentials(endpoint, tls_config), grpc.access_token_call_credentials(token), From 781e26786cc03195c77e04cf1477c80a115e8421 Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 28 May 2026 12:32:29 +0000 Subject: [PATCH 08/10] fix: add strict=True to zip() call to satisfy ruff B905 lint rule Co-Authored-By: Claude Opus 4.6 --- python/packages/jumpstarter/jumpstarter/client/lease_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/packages/jumpstarter/jumpstarter/client/lease_test.py b/python/packages/jumpstarter/jumpstarter/client/lease_test.py index 12d93788e..76ef46a90 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease_test.py @@ -917,7 +917,7 @@ async def fake_router(*args, **kwargs): expected_delays = [0.3, 0.6, 1.2, 2.4, 4.8, 5.0] actual_delays = [call.args[0] for call in mock_sleep.call_args_list] assert len(actual_delays) == len(expected_delays) - for actual, expected in zip(actual_delays, expected_delays): + for actual, expected in zip(actual_delays, expected_delays, strict=True): assert actual == pytest.approx(expected), ( f"Expected delay {expected}, got {actual}" ) From 452087497b8615d46fc4d431ee5c962ee38e3997 Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Fri, 29 May 2026 08:36:00 +0000 Subject: [PATCH 09/10] fix: address review feedback on retry logic - Bound channel_ready_timeout by remaining dial_timeout deadline - Change FAILED_PRECONDITION timeout from raise to return for consistency - Extract _retry_delay() to deduplicate backoff computation - Add type annotations to _dial_and_connect and handle_async - Trim restating docstrings and comments - Add tests for FAILED_PRECONDITION retry/timeout, PERMISSION_DENIED flag, and _retry_delay helper Co-Authored-By: Claude Opus 4.6 --- .../jumpstarter/jumpstarter/client/lease.py | 64 +++++++-------- .../jumpstarter/client/lease_test.py | 81 +++++++++++++++++++ 2 files changed, 113 insertions(+), 32 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/client/lease.py b/python/packages/jumpstarter/jumpstarter/client/lease.py index 606d1aca2..25a9bad4e 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease.py @@ -22,6 +22,7 @@ fail_after, sleep, ) +from anyio.abc import SocketStream from anyio.from_thread import BlockingPortal from grpc.aio import AioRpcError, Channel from jumpstarter_protocol import jumpstarter_pb2, jumpstarter_pb2_grpc @@ -312,7 +313,8 @@ def __contextmanager__(self) -> Generator[Self]: with self.portal.wrap_async_context_manager(self) as value: yield value - # gRPC status codes that indicate transient network failures worth retrying. + # DEADLINE_EXCEEDED and CANCELLED are excluded: they indicate client-side + # timeout or cancellation, not server/network transients worth retrying. _TRANSIENT_GRPC_CODES = frozenset({ grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.RESOURCE_EXHAUSTED, @@ -326,46 +328,53 @@ def __contextmanager__(self) -> Generator[Self]: # known to occur during tunnel reconnection. _TRANSIENT_UNKNOWN_MESSAGES = ("watch channel closed",) - async def _dial_and_connect(self, stream): - """Dial the controller and connect to the router stream. + @staticmethod + def _retry_delay(attempt: int, remaining: float, base: float = 0.3, cap: float = 5.0) -> float: + """Compute exponential-backoff delay, capped by *cap* and *remaining* time.""" + return min(base * (2**attempt), cap, remaining) - Performs a single Dial + router connection attempt. Raises on failure - so the caller can decide whether to retry. - """ + async def _dial_and_connect( + self, stream: SocketStream, channel_ready_timeout: float = 10.0 + ) -> None: + """Single attempt; raises on failure for caller-driven retry.""" response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name)) async with connect_router_stream( - response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options + response.router_endpoint, + response.router_token, + stream, + self.tls_config, + self.grpc_options, + channel_ready_timeout=channel_ready_timeout, ): pass - async def handle_async(self, stream): + async def handle_async(self, stream: SocketStream) -> None: logger.debug("Connecting to Lease with name %s", self.name) - # Retry Dial + router connection with exponential backoff for transient - # errors. This handles: - # 1. The race condition where the client acquires a lease before the - # exporter has transitioned to LEASE_READY status (FAILED_PRECONDITION). - # 2. Transient network failures where the tunnel to the router drops and - # needs to be re-established (UNAVAILABLE, etc.). - # Uses time-based retry bounded by dial_timeout instead of fixed retry count. - base_delay = 0.3 - max_delay = 5.0 + # Retry Dial + router connection with exponential backoff. + # Handles FAILED_PRECONDITION (exporter not yet ready), transient + # network errors (tunnel drops), and OSError (unreachable endpoint). + # All error paths return instead of raising because handle_async runs + # inside TemporaryUnixListener.serve's task group -- an unhandled + # exception would crash the listener and terminate sibling connections. deadline = time.monotonic() + self.dial_timeout attempt = 0 while True: + remaining = deadline - time.monotonic() + channel_ready_timeout = max(min(10.0, remaining), 0.5) try: - await self._dial_and_connect(stream) + await self._dial_and_connect(stream, channel_ready_timeout=channel_ready_timeout) return except AioRpcError as e: remaining = deadline - time.monotonic() if e.code() == grpc.StatusCode.FAILED_PRECONDITION and "not ready" in str(e.details()): if remaining <= 0: - logger.debug( + logger.warning( "Exporter not ready and dial timeout (%.1fs) exceeded after %d attempts", self.dial_timeout, attempt + 1, ) - raise - delay = min(base_delay * (2**attempt), max_delay, remaining) + return + delay = self._retry_delay(attempt, remaining) logger.debug( "Exporter not ready, retrying in %.1fs (attempt %d, %.1fs remaining)", delay, @@ -375,9 +384,6 @@ async def handle_async(self, stream): await sleep(delay) attempt += 1 continue - # Retry on transient network errors (e.g. tunnel to router dropped). - # Also retry UNKNOWN when the message matches a known transient - # tunnel teardown (e.g. "watch channel closed"). is_transient = e.code() in self._TRANSIENT_GRPC_CODES or ( e.code() == grpc.StatusCode.UNKNOWN and any(msg in str(e.details()).lower() for msg in self._TRANSIENT_UNKNOWN_MESSAGES) @@ -390,11 +396,8 @@ async def handle_async(self, stream): self.dial_timeout, e.details(), ) - # Return instead of raising: handle_async runs inside - # TemporaryUnixListener.serve's task group, so an - # unhandled exception would crash the listener. return - delay = min(base_delay * (2**attempt), max_delay, remaining) + delay = self._retry_delay(attempt, remaining) logger.info( "Connection failed with %s, retrying in %.1fs (attempt %d, %.1fs remaining): %s", e.code().name, @@ -406,7 +409,6 @@ async def handle_async(self, stream): await sleep(delay) attempt += 1 continue - # Exporter went offline or lease ended - log and exit gracefully if "permission denied" in str(e.details()).lower(): self.lease_transferred = True logger.warning( @@ -417,10 +419,9 @@ async def handle_async(self, stream): logger.warning("Connection to exporter lost: %s", e.details()) return except OSError as e: - # OSError can occur when the router endpoint is unreachable remaining = deadline - time.monotonic() if remaining > 0: - delay = min(base_delay * (2**attempt), max_delay, remaining) + delay = self._retry_delay(attempt, remaining) logger.info( "Connection failed with OSError, retrying in %.1fs (attempt %d, %.1fs remaining): %s", delay, @@ -432,7 +433,6 @@ async def handle_async(self, stream): attempt += 1 continue logger.warning("Connection failed: %s", e) - # Return instead of raising: see transient-error comment above. return @asynccontextmanager diff --git a/python/packages/jumpstarter/jumpstarter/client/lease_test.py b/python/packages/jumpstarter/jumpstarter/client/lease_test.py index 76ef46a90..42eb24038 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease_test.py @@ -923,6 +923,87 @@ async def fake_router(*args, **kwargs): ) + @pytest.mark.anyio + async def test_failed_precondition_not_ready_retries_then_succeeds(self): + """FAILED_PRECONDITION 'not ready' should retry and succeed on next attempt.""" + lease = _make_lease_for_handle() + dial_response = Mock(router_endpoint="ep", router_token="tok") + call_count = 0 + + async def dial_side_effect(req): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise _make_aio_rpc_error( + grpc.StatusCode.FAILED_PRECONDITION, "exporter not ready" + ) + return dial_response + + lease.controller.Dial = AsyncMock(side_effect=dial_side_effect) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + with patch("jumpstarter.client.lease.connect_router_stream") as mock_router: + + @asynccontextmanager + async def fake_router(*args, **kwargs): + yield + + mock_router.side_effect = fake_router + await lease.handle_async(Mock()) + + assert call_count == 2 + + @pytest.mark.anyio + async def test_failed_precondition_returns_after_timeout(self): + """FAILED_PRECONDITION should return (not raise) when dial_timeout is exceeded.""" + lease = _make_lease_for_handle() + lease.dial_timeout = 0.0 # already expired + + lease.controller.Dial = AsyncMock( + side_effect=_make_aio_rpc_error( + grpc.StatusCode.FAILED_PRECONDITION, "exporter not ready" + ), + ) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + # Should return without raising + await lease.handle_async(Mock()) + + lease.controller.Dial.assert_called_once() + + @pytest.mark.anyio + async def test_permission_denied_sets_lease_transferred(self): + """PERMISSION_DENIED should set lease_transferred = True.""" + lease = _make_lease_for_handle() + assert lease.lease_transferred is False + + lease.controller.Dial = AsyncMock( + side_effect=_make_aio_rpc_error( + grpc.StatusCode.PERMISSION_DENIED, "permission denied" + ), + ) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + await lease.handle_async(Mock()) + + assert lease.lease_transferred is True + + +class TestRetryDelay: + """Tests for the _retry_delay static method.""" + + def test_basic_exponential(self): + assert Lease._retry_delay(0, 60.0) == pytest.approx(0.3) + assert Lease._retry_delay(1, 60.0) == pytest.approx(0.6) + assert Lease._retry_delay(2, 60.0) == pytest.approx(1.2) + + def test_capped_by_max(self): + assert Lease._retry_delay(10, 60.0) == pytest.approx(5.0) + + def test_capped_by_remaining(self): + assert Lease._retry_delay(0, 0.1) == pytest.approx(0.1) + + class TestTransientGrpcCodes: """Tests for the _TRANSIENT_GRPC_CODES class attribute.""" From ff56b62ab651ea790ed86d687e5fd69ec55b5f4e Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Mon, 1 Jun 2026 12:35:56 +0000 Subject: [PATCH 10/10] fix: use status code for PERMISSION_DENIED, remove timeout floor, add tests - Check e.code() == PERMISSION_DENIED instead of fragile string matching on e.details(), preventing false positives/negatives - Remove 0.5s floor on channel_ready_timeout to prevent overshooting dial_timeout when remaining time is small - Add test verifying channel_ready_timeout is bounded by remaining deadline - Add tests for PERMISSION_DENIED with custom details and UNAUTHENTICATED with "permission denied" text Co-Authored-By: Claude Opus 4.6 --- .../jumpstarter/jumpstarter/client/lease.py | 22 ++--- .../jumpstarter/client/lease_test.py | 92 ++++++++++++++----- 2 files changed, 82 insertions(+), 32 deletions(-) diff --git a/python/packages/jumpstarter/jumpstarter/client/lease.py b/python/packages/jumpstarter/jumpstarter/client/lease.py index 25a9bad4e..839f08b80 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease.py @@ -315,12 +315,14 @@ def __contextmanager__(self) -> Generator[Self]: # DEADLINE_EXCEEDED and CANCELLED are excluded: they indicate client-side # timeout or cancellation, not server/network transients worth retrying. - _TRANSIENT_GRPC_CODES = frozenset({ - grpc.StatusCode.UNAVAILABLE, - grpc.StatusCode.RESOURCE_EXHAUSTED, - grpc.StatusCode.ABORTED, - grpc.StatusCode.INTERNAL, - }) + _TRANSIENT_GRPC_CODES = frozenset( + { + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.ABORTED, + grpc.StatusCode.INTERNAL, + } + ) # UNKNOWN error messages that indicate transient tunnel teardowns. # We don't blanket-retry all UNKNOWN errors (they could be permanent @@ -333,9 +335,7 @@ def _retry_delay(attempt: int, remaining: float, base: float = 0.3, cap: float = """Compute exponential-backoff delay, capped by *cap* and *remaining* time.""" return min(base * (2**attempt), cap, remaining) - async def _dial_and_connect( - self, stream: SocketStream, channel_ready_timeout: float = 10.0 - ) -> None: + async def _dial_and_connect(self, stream: SocketStream, channel_ready_timeout: float = 10.0) -> None: """Single attempt; raises on failure for caller-driven retry.""" response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name)) async with connect_router_stream( @@ -360,7 +360,7 @@ async def handle_async(self, stream: SocketStream) -> None: attempt = 0 while True: remaining = deadline - time.monotonic() - channel_ready_timeout = max(min(10.0, remaining), 0.5) + channel_ready_timeout = max(min(10.0, remaining), 0) try: await self._dial_and_connect(stream, channel_ready_timeout=channel_ready_timeout) return @@ -409,7 +409,7 @@ async def handle_async(self, stream: SocketStream) -> None: await sleep(delay) attempt += 1 continue - if "permission denied" in str(e.details()).lower(): + if e.code() == grpc.StatusCode.PERMISSION_DENIED: self.lease_transferred = True logger.warning( "Lease %s has been transferred to another client. Your session is no longer valid.", diff --git a/python/packages/jumpstarter/jumpstarter/client/lease_test.py b/python/packages/jumpstarter/jumpstarter/client/lease_test.py index 42eb24038..52a5941c9 100644 --- a/python/packages/jumpstarter/jumpstarter/client/lease_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/lease_test.py @@ -692,9 +692,7 @@ async def dial_side_effect(req): nonlocal call_count call_count += 1 if call_count == 1: - raise _make_aio_rpc_error( - grpc.StatusCode.UNKNOWN, "watch channel closed" - ) + raise _make_aio_rpc_error(grpc.StatusCode.UNKNOWN, "watch channel closed") return dial_response lease.controller.Dial = AsyncMock(side_effect=dial_side_effect) @@ -717,9 +715,7 @@ async def test_unknown_without_known_message_not_retried(self): lease = _make_lease_for_handle() lease.controller.Dial = AsyncMock( - side_effect=_make_aio_rpc_error( - grpc.StatusCode.UNKNOWN, "some unexpected server bug" - ), + side_effect=_make_aio_rpc_error(grpc.StatusCode.UNKNOWN, "some unexpected server bug"), ) with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): @@ -888,9 +884,7 @@ async def dial_side_effect(req): nonlocal call_count call_count += 1 if call_count <= total_failures: - raise _make_aio_rpc_error( - grpc.StatusCode.UNAVAILABLE, "tunnel dropped" - ) + raise _make_aio_rpc_error(grpc.StatusCode.UNAVAILABLE, "tunnel dropped") return dial_response lease.controller.Dial = AsyncMock(side_effect=dial_side_effect) @@ -918,10 +912,7 @@ async def fake_router(*args, **kwargs): actual_delays = [call.args[0] for call in mock_sleep.call_args_list] assert len(actual_delays) == len(expected_delays) for actual, expected in zip(actual_delays, expected_delays, strict=True): - assert actual == pytest.approx(expected), ( - f"Expected delay {expected}, got {actual}" - ) - + assert actual == pytest.approx(expected), f"Expected delay {expected}, got {actual}" @pytest.mark.anyio async def test_failed_precondition_not_ready_retries_then_succeeds(self): @@ -934,9 +925,7 @@ async def dial_side_effect(req): nonlocal call_count call_count += 1 if call_count == 1: - raise _make_aio_rpc_error( - grpc.StatusCode.FAILED_PRECONDITION, "exporter not ready" - ) + raise _make_aio_rpc_error(grpc.StatusCode.FAILED_PRECONDITION, "exporter not ready") return dial_response lease.controller.Dial = AsyncMock(side_effect=dial_side_effect) @@ -960,9 +949,7 @@ async def test_failed_precondition_returns_after_timeout(self): lease.dial_timeout = 0.0 # already expired lease.controller.Dial = AsyncMock( - side_effect=_make_aio_rpc_error( - grpc.StatusCode.FAILED_PRECONDITION, "exporter not ready" - ), + side_effect=_make_aio_rpc_error(grpc.StatusCode.FAILED_PRECONDITION, "exporter not ready"), ) with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): @@ -977,16 +964,79 @@ async def test_permission_denied_sets_lease_transferred(self): lease = _make_lease_for_handle() assert lease.lease_transferred is False + lease.controller.Dial = AsyncMock( + side_effect=_make_aio_rpc_error(grpc.StatusCode.PERMISSION_DENIED, "permission denied"), + ) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + await lease.handle_async(Mock()) + + assert lease.lease_transferred is True + + @pytest.mark.anyio + async def test_permission_denied_with_custom_details_still_detected(self): + """PERMISSION_DENIED with non-standard detail text should still set lease_transferred.""" + lease = _make_lease_for_handle() + assert lease.lease_transferred is False + + lease.controller.Dial = AsyncMock( + side_effect=_make_aio_rpc_error(grpc.StatusCode.PERMISSION_DENIED, "lease reassigned to another client"), + ) + + with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): + await lease.handle_async(Mock()) + + assert lease.lease_transferred is True + + @pytest.mark.anyio + async def test_unauthenticated_with_permission_text_does_not_set_transferred(self): + """UNAUTHENTICATED with 'permission denied' in details should NOT set lease_transferred.""" + lease = _make_lease_for_handle() + assert lease.lease_transferred is False + lease.controller.Dial = AsyncMock( side_effect=_make_aio_rpc_error( - grpc.StatusCode.PERMISSION_DENIED, "permission denied" + grpc.StatusCode.UNAUTHENTICATED, + "permission denied: token expired", ), ) with patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock): await lease.handle_async(Mock()) - assert lease.lease_transferred is True + assert lease.lease_transferred is False + + @pytest.mark.anyio + async def test_channel_ready_timeout_bounded_by_remaining(self): + """channel_ready_timeout should decrease as the dial deadline approaches.""" + lease = _make_lease_for_handle() + lease.dial_timeout = 3.0 + + call_count = 0 + captured_timeouts = [] + + async def tracking_dial_and_connect(self_inner, stream, channel_ready_timeout=10.0): + nonlocal call_count + call_count += 1 + captured_timeouts.append(channel_ready_timeout) + if call_count <= 3: + raise _make_aio_rpc_error(grpc.StatusCode.FAILED_PRECONDITION, "exporter not ready") + # Succeed on 4th attempt (won't normally reach here with 3s timeout) + + with ( + patch.object(type(lease), "_dial_and_connect", tracking_dial_and_connect), + patch("jumpstarter.client.lease.sleep", new_callable=AsyncMock), + ): + await lease.handle_async(Mock()) + + # With a 3s dial_timeout, the first call should have channel_ready_timeout <= 3.0 + # and subsequent calls should have progressively smaller values + assert len(captured_timeouts) >= 2 + assert all(t <= 10.0 for t in captured_timeouts), f"All timeouts should be <= 10.0, got {captured_timeouts}" + # The first timeout should be bounded by remaining (~3.0), not the default 10.0 + assert captured_timeouts[0] <= 3.1, ( + f"First timeout should be bounded by dial_timeout (~3.0), got {captured_timeouts[0]}" + ) class TestRetryDelay: