Skip to content
112 changes: 93 additions & 19 deletions python/packages/jumpstarter/jumpstarter/client/lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
fail_after,
sleep,
)
from anyio.abc import SocketStream
from anyio.from_thread import BlockingPortal
from grpc.aio import AioRpcError, Channel
from jumpstarter_protocol import jumpstarter_pb2, jumpstarter_pb2_grpc
Expand Down Expand Up @@ -312,41 +313,102 @@ def __contextmanager__(self) -> Generator[Self]:
with self.portal.wrap_async_context_manager(self) as value:
yield value

async def handle_async(self, stream):
# DEADLINE_EXCEEDED and CANCELLED are excluded: they indicate client-side
# timeout or cancellation, not server/network transients worth retrying.
_TRANSIENT_GRPC_CODES = frozenset({
grpc.StatusCode.UNAVAILABLE,
grpc.StatusCode.RESOURCE_EXHAUSTED,
grpc.StatusCode.ABORTED,
grpc.StatusCode.INTERNAL,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about StatusCode.UNKNOWN?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! UNKNOWN is a bit of a gray area for retries. In gRPC, UNKNOWN is returned when:

  1. The server raises an exception without an explicit status code (often a bug/unhandled exception)
  2. An RPC receives a status it doesn't understand
  3. The server crashes mid-response

Unlike UNAVAILABLE or RESOURCE_EXHAUSTED which are clearly transient network/load conditions, UNKNOWN can signal a persistent server-side bug that retrying won't fix. Including it risks masking real errors by silently retrying indefinitely until the timeout expires.

That said, there are scenarios where a transient server crash could surface as UNKNOWN, and in those cases a retry would help. I'd lean toward adding it with a comment noting the trade-off, since the retry is bounded by dial_timeout anyway. If you feel it's worth including, I'll add it.

What's your preference?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically with "watch channel closed" error

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point -- the "watch channel closed" error surfacing as UNKNOWN is clearly a transient tunnel teardown, not a permanent server bug. I'll add StatusCode.UNKNOWN to _TRANSIENT_GRPC_CODES with a comment noting this specific scenario.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, but we can't do this for any UNKNOWN status

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed -- blindly retrying every UNKNOWN error is too broad and could mask real bugs. I'll remove UNKNOWN from the blanket _TRANSIENT_GRPC_CODES set and instead add a targeted check that only retries UNKNOWN when the error details contain the specific "watch channel closed" message. That way we handle the tunnel teardown case you flagged without swallowing unrelated errors.

})

# UNKNOWN error messages that indicate transient tunnel teardowns.
# We don't blanket-retry all UNKNOWN errors (they could be permanent
# server bugs), but specific messages like "watch channel closed" are
# known to occur during tunnel reconnection.
_TRANSIENT_UNKNOWN_MESSAGES = ("watch channel closed",)

@staticmethod
def _retry_delay(attempt: int, remaining: float, base: float = 0.3, cap: float = 5.0) -> float:
"""Compute exponential-backoff delay, capped by *cap* and *remaining* time."""
return min(base * (2**attempt), cap, remaining)

async def _dial_and_connect(
self, stream: SocketStream, channel_ready_timeout: float = 10.0
) -> None:
"""Single attempt; raises on failure for caller-driven retry."""
response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name))
async with connect_router_stream(
response.router_endpoint,
response.router_token,
stream,
self.tls_config,
self.grpc_options,
channel_ready_timeout=channel_ready_timeout,
):
pass
Comment on lines +340 to +349
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] channel_ready_timeout is not bounded by the remaining dial_timeout deadline. _dial_and_connect calls connect_router_stream without passing channel_ready_timeout, so it defaults to 10s. The retry loop caps backoff sleep by remaining time, but channel.channel_ready() inside connect_router_stream is independent. When only 1-2s remain on dial_timeout, a single _dial_and_connect call can block for up to 10s, causing total wall-clock time to overshoot dial_timeout by up to 10s.

Suggestion: Pass remaining as an upper bound: await self._dial_and_connect(stream, channel_ready_timeout=max(min(10, remaining), 0.5)).

AI-generated, human reviewed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Will pass min(10, remaining) (with a 0.5s floor) as channel_ready_timeout to _dial_and_connect so the channel-ready wait is bounded by the overall deadline.


async def handle_async(self, stream: SocketStream) -> None:
logger.debug("Connecting to Lease with name %s", self.name)
# Retry Dial with exponential backoff for transient "exporter not ready" errors.
# This handles the race condition where the client acquires a lease before
# the exporter has transitioned to LEASE_READY status.
# Uses time-based retry bounded by dial_timeout instead of fixed retry count.
base_delay = 0.3
max_delay = 2.0
# Retry Dial + router connection with exponential backoff.
# Handles FAILED_PRECONDITION (exporter not yet ready), transient
# network errors (tunnel drops), and OSError (unreachable endpoint).
# All error paths return instead of raising because handle_async runs
# inside TemporaryUnixListener.serve's task group -- an unhandled
# exception would crash the listener and terminate sibling connections.
deadline = time.monotonic() + self.dial_timeout
attempt = 0
while True:
remaining = deadline - time.monotonic()
channel_ready_timeout = max(min(10.0, remaining), 0.5)
try:
response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name))
break
await self._dial_and_connect(stream, channel_ready_timeout=channel_ready_timeout)
return
except AioRpcError as e:
remaining = deadline - time.monotonic()
if e.code() == grpc.StatusCode.FAILED_PRECONDITION and "not ready" in str(e.details()):
remaining = deadline - time.monotonic()
if remaining <= 0:
logger.debug(
logger.warning(
"Exporter not ready and dial timeout (%.1fs) exceeded after %d attempts",
self.dial_timeout,
attempt + 1,
)
raise
delay = min(base_delay * (2**attempt), max_delay, remaining)
return
delay = self._retry_delay(attempt, remaining)
logger.debug(
"Exporter not ready, retrying Dial in %.1fs (attempt %d, %.1fs remaining)",
"Exporter not ready, retrying in %.1fs (attempt %d, %.1fs remaining)",
delay,
attempt + 1,
remaining,
)
await sleep(delay)
attempt += 1
continue
# Exporter went offline or lease ended - log and exit gracefully
is_transient = e.code() in self._TRANSIENT_GRPC_CODES or (
e.code() == grpc.StatusCode.UNKNOWN
and any(msg in str(e.details()).lower() for msg in self._TRANSIENT_UNKNOWN_MESSAGES)
)
if is_transient:
if remaining <= 0:
logger.warning(
"Connection failed with transient error after %d attempts (%.1fs elapsed): %s",
attempt + 1,
self.dial_timeout,
Comment thread
raballew marked this conversation as resolved.
e.details(),
)
return
delay = self._retry_delay(attempt, remaining)
logger.info(
"Connection failed with %s, retrying in %.1fs (attempt %d, %.1fs remaining): %s",
e.code().name,
delay,
attempt + 1,
remaining,
e.details(),
Comment on lines +402 to +407
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] Three exception handlers repeat the same 4-line backoff pattern: delay = min(base_delay * (2**attempt), max_delay, remaining), log, await sleep(delay), attempt += 1, continue. The handle_async method is around 95 lines with deep nesting. Changing the backoff strategy (e.g., adding jitter) would require coordinated edits in three places.

Suggestion: Extract a helper like _compute_retry_delay(attempt, remaining) -> float to deduplicate the delay calculation while keeping the distinct log messages and timeout behaviors.

AI-generated, human reviewed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Will extract _retry_delay(attempt, remaining) to centralize the backoff computation. The distinct log messages and timeout behaviors will stay in the caller.

)
await sleep(delay)
attempt += 1
continue
if "permission denied" in str(e.details()).lower():
self.lease_transferred = True
logger.warning(
Expand All @@ -356,10 +418,22 @@ async def handle_async(self, stream):
else:
logger.warning("Connection to exporter lost: %s", e.details())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] No test verifies that PERMISSION_DENIED sets lease_transferred = True. When handle_async receives an AioRpcError with "permission denied" in the details, it sets self.lease_transferred = True. The surrounding control flow was restructured (errors now flow through _dial_and_connect and the new is_transient gate), making regression possible.

Suggestion: Add a test that raises AioRpcError(PERMISSION_DENIED, "permission denied") from _dial_and_connect and asserts lease.lease_transferred is True.

AI-generated, human reviewed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Will add a test that raises AioRpcError(PERMISSION_DENIED, "permission denied") from _dial_and_connect and asserts lease.lease_transferred is True.

return
async with connect_router_stream(
response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options
):
pass
except OSError as e:
remaining = deadline - time.monotonic()
if remaining > 0:
delay = self._retry_delay(attempt, remaining)
logger.info(
"Connection failed with OSError, retrying in %.1fs (attempt %d, %.1fs remaining): %s",
delay,
attempt + 1,
remaining,
e,
)
await sleep(delay)
attempt += 1
continue
logger.warning("Connection failed: %s", e)
return

@asynccontextmanager
async def serve_unix_async(self):
Expand Down
Loading
Loading