fix: add retry logic for tunnel reconnection in jmp shell proxy #679
base: main
Changes from all commits
8ec198d
21e1ad3
4b8e7b4
cff7bf0
3b31ac1
47da2b8
661fa73
6c9f1db
f906fa2
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| fail_after, | ||
| sleep, | ||
| ) | ||
| from anyio.abc import SocketStream | ||
| from anyio.from_thread import BlockingPortal | ||
| from grpc.aio import AioRpcError, Channel | ||
| from jumpstarter_protocol import jumpstarter_pb2, jumpstarter_pb2_grpc | ||
|
|
@@ -312,41 +313,102 @@ def __contextmanager__(self) -> Generator[Self]: | |
| with self.portal.wrap_async_context_manager(self) as value: | ||
| yield value | ||
|
|
||
| async def handle_async(self, stream): | ||
| # DEADLINE_EXCEEDED and CANCELLED are excluded: they indicate client-side | ||
| # timeout or cancellation, not server/network transients worth retrying. | ||
| _TRANSIENT_GRPC_CODES = frozenset({ | ||
| grpc.StatusCode.UNAVAILABLE, | ||
| grpc.StatusCode.RESOURCE_EXHAUSTED, | ||
| grpc.StatusCode.ABORTED, | ||
| grpc.StatusCode.INTERNAL, | ||
| }) | ||
|
|
||
| # UNKNOWN error messages that indicate transient tunnel teardowns. | ||
| # We don't blanket-retry all UNKNOWN errors (they could be permanent | ||
| # server bugs), but specific messages like "watch channel closed" are | ||
| # known to occur during tunnel reconnection. | ||
| _TRANSIENT_UNKNOWN_MESSAGES = ("watch channel closed",) | ||
|
|
||
| @staticmethod | ||
| def _retry_delay(attempt: int, remaining: float, base: float = 0.3, cap: float = 5.0) -> float: | ||
| """Compute exponential-backoff delay, capped by *cap* and *remaining* time.""" | ||
| return min(base * (2**attempt), cap, remaining) | ||
|
|
||
| async def _dial_and_connect( | ||
| self, stream: SocketStream, channel_ready_timeout: float = 10.0 | ||
| ) -> None: | ||
| """Single attempt; raises on failure for caller-driven retry.""" | ||
| response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name)) | ||
| async with connect_router_stream( | ||
| response.router_endpoint, | ||
| response.router_token, | ||
| stream, | ||
| self.tls_config, | ||
| self.grpc_options, | ||
| channel_ready_timeout=channel_ready_timeout, | ||
| ): | ||
| pass | ||
|
Comment on lines +340 to +349
Member
[HIGH] Suggestion: Pass
AI-generated, human reviewed
Contributor
Author
Good catch. Will pass
||
|
|
||
| async def handle_async(self, stream: SocketStream) -> None: | ||
| logger.debug("Connecting to Lease with name %s", self.name) | ||
| # Retry Dial with exponential backoff for transient "exporter not ready" errors. | ||
| # This handles the race condition where the client acquires a lease before | ||
| # the exporter has transitioned to LEASE_READY status. | ||
| # Uses time-based retry bounded by dial_timeout instead of fixed retry count. | ||
| base_delay = 0.3 | ||
| max_delay = 2.0 | ||
| # Retry Dial + router connection with exponential backoff. | ||
| # Handles FAILED_PRECONDITION (exporter not yet ready), transient | ||
| # network errors (tunnel drops), and OSError (unreachable endpoint). | ||
| # All error paths return instead of raising because handle_async runs | ||
| # inside TemporaryUnixListener.serve's task group -- an unhandled | ||
| # exception would crash the listener and terminate sibling connections. | ||
| deadline = time.monotonic() + self.dial_timeout | ||
| attempt = 0 | ||
| while True: | ||
| remaining = deadline - time.monotonic() | ||
| channel_ready_timeout = max(min(10.0, remaining), 0.5) | ||
| try: | ||
| response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name)) | ||
| break | ||
| await self._dial_and_connect(stream, channel_ready_timeout=channel_ready_timeout) | ||
| return | ||
| except AioRpcError as e: | ||
| remaining = deadline - time.monotonic() | ||
| if e.code() == grpc.StatusCode.FAILED_PRECONDITION and "not ready" in str(e.details()): | ||
| remaining = deadline - time.monotonic() | ||
| if remaining <= 0: | ||
| logger.debug( | ||
| logger.warning( | ||
| "Exporter not ready and dial timeout (%.1fs) exceeded after %d attempts", | ||
| self.dial_timeout, | ||
| attempt + 1, | ||
| ) | ||
| raise | ||
| delay = min(base_delay * (2**attempt), max_delay, remaining) | ||
| return | ||
| delay = self._retry_delay(attempt, remaining) | ||
| logger.debug( | ||
| "Exporter not ready, retrying Dial in %.1fs (attempt %d, %.1fs remaining)", | ||
| "Exporter not ready, retrying in %.1fs (attempt %d, %.1fs remaining)", | ||
| delay, | ||
| attempt + 1, | ||
| remaining, | ||
| ) | ||
| await sleep(delay) | ||
| attempt += 1 | ||
| continue | ||
| # Exporter went offline or lease ended - log and exit gracefully | ||
| is_transient = e.code() in self._TRANSIENT_GRPC_CODES or ( | ||
| e.code() == grpc.StatusCode.UNKNOWN | ||
| and any(msg in str(e.details()).lower() for msg in self._TRANSIENT_UNKNOWN_MESSAGES) | ||
| ) | ||
| if is_transient: | ||
| if remaining <= 0: | ||
| logger.warning( | ||
| "Connection failed with transient error after %d attempts (%.1fs elapsed): %s", | ||
| attempt + 1, | ||
| self.dial_timeout, | ||
|
raballew marked this conversation as resolved.
|
||
| e.details(), | ||
| ) | ||
| return | ||
| delay = self._retry_delay(attempt, remaining) | ||
| logger.info( | ||
| "Connection failed with %s, retrying in %.1fs (attempt %d, %.1fs remaining): %s", | ||
| e.code().name, | ||
| delay, | ||
| attempt + 1, | ||
| remaining, | ||
| e.details(), | ||
|
Comment on lines +402 to +407
Member
[MEDIUM] Three exception handlers repeat the same 4-line backoff pattern:
Suggestion: Extract a helper like
AI-generated, human reviewed
Contributor
Author
Agreed. Will extract
||
| ) | ||
| await sleep(delay) | ||
| attempt += 1 | ||
| continue | ||
| if "permission denied" in str(e.details()).lower(): | ||
| self.lease_transferred = True | ||
| logger.warning( | ||
|
|
@@ -356,10 +418,22 @@ async def handle_async(self, stream): | |
| else: | ||
| logger.warning("Connection to exporter lost: %s", e.details()) | ||
|
Member
[MEDIUM] No test verifies that PERMISSION_DENIED sets
Suggestion: Add a test that raises
AI-generated, human reviewed
Contributor
Author
Good point. Will add a test that raises
||
| return | ||
| async with connect_router_stream( | ||
| response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options | ||
| ): | ||
| pass | ||
| except OSError as e: | ||
| remaining = deadline - time.monotonic() | ||
| if remaining > 0: | ||
| delay = self._retry_delay(attempt, remaining) | ||
| logger.info( | ||
| "Connection failed with OSError, retrying in %.1fs (attempt %d, %.1fs remaining): %s", | ||
| delay, | ||
| attempt + 1, | ||
| remaining, | ||
| e, | ||
| ) | ||
| await sleep(delay) | ||
| attempt += 1 | ||
| continue | ||
| logger.warning("Connection failed: %s", e) | ||
| return | ||
|
|
||
| @asynccontextmanager | ||
| async def serve_unix_async(self): | ||
|
|
||
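
The backoff rule in `_retry_delay` from the diff above can be tried in isolation. The sketch below copies that one-line formula into a free function and adds a hypothetical `backoff_schedule` helper (not part of the PR) that lists the sleeps taken when every attempt fails instantly, which shows how the delays are capped both by `cap` and by the time left in the budget.

```python
def retry_delay(attempt: int, remaining: float, base: float = 0.3, cap: float = 5.0) -> float:
    """Exponential backoff: base * 2**attempt, capped by `cap` and by the time left."""
    return min(base * (2 ** attempt), cap, remaining)


def backoff_schedule(budget: float, base: float = 0.3, cap: float = 5.0) -> list[float]:
    """Hypothetical helper: sleeps taken if every attempt fails immediately.

    Assumes the attempts themselves take no time, so only the sleeps
    consume the budget. The real loop also spends time inside Dial.
    """
    delays: list[float] = []
    remaining = budget
    attempt = 0
    while remaining > 0:
        delay = retry_delay(attempt, remaining, base, cap)
        delays.append(delay)
        remaining -= delay  # the last delay equals `remaining`, so this reaches 0
        attempt += 1
    return delays


if __name__ == "__main__":
    for i, d in enumerate(backoff_schedule(10.0)):
        print(f"attempt {i + 1}: sleep {d:.1f}s")
```

Because the final delay is clamped to the remaining time, the total sleep never exceeds the budget, which is the property the time-based retry in `handle_async` relies on.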
what about StatusCode.UNKNOWN?
Good question! `UNKNOWN` is a bit of a gray area for retries. In gRPC, `UNKNOWN` is returned when:

Unlike `UNAVAILABLE` or `RESOURCE_EXHAUSTED`, which are clearly transient network/load conditions, `UNKNOWN` can signal a persistent server-side bug that retrying won't fix. Including it risks masking real errors by silently retrying until the timeout expires.

That said, there are scenarios where a transient server crash could surface as `UNKNOWN`, and in those cases a retry would help. I'd lean toward adding it with a comment noting the trade-off, since the retry is bounded by `dial_timeout` anyway. If you feel it's worth including, I'll add it.

What's your preference?
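
To make the "bounded by `dial_timeout`" point concrete, here is a minimal synchronous sketch of a deadline-bounded retry loop. It is not the PR's code: `retry_until_deadline`, its parameters, and the injectable `clock`/`sleep` callables are assumptions made for illustration, and the real `handle_async` is async and inspects gRPC status codes rather than a generic predicate.

```python
import time
from typing import Callable


def retry_until_deadline(
    operation: Callable[[], None],
    timeout: float,
    is_retryable: Callable[[Exception], bool],
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
    base: float = 0.3,
    cap: float = 5.0,
) -> tuple[bool, int]:
    """Run `operation` until it succeeds, a non-retryable error occurs,
    or `timeout` seconds have elapsed. Returns (succeeded, attempts).

    Failures are reported through the return value instead of being
    raised, mirroring the "return instead of raise" choice in the diff.
    """
    deadline = clock() + timeout
    attempt = 0
    while True:
        try:
            operation()
            return True, attempt + 1
        except Exception as exc:
            remaining = deadline - clock()
            if not is_retryable(exc) or remaining <= 0:
                return False, attempt + 1
            # Exponential backoff, never sleeping past the deadline.
            sleep(min(base * (2 ** attempt), cap, remaining))
            attempt += 1
```

Passing a fake `clock` and `sleep` lets the loop be tested without real waiting. Whatever the predicate accepts, the number of retries stays finite because each sleep is clamped to the time remaining.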
Specifically with "watch channel closed" error
Good point: the "watch channel closed" error surfacing as `UNKNOWN` is clearly a transient tunnel teardown, not a permanent server bug. I'll add `StatusCode.UNKNOWN` to `_TRANSIENT_GRPC_CODES` with a comment noting this specific scenario.
no, but we can't do this for any UNKNOWN status
Agreed: blindly retrying every `UNKNOWN` error is too broad and could mask real bugs. I'll remove `UNKNOWN` from the blanket `_TRANSIENT_GRPC_CODES` set and instead add a targeted check that only retries `UNKNOWN` when the error details contain the specific "watch channel closed" message. That way we handle the tunnel teardown case you flagged without swallowing unrelated errors.
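
The targeted check agreed on in this thread can be sketched on its own. To keep the example runnable without `grpcio` installed, it uses a small stand-in `StatusCode` enum in place of `grpc.StatusCode`. The `is_transient` function name is hypothetical, while the code set and the message tuple are taken from the diff.

```python
from enum import Enum


class StatusCode(Enum):
    """Stand-in for grpc.StatusCode (assumption: only the members used here)."""
    UNAVAILABLE = "unavailable"
    RESOURCE_EXHAUSTED = "resource exhausted"
    ABORTED = "aborted"
    INTERNAL = "internal"
    UNKNOWN = "unknown"
    DEADLINE_EXCEEDED = "deadline exceeded"


# Codes retried unconditionally, as in _TRANSIENT_GRPC_CODES.
TRANSIENT_CODES = frozenset({
    StatusCode.UNAVAILABLE,
    StatusCode.RESOURCE_EXHAUSTED,
    StatusCode.ABORTED,
    StatusCode.INTERNAL,
})

# UNKNOWN is retried only when the details match one of these messages.
TRANSIENT_UNKNOWN_MESSAGES = ("watch channel closed",)


def is_transient(code: StatusCode, details: object) -> bool:
    """Return True if an error with this code and details is worth retrying."""
    if code in TRANSIENT_CODES:
        return True
    if code is StatusCode.UNKNOWN:
        text = str(details).lower()  # case-insensitive match, as in the diff
        return any(msg in text for msg in TRANSIENT_UNKNOWN_MESSAGES)
    return False
```

With this shape, an `UNKNOWN` error carrying any other message falls through to the non-retry path, so an unrelated server bug is reported on the first failure instead of being retried until the timeout.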