feat: replace TxPoller polling with SSE streaming#259
Open
Conversation
Member
Author
This stack of pull requests is managed by Graphite. Learn more about stacking. |
prestwich
requested changes
Apr 15, 2026
| // full_fetch below serves the same purpose the env arm would have. | ||
| _ = self.envs.changed() => {} | ||
| } | ||
| *backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF); |
Member
There was a problem hiding this comment.
i don't love putting the exponential backoff in-line instead of using an existing implementation, or having it be an unbounded number of attempts. at what point is a failure deemed permanent?
| counter!("signet.builder.cache.tx_poll_count").increment(1); | ||
| if let Ok(transactions) = self | ||
| .tx_cache | ||
| .stream_transactions() |
Member
There was a problem hiding this comment.
sdk API thing. we now have "stream transactions" and "subscribe", which are not clear about their behavior
| self.tx_cache.stream_transactions().try_collect().await | ||
| /// Fetches all transactions from the cache, forwarding each to nonce | ||
| /// checking before it reaches the cache task. | ||
| async fn full_fetch(&self, outbound: &mpsc::UnboundedSender<ReceivedTx>) { |
Member
There was a problem hiding this comment.
architectural:
why was check_tx_cache deleted if its logic is repeated inline here?
This function also does more than fetch, it dispatches tasks. So its name should reflect that
Switch TxPoller from 1s timer-based polling to SSE streaming for real-time transaction delivery. The new lifecycle: 1. Full fetch of all transactions at startup 2. SSE stream for real-time new transaction delivery 3. Full refetch on each block environment change Adds exponential backoff (1s-30s) on SSE reconnection to prevent tight loops when the endpoint is unavailable. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Expand tokio import for nightly rustfmt, remove unresolved `CacheTask` rustdoc link. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Race the backoff sleep against envs.changed() so a block env change arriving during reconnect cuts the sleep short, instead of buffering up to 30s while the simulator operates on a stale cache. Also replace the nested let-else + unwrap_err in the SSE arm with a single match — no behavior change, drops the double-unwrap. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Addresses the obvious nits from prestwich's review on src/tasks/cache/tx.rs: - Move backoff constants to module level (was assoc consts) - Use crate::metrics::inc_*/record_* helpers instead of bare counter!/histogram! macros (matches #263's metrics module) - Rewrite subscribe() as a combinator chain over the Result - Bias the select! in reconnect() so env changes preempt the backoff sleep, with an inline rationale - Run full_fetch and subscribe concurrently via tokio::join! in reconnect() - Extract handle_sse_item helper to flatten the nested match inside task_future's SSE select arm Deferred to a follow-up (need design decisions): - Split full_fetch into fetch + dispatch with better name - Replace inline backoff with backon + permanence criterion - Rename SDK stream_transactions/subscribe_transactions Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
bba3625 to
1809ad1
Compare
6 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.

Summary
Replaces the 1s timer-based polling loop in
TxPollerwith SSE streaming for real-time transaction delivery from the tx-pool. The new task lifecycle:/transactions/feed) pushes new transactions as they arrive — no more redundant refetchesOn SSE disconnect or error, the poller reconnects with exponential backoff (1s initial, doubling up to 30s cap) and does a full refetch to cover the gap. Backoff resets on each successfully received transaction.
Changes
Cargo.toml: enablessefeature oninit4-bin-base(transitively enablessignet-tx-cache/sse)src/tasks/cache/tx.rs: rewriteTxPoller— replace poll loop withfull_fetch()+subscribe()+select!over SSE items and block env changes. Addreconnect()with exponential backoff. Removepoll_interval_ms,poll_duration(),Defaultimpl.src/tasks/cache/system.rs: passblock_envwatch receiver toTxPoller::new()tests/tx_poller_test.rs: update integration test to useTxCachedirectly (no morecheck_tx_cache()method)BundlePolleris unchanged — the/bundles/feedserver endpoint is not yet available.Test plan
make clippypasses cleanmake test— all 8 unit tests pass, integration tests correctly ignored🤖 Generated with Claude Code