feat: Add RabbitMQ queue backend#798

Open: dylankilkenny wants to merge 5 commits into main from 003-rabbitmq-queue-backend

Conversation

dylankilkenny (Member) commented Jun 11, 2026

Summary

Adds a RabbitMQ job-queue backend, selectable with QUEUE_BACKEND=rabbitmq — the 4th backend after redis, sqs, and pubsub. It is built as a deliberate sibling of the Pub/Sub backend: the same store-and-run-when-due Redis scheduling and settle-retry (at-least-once) contract, minus two of Pub/Sub's hardest parts — no lease/ack-deadline management (an unacked RabbitMQ delivery stays reserved while the consumer channel lives) and no external depth read (a passive queue_declare returns counts over the live connection).

What's included

  • AMQP 0-9-1 backend over lapin (exact-pinned =4.10.0, rustls + aws-lc-rs): a dedicated confirm-mode producer channel, declare-or-verify startup for all 8 durable queues (collecting every 404/406/403 setup failure before failing fast), per-queue prefetch-bounded consumers with manual acks, publisher confirms, connection auto-recovery, and a passive-declare health probe feeding the queue_depth gauge.
  • Shared extractions so RabbitMQ and Pub/Sub run one implementation rather than two copies: src/queues/schedule.rs (the Redis sorted-set scheduler, parameterized by a backend "segment") and src/queues/worker_shared.rs (dispatch table, retry/backoff helpers, the 600s-timeout + panic-guard wrapper). Pub/Sub's schedule.rs/worker.rs now thin-wrap these — its scheduled-set keys are kept byte-identical and the RabbitMQ segment is disjoint.
  • Config & wiring: RABBITMQ_URL, RABBITMQ_QUEUE_PREFIX, RABBITMQ_PASSIVE_QUEUES getters and enum/dispatch wiring; a runnable examples/rabbitmq-queue-storage/ compose stack (rabbitmq:4-management + redis); README and configuration docs.
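
The store-and-run-when-due scheduling described above can be pictured with an in-memory stand-in for the Redis sorted set. This is only a sketch under stated assumptions, not the code in src/queues/schedule.rs: the `Scheduler` type, the `scheduled:{segment}:{queue}` key layout, and the `claim_due` name are hypothetical, and a `BTreeSet` ordered by due time takes the place of Redis.

```rust
use std::collections::{BTreeSet, HashMap};

/// In-memory stand-in for the per-backend scheduled sets (hypothetical type).
/// Entries are ordered by (due_at_ms, payload), like a sorted set scored by due time.
#[derive(Default)]
struct Scheduler {
    sets: HashMap<String, BTreeSet<(u64, String)>>,
}

impl Scheduler {
    /// Hypothetical key layout: the backend segment keeps each backend's sets disjoint.
    fn key(segment: &str, queue: &str) -> String {
        format!("scheduled:{segment}:{queue}")
    }

    /// Store a job that must not run before `due_at_ms`.
    fn schedule(&mut self, segment: &str, queue: &str, due_at_ms: u64, job: &str) {
        self.sets
            .entry(Self::key(segment, queue))
            .or_default()
            .insert((due_at_ms, job.to_string()));
    }

    /// Remove and return every job whose due time is <= `now_ms`, earliest first.
    fn claim_due(&mut self, segment: &str, queue: &str, now_ms: u64) -> Vec<String> {
        let Some(set) = self.sets.get_mut(&Self::key(segment, queue)) else {
            return Vec::new();
        };
        // split_off leaves entries below the bound in `set` and returns the rest.
        let later = set.split_off(&(now_ms.saturating_add(1), String::new()));
        let due = std::mem::replace(set, later);
        due.into_iter().map(|(_, job)| job).collect()
    }
}
```

In this picture a sweep loop would call `claim_due` periodically and publish each returned job to the broker, so the broker only ever carries jobs that are already due.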

Reliability & security hardening

Folded in from an implementation review:

  • Credential safety (Constitution I): RABBITMQ_URL is parsed up front and only a redacted endpoint built from the parsed fields is ever logged. On a parse failure a static error is returned (the raw URL is never echoed), closing a leak where a missing-// typo previously survived "redaction" with the password intact.
  • Self-healing across long outages: unbounded reconnect backoff with a capped delay, instead of lapin's default 16-attempt (~11 min) cap that would otherwise brick the backend permanently.
  • No silent job loss: publishes are mandatory; an unroutable message (e.g. a queue deleted at runtime) is surfaced as an error rather than acked-and-discarded.
  • No hot loops: when a retry can't be recorded in Redis, redelivery is paced by the computed (capped) backoff instead of an instant nack-with-requeue loop.
  • No shutdown duplication: in-flight handlers drain while the consumer channel is still open so their acks land; the channel is closed only afterward.
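
As an illustration of the credential-safety point, the sketch below builds a loggable endpoint from parsed fields only and returns a static error when parsing fails. It is a simplified, std-only approximation: the PR parses the URL with lapin's AMQPUri, whereas `redacted_endpoint` and `INVALID_URL` here are hypothetical names and the hand-rolled parsing is an assumption made to keep the example self-contained.

```rust
/// Static error text: it never contains any part of the input.
const INVALID_URL: &str = "RABBITMQ_URL is not a valid AMQP URL";

/// Build a loggable endpoint from parsed fields only (hypothetical helper).
fn redacted_endpoint(url: &str) -> Result<String, &'static str> {
    // Require "scheme://"; a typo such as "amqp:user:pass@host" is rejected
    // here rather than passed through with the password intact.
    let (scheme, rest) = url.split_once("://").ok_or(INVALID_URL)?;
    if scheme != "amqp" && scheme != "amqps" {
        return Err(INVALID_URL);
    }
    // Everything up to the last '@' is treated as credentials and dropped.
    let after_creds = rest.rsplit_once('@').map_or(rest, |(_, tail)| tail);
    // Query parameters are dropped as well; only host, port and vhost remain.
    let after_creds = after_creds.split('?').next().unwrap_or("");
    let (host_port, vhost) = match after_creds.split_once('/') {
        Some((hp, v)) => (hp, Some(v)),
        None => (after_creds, None),
    };
    if host_port.is_empty() {
        return Err(INVALID_URL);
    }
    Ok(match vhost {
        Some(v) => format!("{scheme}://{host_port}/{v}"),
        None => format!("{scheme}://{host_port}"),
    })
}
```

The design point is that the log string is assembled from the parsed pieces rather than produced by editing the raw URL, so a malformed input can only yield the static error.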

Testing Process

  • cargo fmt --check, the CI clippy gate, and the full queues unit suite (277 tests) — all green.
  • 7 gated integration tests against a real RabbitMQ + Redis: publish/consume round-trip, broker-only-carries-due-jobs, passive-declare backlog depth, settle-retry ordering, bounded-exhaustion and unbounded status-check retry, and an unroutable-publish regression.
  • End-to-end against a real broker + a funded Stellar testnet relayer: a transaction went pending → confirmed; a broker restart proved self-healing (relayer reconnected, a post-restart tx confirmed); credential redaction was verified in real logs (no guest:guest); /api/v1/ready reported healthy.

Checklist

  • Add a reference to related issues in the PR description.
  • Add unit tests if applicable.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added RabbitMQ as a supported queue backend, offering an alternative to Redis, SQS, and Pub/Sub for job processing
    • Includes complete example project with Docker Compose setup for local RabbitMQ development and testing
  • Documentation

    • Added comprehensive RabbitMQ configuration guide covering environment variables, queue setup, and provisioning requirements
    • Added example documentation with setup procedures, operational behavior, and production deployment guidance

Add a fourth job-queue backend selectable via QUEUE_BACKEND=rabbitmq,
plugging into the existing QueueBackend trait. Each of the 8 queue
types maps to one durable classic queue published via the default
exchange, with publisher-confirmed durable publishes and lapin
connection auto-recovery.

Deferred jobs and retry backoff use the Redis store-and-run-when-due
scheduler, extracted from the Pub/Sub backend into shared
queues/schedule.rs and queues/worker_shared.rs (Pub/Sub keys kept
byte-identical); broker queues only ever carry already-due jobs.

- Idempotent declare with a passive verify-only mode
  (RABBITMQ_PASSIVE_QUEUES)
- Per-queue prefetch-bounded consumers, no lease management, 600s
  handler timeout
- Settle-retry ordering (ZADD before ack); health/depth via passive
  declare
- RABBITMQ_URL redacted in all logs/errors
- lapin =4.10.0 (exact-pinned + supply-chain triaged)
- Local example under examples/rabbitmq-queue-storage/ (real broker,
  one command)

Signed-off-by: Dylan Kilkenny <dylankilkenny95@gmail.com>

Address review findings on the unmerged RabbitMQ queue backend.

Parse RABBITMQ_URL via AMQPUri up front and connect with connect_uri.
On parse failure return a static error, and build the redacted
endpoint from the parsed fields so the embedded password can never
reach a log or error (a missing-"//" typo previously leaked it).

Configure unbounded reconnect backoff with a capped delay instead of
lapin's default 16-attempt cap, so the backend keeps healing after a
long broker outage rather than bricking permanently.
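
A minimal sketch of "unbounded attempts, capped delay", assuming exponential growth from a base delay. The function name `reconnect_delay` and the idea of passing base and cap as parameters are illustrative assumptions; this is not lapin's recovery configuration API, and the values the PR uses are not shown here.

```rust
use std::time::Duration;

/// Delay before reconnect attempt `attempt` (0-based): base * 2^attempt, never above `cap`.
/// There is no attempt limit; only the delay is bounded.
fn reconnect_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    // checked_shl returns None once the shift amount reaches 32 bits;
    // any such attempt is far past the cap, so saturate the factor.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    // checked_mul returns None on overflow, which also means "past the cap".
    base.checked_mul(factor).map_or(cap, |d| d.min(cap))
}
```

With a 1 s base and a 30 s cap, the delay doubles up to 16 s and then stays at 30 s for every later attempt, however long the outage lasts.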

Publish with mandatory=true and treat a returned (unroutable) message
as an error. An unroutable publish was silently acked and the job
lost, e.g. when a queue is deleted at runtime.
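
The mapping from a publisher confirm to a publish result might look roughly like the sketch below. `Confirm` is a local stand-in enum, not lapin's confirmation type, and `PublishError` and `classify` are hypothetical names; treating a not-requested confirm as an error is likewise an assumption of this sketch.

```rust
/// Stand-in for a publisher-confirm outcome (not lapin's type).
#[derive(Debug)]
enum Confirm {
    /// Broker acked; `returned` is true when a mandatory message came back unroutable.
    Ack { returned: bool },
    /// Broker refused the message.
    Nack,
    /// The channel was not in confirm mode.
    NotRequested,
}

#[derive(Debug, PartialEq)]
enum PublishError {
    Unroutable,
    Nacked,
    ConfirmsDisabled,
}

/// Only an ack with no returned message counts as a successful publish.
fn classify(confirm: Confirm) -> Result<(), PublishError> {
    match confirm {
        Confirm::Ack { returned: false } => Ok(()),
        // Acked by the broker but routed to no queue: surface it instead of losing the job.
        Confirm::Ack { returned: true } => Err(PublishError::Unroutable),
        Confirm::Nack => Err(PublishError::Nacked),
        Confirm::NotRequested => Err(PublishError::ConfirmsDisabled),
    }
}
```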

Pause for the capped retry delay before nack-with-requeue when a retry
cannot be recorded in Redis, avoiding a zero-backoff redelivery hot
loop while Redis is down.
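
The settle decision for a retryable failure can be sketched as a pure function. `Settle` and `settle_retry` are hypothetical names, and the caller is assumed to perform the actual sleep, ack, and nack.

```rust
use std::time::Duration;

/// What the consumer should do with the delivery after a retryable failure.
#[derive(Debug, PartialEq)]
enum Settle {
    /// The retry was recorded in the scheduler, so the delivery can be acked.
    Ack,
    /// The retry could not be recorded: wait, then nack with requeue.
    NackRequeueAfter(Duration),
}

/// `recorded` is the result of writing the retry to the scheduler before acking.
fn settle_retry<E>(recorded: Result<(), E>, backoff: Duration, cap: Duration) -> Settle {
    match recorded {
        Ok(()) => Settle::Ack,
        // Pace the redelivery by the capped backoff instead of requeueing instantly.
        Err(_) => Settle::NackRequeueAfter(backoff.min(cap)),
    }
}
```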

Drain in-flight handlers while the consumer channel is still open on
graceful shutdown so their acks land; closing the channel first
requeued every in-flight delivery and duplicated each job. Also race
permit acquisition against the shutdown signal.

Log poisoned-acker no-ops (ack/nack returning Ok(false)) instead of
treating them as success, and clamp prefetch 0 to 1 (0 means unlimited
in AMQP).
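
Both adjustments are small enough to sketch directly. The helper names `effective_prefetch` and `classify_ack` and the `AckOutcome` enum are hypothetical; the `Result<bool, E>` shape mirrors the Ok(false) no-op described above.

```rust
/// AMQP treats a prefetch count of 0 as "no limit", so clamp it to at least 1.
fn effective_prefetch(configured: u16) -> u16 {
    configured.max(1)
}

#[derive(Debug, PartialEq)]
enum AckOutcome {
    /// The broker was told about the ack or nack.
    Settled,
    /// Ok(false): the acker was already used or poisoned, so nothing was sent.
    NoOp,
    /// The ack or nack call itself failed.
    Failed,
}

/// Distinguish a real settlement from a silent no-op so the latter can be logged.
fn classify_ack<E>(result: Result<bool, E>) -> AckOutcome {
    match result {
        Ok(true) => AckOutcome::Settled,
        Ok(false) => AckOutcome::NoOp,
        Err(_) => AckOutcome::Failed,
    }
}
```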

Validated against a real broker and Redis (gated tests including a new
unroutable-publish regression) and a live relayer (redacted endpoint
in logs, no credentials, /ready healthy).

Signed-off-by: Dylan Kilkenny <dylankilkenny95@gmail.com>

dylankilkenny requested a review from a team as a code owner June 11, 2026 14:59
coderabbitai (Bot) commented Jun 11, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fa2d8031-345a-4621-9bbe-826e4df3a0c3

Walkthrough

This PR introduces a complete RabbitMQ queue backend to the OpenZeppelin relayer alongside refactored shared scheduler and handler logic. The implementation decouples queue scheduling (Redis-backed deferred jobs) and handler execution (timeout/panic handling) from transport specifics, enabling consistent behavior across Pub/Sub and RabbitMQ backends. Full Docker Compose example and configuration documentation are provided.

Changes

RabbitMQ Backend & Shared Queue Infrastructure

| Layer | File(s) | Summary |
| --- | --- | --- |
| Dependency & Configuration setup | Cargo.toml, src/config/server_config.rs | Adds the pinned lapin = 4.10.0 dependency with rustls and aws_lc_rs features, plus new RabbitMQ env getters for URL (required), queue prefix (default: relayer), and passive verification mode (boolean). |
| Shared Redis scheduler for deferred & retry jobs | src/queues/schedule.rs | New module implements Redis sorted-set storage for scheduled/retrying jobs with per-backend segment keying. Provides atomic claim-due via Lua script, configurable sweep intervals (fast for status checks, slow for others), and a background sweep loop that backends invoke with publish callbacks. |
| Shared queue handler & timeout logic | src/queues/worker_shared.rs | New module provides reusable handler execution under timeout/panic guard, queue-type-based handler dispatch, retry exhaustion logic, network-aware backoff delays, correlation-id extraction, and concurrency resolution; removes duplicated logic from individual backend workers. |
| Queue backend abstraction & RabbitMQ routing | src/queues/mod.rs | Adds QueueBackendType::RabbitMq and QueueBackendStorage::RabbitMq variants, centralizes rustls crypto provider setup in ensure_crypto_provider(), implements full QueueBackend trait dispatch for RabbitMQ (job production, worker init, health checks, shutdown), and extends tests to verify RabbitMQ selection and error messaging. |
| Pub/Sub refactoring to use shared infrastructure | src/queues/pubsub/backend.rs, src/queues/pubsub/schedule.rs, src/queues/pubsub/worker.rs | Pub/Sub backend delegates crypto provider to shared helper; schedule.rs becomes a thin adapter over the shared scheduler with the "pubsub" segment; worker.rs simplifies to use shared run_handler_with_timeout and retry_delay_for_queue, removing 300+ lines of duplicated timeout/retry/dispatch logic while retaining Pub/Sub-specific ack/lease mechanics. |
| RabbitMQ backend implementation | src/queues/rabbitmq/backend.rs | Full QueueBackend trait implementation: parses/redacts AMQP URIs, encodes Job bodies as JSON with persistent delivery + retry headers, enforces a 16 MiB size limit, constructs durable classic queue names, declares/verifies queues at startup with actionable error messages, routes status checks per NetworkType, enqueues jobs to Redis (if future) or broker (if immediate) with publisher confirms, spawns workers and crons, and probes health via passive declares. Includes comprehensive test suite. |
| RabbitMQ consumer & due-sweep worker | src/queues/rabbitmq/worker.rs | Spawns one consumer loop per queue type with prefetch clamping and manual ack, self-heals on disconnect via exponential backoff, processes deliveries under shared run_handler_with_timeout, settles outcomes (ack on success or permanent failure, record the retry in Redis then ack on a retryable failure, ack and drop on exhaustion, paced nack-with-requeue when the retry cannot be recorded in Redis), and includes unit tests for prefetch/backoff and ignored integration tests for header preservation, backlog observability, and exhaustion semantics. |
| RabbitMQ module entrypoint & re-exports | src/queues/rabbitmq/mod.rs | Module façade documenting RabbitMQ semantics relative to Pub/Sub; re-exports shared queue types/helpers (QueueBackend, QueueType, WorkerContext, retry helpers, errors) under RabbitMQ namespace. |
| Cross-backend test updates | src/queues/cron.rs | Updates test_cron_lock_keys_are_backend_neutral to verify cron lock keys do not embed backend names (including RabbitMQ), ensuring mixed-backend fleets share consistent lock domains. |

Documentation & Examples

| Layer | File(s) | Summary |
| --- | --- | --- |
| Configuration & provisioning documentation | README.md, docs/configuration/index.mdx | Adds RabbitMQ to supported queue backends table, documents required/optional env vars (RABBITMQ_URL, queue prefix, passive verification), explains queue topology (8 durable queues), notes Redis dependency for scheduling, and provides provisioning guide covering queue properties, TLS/credential redaction, permissions, and managed broker compatibility. |
| RabbitMQ example with Docker Compose | examples/rabbitmq-queue-storage/.env.example, examples/rabbitmq-queue-storage/README.md, examples/rabbitmq-queue-storage/config/config.json, examples/rabbitmq-queue-storage/docker-compose.yaml, examples/rabbitmq-queue-storage/rabbitmq.conf | Complete local example with environment template, relayer config (2 chains, local signer), Docker Compose (relayer, Redis, RabbitMQ services with healthchecks and restart policies), RabbitMQ broker config (loopback_users = none for local cross-container guest auth), and detailed README covering prerequisites, setup, queue topology, scheduling/retry/exhaustion semantics, operation verification, reliability exercises, shutdown, and production notes. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

  • OpenZeppelin/openzeppelin-relayer#791: This PR refactors the existing Pub/Sub backend code paths to share infrastructure with RabbitMQ, directly building on foundational Pub/Sub work.

Suggested labels

cla: allowlist

Suggested reviewers

  • zeljkoX
  • tirumerla

Poem

🐰 A rabbit hops down the message lane,
RabbitMQ takes the reins of the chain,
Shared helpers untangle the timeout dance,
While Pub/Sub adapts with graceful stance—
Eight queues sing in durable harmony! 🌟

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Description check | ⚠️ Warning | The description covers Summary and Testing Process thoroughly; however, the Checklist is incomplete: the first item (reference related issues) is unchecked and no issues are referenced. | Add references to related issues in the PR description and mark the corresponding checklist item as completed. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'feat: Add RabbitMQ queue backend' clearly and concisely describes the main feature being added. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

codecov (Bot) commented Jun 11, 2026

Codecov Report

❌ Patch coverage is 46.50370% with 941 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.11%. Comparing base (f1872a1) to head (4e7c0df).
⚠️ Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/queues/rabbitmq/worker.rs | 16.36% | 501 Missing ⚠️ |
| src/queues/rabbitmq/backend.rs | 65.04% | 194 Missing ⚠️ |
| src/queues/worker_shared.rs | 49.53% | 109 Missing ⚠️ |
| src/queues/schedule.rs | 63.49% | 92 Missing ⚠️ |
| src/queues/pubsub/schedule.rs | 20.68% | 23 Missing ⚠️ |
| src/queues/pubsub/worker.rs | 15.38% | 11 Missing ⚠️ |
| src/queues/mod.rs | 83.33% | 8 Missing ⚠️ |
| src/config/server_config.rs | 95.34% | 2 Missing ⚠️ |
| src/queues/cron.rs | 66.66% | 1 Missing ⚠️ |

Additional details and impacted files

| Flag | Coverage Δ |
| --- | --- |
| ai | 0.00% <0.00%> (ø) |
| dev | 89.11% <46.50%> (-0.54%) ⬇️ |
| properties | 0.01% <0.00%> (-0.01%) ⬇️ |

Flags with carried forward coverage won't be shown.

@@            Coverage Diff             @@
##             main     #798      +/-   ##
==========================================
- Coverage   89.65%   89.11%   -0.54%     
==========================================
  Files         299      305       +6     
  Lines      126736   129079    +2343     
==========================================
+ Hits       113619   115028    +1409     
- Misses      13117    14051     +934     

| Files with missing lines | Coverage Δ |
| --- | --- |
| src/queues/pubsub/backend.rs | 44.60% <100.00%> (-0.27%) ⬇️ |
| src/queues/cron.rs | 37.94% <66.66%> (+0.27%) ⬆️ |
| src/config/server_config.rs | 94.83% <95.34%> (+0.01%) ⬆️ |
| src/queues/mod.rs | 75.40% <83.33%> (+2.13%) ⬆️ |
| src/queues/pubsub/worker.rs | 1.87% <15.38%> (-13.46%) ⬇️ |
| src/queues/pubsub/schedule.rs | 25.65% <20.68%> (+5.83%) ⬆️ |
| src/queues/schedule.rs | 63.49% <63.49%> (ø) |
| src/queues/worker_shared.rs | 49.53% <49.53%> (ø) |
| src/queues/rabbitmq/backend.rs | 65.04% <65.04%> (ø) |
| src/queues/rabbitmq/worker.rs | 16.36% <16.36%> (ø) |

... and 13 files with indirect coverage changes

coderabbitai (Bot) left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@examples/rabbitmq-queue-storage/README.md`:
- Around line 166-167: The markdown link target
'../../specs/003-rabbitmq-queue-backend/contracts/config-and-queues.md' is
invalid; locate the actual repository location of the file named
contracts/config-and-queues.md (or the equivalent spec) and update the README
link to the correct relative path to that file (or replace it with the proper
docs URL), ensuring the link resolves; verify by opening the updated link or
running the repo link checker.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: acbe2bf2-6172-4629-954c-7f1eaa3b53f9

📥 Commits

Reviewing files that changed from the base of the PR and between f1872a1 and 45ac14c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • Cargo.toml
  • README.md
  • docs/configuration/index.mdx
  • examples/rabbitmq-queue-storage/.env.example
  • examples/rabbitmq-queue-storage/README.md
  • examples/rabbitmq-queue-storage/config/config.json
  • examples/rabbitmq-queue-storage/docker-compose.yaml
  • examples/rabbitmq-queue-storage/rabbitmq.conf
  • src/config/server_config.rs
  • src/queues/cron.rs
  • src/queues/mod.rs
  • src/queues/pubsub/backend.rs
  • src/queues/pubsub/schedule.rs
  • src/queues/pubsub/worker.rs
  • src/queues/rabbitmq/backend.rs
  • src/queues/rabbitmq/mod.rs
  • src/queues/rabbitmq/worker.rs
  • src/queues/schedule.rs
  • src/queues/worker_shared.rs

Comment on lines +166 to +167
Full operator reference: [`contracts/config-and-queues.md`](../../specs/003-rabbitmq-queue-backend/contracts/config-and-queues.md)
and the [configuration docs](../../docs/configuration/index.mdx).

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Check if the referenced spec file exists

# From examples/rabbitmq-queue-storage/README.md, the relative path should resolve to:
fd -t f 'config-and-queues.md' specs/

Repository: OpenZeppelin/openzeppelin-relayer

Length of output: 171


Fix operator reference link in RabbitMQ queue-storage README
The link ../../specs/003-rabbitmq-queue-backend/contracts/config-and-queues.md (examples/rabbitmq-queue-storage/README.md lines 166-167) points to a specs/ directory at the repo root, but specs/ does not exist there—so the relative path can’t resolve. Update the link to the correct existing file location (or add/move the missing spec file).

Comments only; no behavior change.

- Drop references to internal spec/planning artifacts (constitution,
  functional-requirement and task IDs, review-finding labels) that are
  not committed to the repo and so dangle for an outside reader. Keep
  the reasoning, drop the unresolvable pointer. Also covers the
  pre-existing references on the Pub/Sub dependency pins.
- Remove decorative section-divider comments.
- Fix two inaccurate comments: the `redelivered` flag is set on any
  redelivery (not only channel loss), and lapin's rustls feature
  already selects aws-lc-rs (so default-features=false does not avoid
  a second crypto provider).

Signed-off-by: Dylan Kilkenny <dylankilkenny95@gmail.com>

Add broker-free unit tests for the RabbitMQ backend's pure logic:

- Extract the publisher-confirm result mapping into
  classify_publish_confirmation and test all four Confirmation
  branches (routed ack, unroutable return, nack, not-requested).
- Test amqp_value_as_usize across every AMQP integer width plus
  negative and non-integer inputs.
- Test the ack/nack helpers over both acker states (success and the
  poisoned/used no-op) via mock deliveries.
- Test the settle-retry drop paths (budget exhausted, non-UTF8 body)
  that ack and return before any Redis write.

Lifts unit coverage of backend.rs 53->61% and worker.rs 2->18%. The
remaining uncovered code is broker/Redis I/O exercised by the gated
integration tests.

Signed-off-by: Dylan Kilkenny <dylankilkenny95@gmail.com>

Add unit tests for two previously-uncovered infra-free paths:

- declare_failure_reason: verify a real lapin protocol error's AMQP
  reply code is extracted via get_id() and routed to the actionable
  message, and that a non-protocol error keeps the lapin detail.
- publish_scheduled: verify the success path hands the job to the
  publish callback verbatim, and that the publish+requeue double-failure
  path resolves without panicking.

Signed-off-by: Dylan Kilkenny <dylankilkenny95@gmail.com>