feat: Add RabbitMQ queue backend #798
Add a fourth job-queue backend selectable via QUEUE_BACKEND=rabbitmq, plugging into the existing QueueBackend trait. Each of the 8 queue types maps to one durable classic queue published via the default exchange, with publisher-confirmed durable publishes and lapin connection auto-recovery.

Deferred jobs and retry backoff use the Redis store-and-run-when-due scheduler, extracted from the Pub/Sub backend into shared queues/schedule.rs and queues/worker_shared.rs (Pub/Sub keys kept byte-identical); broker queues only ever carry already-due jobs.

- Idempotent declare with a passive verify-only mode (RABBITMQ_PASSIVE_QUEUES)
- Per-queue prefetch-bounded consumers, no lease management, 600s handler timeout
- Settle-retry ordering (ZADD before ack); health/depth via passive declare
- RABBITMQ_URL redacted in all logs/errors
- lapin =4.10.0 (exact-pinned + supply-chain triaged)
- Local example under examples/rabbitmq-queue-storage/ (real broker, one command)

Signed-off-by: Dylan Kilkenny <dylankilkenny95@gmail.com>
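The store-and-run-when-due idea above can be sketched in a few lines. This is only an illustration under stated assumptions: an in-memory `BTreeMap` stands in for the Redis sorted set, and the names `Scheduled`, `defer`, `take_due`, and `tick` are hypothetical, not taken from `queues/schedule.rs`.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in (assumption) for the Redis sorted set: jobs ordered by
/// due time. The (due_at_ms, seq) key keeps jobs with equal due times distinct.
#[derive(Default)]
struct Scheduled {
    jobs: BTreeMap<(u64, u64), String>,
    next_seq: u64,
}

impl Scheduled {
    /// Store a job to run at `due_at_ms` (the role ZADD plays in Redis).
    fn defer(&mut self, due_at_ms: u64, payload: &str) {
        self.jobs.insert((due_at_ms, self.next_seq), payload.to_string());
        self.next_seq += 1;
    }

    /// Remove and return every job due at or before `now_ms`, oldest first.
    fn take_due(&mut self, now_ms: u64) -> Vec<String> {
        // split_off keeps keys below (now_ms + 1, 0) in `self.jobs` and
        // returns the rest, i.e. the jobs that are not due yet.
        let not_due = self.jobs.split_off(&(now_ms + 1, 0));
        let due = std::mem::replace(&mut self.jobs, not_due);
        due.into_values().collect()
    }
}

/// One scheduler tick: only already-due jobs are handed to the broker.
fn tick(store: &mut Scheduled, now_ms: u64, publish: &mut dyn FnMut(String)) {
    for job in store.take_due(now_ms) {
        publish(job);
    }
}
```

Calling `tick` with a closure that publishes to the broker keeps the broker queues free of future-dated jobs: anything not yet due stays in the store until a later tick.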
Address review findings on the unmerged RabbitMQ queue backend.

Parse RABBITMQ_URL via AMQPUri up front and connect with connect_uri. On parse failure return a static error, and build the redacted endpoint from the parsed fields so the embedded password can never reach a log or error (a missing-"//" typo previously leaked it).

Configure unbounded reconnect backoff with a capped delay instead of lapin's default 16-attempt cap, so the backend keeps healing after a long broker outage rather than bricking permanently.

Publish with mandatory=true and treat a returned (unroutable) message as an error. An unroutable publish was silently acked and the job lost, e.g. when a queue is deleted at runtime.

Pause for the capped retry delay before nack-with-requeue when a retry cannot be recorded in Redis, avoiding a zero-backoff redelivery hot loop while Redis is down.

Drain in-flight handlers while the consumer channel is still open on graceful shutdown so their acks land; closing the channel first requeued every in-flight delivery and duplicated each job. Also race permit acquisition against the shutdown signal.

Log poisoned-acker no-ops (ack/nack returning Ok(false)) instead of treating them as success, and clamp prefetch 0 to 1 (0 means unlimited in AMQP).

Validated against a real broker and Redis (gated tests including a new unroutable-publish regression) and a live relayer (redacted endpoint in logs, no credentials, /ready healthy).

Signed-off-by: Dylan Kilkenny <dylankilkenny95@gmail.com>
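Two of the fixes in this commit are small enough to sketch directly: a reconnect delay that grows but is capped, with no limit on the number of attempts, and the prefetch clamp. The function names and the idea of doubling from a base delay are assumptions made for illustration; the commit does not state the actual growth curve or constants.

```rust
use std::time::Duration;

/// Delay before reconnect attempt `attempt` (0-based): exponential growth
/// from `base`, clamped at `cap`. There is no attempt limit, so a caller can
/// keep retrying for as long as the broker is down.
fn reconnect_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    // 2^attempt, saturating instead of overflowing for large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(cap).min(cap)
}

/// AMQP treats a prefetch count of 0 as "unlimited", so clamp it to 1.
fn clamp_prefetch(configured: u16) -> u16 {
    configured.max(1)
}
```

With a hypothetical 1 second base and 30 second cap, the delay doubles for the first few attempts and then stays at 30 seconds indefinitely, which is what lets the backend recover after an outage of any length.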
Walkthrough

This PR introduces a complete RabbitMQ queue backend to the OpenZeppelin relayer alongside refactored shared scheduler and handler logic. The implementation decouples queue scheduling (Redis-backed deferred jobs) and handler execution (timeout/panic handling) from transport specifics, enabling consistent behavior across Pub/Sub and RabbitMQ backends. Full Docker Compose example and configuration documentation are provided.
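The timeout/panic handling mentioned in the walkthrough can be illustrated with a small guard around a job handler. This is a sketch under assumptions, not the relayer's wrapper: it uses a plain OS thread and a channel rather than an async runtime, and `HandlerOutcome` and `run_guarded` are hypothetical names.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum HandlerOutcome {
    /// The handler returned, successfully or with an error.
    Completed(Result<(), String>),
    /// No result arrived within the timeout.
    TimedOut,
    /// The handler thread died before sending a result.
    Panicked,
}

/// Run `handler` on its own thread and wait at most `timeout` for a result.
fn run_guarded<F>(handler: F, timeout: Duration) -> HandlerOutcome
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If the handler panics, `tx` is dropped without sending, which the
        // receiver observes as a disconnect.
        let _ = tx.send(handler());
    });
    match rx.recv_timeout(timeout) {
        Ok(result) => HandlerOutcome::Completed(result),
        Err(RecvTimeoutError::Timeout) => HandlerOutcome::TimedOut,
        Err(RecvTimeoutError::Disconnected) => HandlerOutcome::Panicked,
    }
}
```

Note that this thread-based version cannot cancel a handler that has timed out; it only stops waiting for it. Whatever the backend uses in practice, the point is that all three outcomes are mapped to an explicit value that the transport-specific code can turn into an ack, a retry, or a nack.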
Codecov Report

❌ Patch coverage is

@@ Coverage Diff @@
## main #798 +/- ##
==========================================
- Coverage 89.65% 89.11% -0.54%
==========================================
Files 299 305 +6
Lines 126736 129079 +2343
==========================================
+ Hits 113619 115028 +1409
- Misses 13117 14051 +934
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@examples/rabbitmq-queue-storage/README.md`:
- Around line 166-167: The markdown link target
'../../specs/003-rabbitmq-queue-backend/contracts/config-and-queues.md' is
invalid; locate the actual repository location of the file named
contracts/config-and-queues.md (or the equivalent spec) and update the README
link to the correct relative path to that file (or replace it with the proper
docs URL), ensuring the link resolves; verify by opening the updated link or
running the repo link checker.
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock

📒 Files selected for processing (19)
- Cargo.toml
- README.md
- docs/configuration/index.mdx
- examples/rabbitmq-queue-storage/.env.example
- examples/rabbitmq-queue-storage/README.md
- examples/rabbitmq-queue-storage/config/config.json
- examples/rabbitmq-queue-storage/docker-compose.yaml
- examples/rabbitmq-queue-storage/rabbitmq.conf
- src/config/server_config.rs
- src/queues/cron.rs
- src/queues/mod.rs
- src/queues/pubsub/backend.rs
- src/queues/pubsub/schedule.rs
- src/queues/pubsub/worker.rs
- src/queues/rabbitmq/backend.rs
- src/queues/rabbitmq/mod.rs
- src/queues/rabbitmq/worker.rs
- src/queues/schedule.rs
- src/queues/worker_shared.rs
Full operator reference: [`contracts/config-and-queues.md`](../../specs/003-rabbitmq-queue-backend/contracts/config-and-queues.md)
and the [configuration docs](../../docs/configuration/index.mdx).
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Check if the referenced spec file exists
# From examples/rabbitmq-queue-storage/README.md, the relative path should resolve to:
fd -t f 'config-and-queues.md' specs/

Repository: OpenZeppelin/openzeppelin-relayer
Length of output: 171
Fix operator reference link in RabbitMQ queue-storage README
The link ../../specs/003-rabbitmq-queue-backend/contracts/config-and-queues.md (examples/rabbitmq-queue-storage/README.md lines 166-167) points to a specs/ directory at the repo root, but specs/ does not exist there—so the relative path can’t resolve. Update the link to the correct existing file location (or add/move the missing spec file).
Comments only; no behavior change.

- Drop references to internal spec/planning artifacts (constitution, functional-requirement and task IDs, review-finding labels) that are not committed to the repo and so dangle for an outside reader. Keep the reasoning, drop the unresolvable pointer. Also covers the pre-existing references on the Pub/Sub dependency pins.
- Remove decorative section-divider comments.
- Fix two inaccurate comments: the `redelivered` flag is set on any redelivery (not only channel loss), and lapin's rustls feature already selects aws-lc-rs (so default-features=false does not avoid a second crypto provider).

Signed-off-by: Dylan Kilkenny <dylankilkenny95@gmail.com>
Add broker-free unit tests for the RabbitMQ backend's pure logic:

- Extract the publisher-confirm result mapping into classify_publish_confirmation and test all four Confirmation branches (routed ack, unroutable return, nack, not-requested).
- Test amqp_value_as_usize across every AMQP integer width plus negative and non-integer inputs.
- Test the ack/nack helpers over both acker states (success and the poisoned/used no-op) via mock deliveries.
- Test the settle-retry drop paths (budget exhausted, non-UTF8 body) that ack and return before any Redis write.

Lifts unit coverage of backend.rs 53->61% and worker.rs 2->18%. The remaining uncovered code is broker/Redis I/O exercised by the gated integration tests.

Signed-off-by: Dylan Kilkenny <dylankilkenny95@gmail.com>
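The four-branch mapping named in this commit lends itself to a pure function that needs no broker. The sketch below uses a simplified local `Confirmation` enum as a stand-in for lapin's type, and the `PublishError` variants and the function signature are assumptions; only the function name and the four cases come from the commit message. Treating the not-requested case as an error is likewise an assumption.

```rust
/// Simplified stand-in (assumption) for the broker's reply to a confirmed
/// publish. `returned` is true when the broker handed the message back as
/// unroutable before acking it.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Confirmation {
    Ack { returned: bool },
    Nack,
    NotRequested,
}

/// Hypothetical error type for the three failure cases.
#[derive(Debug, PartialEq)]
enum PublishError {
    Unroutable,
    Nacked,
    ConfirmsNotEnabled,
}

/// Map a confirmation to success or a specific failure. Only a routed ack
/// counts as a successful publish.
fn classify_publish_confirmation(confirmation: Confirmation) -> Result<(), PublishError> {
    match confirmation {
        Confirmation::Ack { returned: false } => Ok(()),
        Confirmation::Ack { returned: true } => Err(PublishError::Unroutable),
        Confirmation::Nack => Err(PublishError::Nacked),
        Confirmation::NotRequested => Err(PublishError::ConfirmsNotEnabled),
    }
}
```

Because the function takes a value and returns a value, each branch can be covered by a one-line assertion without starting RabbitMQ.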
Add unit tests for two previously-uncovered infra-free paths:

- declare_failure_reason: verify a real lapin protocol error's AMQP reply code is extracted via get_id() and routed to the actionable message, and that a non-protocol error keeps the lapin detail.
- publish_scheduled: verify the success path hands the job to the publish callback verbatim, and that the publish+requeue double-failure path resolves without panicking.

Signed-off-by: Dylan Kilkenny <dylankilkenny95@gmail.com>
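A rough sketch of the reply-code routing that `declare_failure_reason` is described as doing, together with the collect-all-failures startup check mentioned elsewhere in the PR. The `DeclareError` enum, the message wording, the signature, and `collect_declare_failures` are assumptions for illustration; the reply codes themselves (404 not found, 406 precondition failed, 403 access refused) are standard AMQP 0-9-1.

```rust
/// Stand-in (assumption) for a failed queue declare. In the PR the reply
/// code is read from a lapin protocol error; this enum is a simplification.
#[derive(Debug)]
enum DeclareError {
    /// The broker rejected the declare with an AMQP reply code.
    Protocol { reply_code: u16, reply_text: String },
    /// Anything that is not a protocol error (I/O, TLS, ...).
    Other(String),
}

/// Turn a declare failure into a message an operator can act on.
fn declare_failure_reason(queue: &str, err: &DeclareError) -> String {
    match err {
        DeclareError::Protocol { reply_code: 404, .. } => {
            format!("queue '{queue}' does not exist; create it or disable passive mode")
        }
        DeclareError::Protocol { reply_code: 406, .. } => {
            format!("queue '{queue}' exists with different properties")
        }
        DeclareError::Protocol { reply_code: 403, .. } => {
            format!("access to queue '{queue}' was refused; check user permissions")
        }
        DeclareError::Protocol { reply_code, reply_text } => {
            format!("queue '{queue}' declare failed: AMQP {reply_code} {reply_text}")
        }
        // Non-protocol errors keep their original detail.
        DeclareError::Other(detail) => format!("queue '{queue}' declare failed: {detail}"),
    }
}

/// Report every failed queue together instead of stopping at the first one.
fn collect_declare_failures(results: &[(&str, Result<(), DeclareError>)]) -> Vec<String> {
    results
        .iter()
        .filter_map(|(queue, result)| {
            result.as_ref().err().map(|err| declare_failure_reason(queue, err))
        })
        .collect()
}
```

Collecting all reasons before failing means an operator with several misconfigured queues sees the full list in one startup attempt.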
Summary
Adds a RabbitMQ job-queue backend, selectable with `QUEUE_BACKEND=rabbitmq`, the 4th backend after `redis`, `sqs`, and `pubsub`. It is built as a deliberate sibling of the Pub/Sub backend: the same store-and-run-when-due Redis scheduling and settle-retry (at-least-once) contract, minus two of Pub/Sub's hardest parts. There is no lease/ack-deadline management (an unacked RabbitMQ delivery stays reserved while the consumer channel lives) and no external depth read (a passive `queue_declare` returns counts over the live connection).

What's included
- `lapin` (exact-pinned `=4.10.0`, `rustls` + `aws-lc-rs`): a dedicated confirm-mode producer channel, declare-or-verify startup for all 8 durable queues (collecting every 404/406/403 setup failure before failing fast), per-queue prefetch-bounded consumers with manual acks, publisher confirms, connection auto-recovery, and a passive-declare health probe feeding the `queue_depth` gauge.
- `src/queues/schedule.rs` (the Redis sorted-set scheduler, parameterized by a backend "segment") and `src/queues/worker_shared.rs` (dispatch table, retry/backoff helpers, the 600s-timeout + panic-guard wrapper). Pub/Sub's `schedule.rs`/`worker.rs` now thin-wrap these; its scheduled-set keys are kept byte-identical and the RabbitMQ segment is disjoint.
- `RABBITMQ_URL`, `RABBITMQ_QUEUE_PREFIX`, `RABBITMQ_PASSIVE_QUEUES` getters and enum/dispatch wiring; a runnable `examples/rabbitmq-queue-storage/` compose stack (`rabbitmq:4-management` + redis); README and configuration docs.

Reliability & security hardening
Folded in from an implementation review:
- `RABBITMQ_URL` is parsed up front and only a redacted endpoint built from the parsed fields is ever logged. On a parse failure a static error is returned (the raw URL is never echoed), closing a leak where a missing-`//` typo previously survived "redaction" with the password intact.
- Publishes set `mandatory`; an unroutable message (e.g. a queue deleted at runtime) is surfaced as an error rather than acked-and-discarded.

Testing Process
- `cargo fmt --check`, the CI clippy gate, and the full queues unit suite (277 tests): all green.
- `pending → confirmed`; a broker restart proved self-healing (relayer reconnected, a post-restart tx confirmed); credential redaction was verified in real logs (no `guest:guest`); `/api/v1/ready` reported healthy.
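The credential redaction described under "Reliability & security hardening" comes down to one rule: the loggable string is assembled from parsed fields and never derived from the raw URL. A minimal sketch, assuming a hypothetical `ParsedAmqpUri` struct in place of lapin's `AMQPUri` and a hypothetical static error message:

```rust
/// Hypothetical parsed form of an AMQP URI. In the PR the parsing is done by
/// lapin's `AMQPUri`, which is not reproduced here.
#[allow(dead_code)]
struct ParsedAmqpUri {
    tls: bool,
    host: String,
    port: u16,
    vhost: String,
    username: String,
    password: String,
}

/// Static message (assumed wording) returned on parse failure; it never
/// includes any part of the raw URL.
const INVALID_URL: &str = "RABBITMQ_URL is not a valid AMQP URI";

/// Build the loggable endpoint from parsed fields only. The username and
/// password fields are deliberately never read.
fn redacted_endpoint(uri: &ParsedAmqpUri) -> String {
    let scheme = if uri.tls { "amqps" } else { "amqp" };
    format!("{scheme}://{}:{}/{}", uri.host, uri.port, uri.vhost)
}

/// On success return the redacted endpoint; on failure discard the parser's
/// error (which may quote the raw input) and return the static message.
fn endpoint_for_logs(parsed: Result<ParsedAmqpUri, String>) -> Result<String, &'static str> {
    match parsed {
        Ok(uri) => Ok(redacted_endpoint(&uri)),
        Err(_) => Err(INVALID_URL),
    }
}
```

Because nothing here performs string surgery on the original URL, a malformed URL such as one missing `//` cannot slip a password past the redaction step: it either parses into fields or yields the static error.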