From 3a793b870b85d7c90fc95913f62b6af1b69c860e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 20 Apr 2026 15:11:06 -0300 Subject: [PATCH 1/6] feat(api): add runtime aggregator role toggle Ports leanSpec PR #636: adds `GET /lean/v0/admin/aggregator` and `POST /lean/v0/admin/aggregator` endpoints that let operators rotate aggregation duties across nodes without restarting. The `--is-aggregator` CLI flag now seeds a shared `AggregatorController` (an `Arc<AtomicBool>` in `ethlambda-types`) rather than a plain bool. The blockchain actor reads the flag fresh on every tick and gossip attestation, so runtime toggles take effect from the next tick. Subnet subscriptions remain frozen at startup, matching the spec's explicit scope limitation: standby aggregators must still boot with `--is-aggregator` so gossip subscriptions are in place, then use the admin endpoint to rotate duties (hot-standby model). The `lean_is_aggregator` metric is synced from the actor's read rather than from the HTTP handler so the gauge reflects what the actor actually acted on, not what was requested. POST strictly rejects non-boolean `enabled` (incl. JSON 0/1) to match the spec's semantics. 
--- CLAUDE.md | 7 + bin/ethlambda/src/main.rs | 26 ++- crates/blockchain/src/lib.rs | 38 ++-- crates/common/types/src/aggregator.rs | 99 +++++++++++ crates/common/types/src/lib.rs | 1 + crates/net/p2p/src/lib.rs | 9 + crates/net/rpc/src/admin.rs | 243 ++++++++++++++++++++++++++ crates/net/rpc/src/lib.rs | 22 ++- 8 files changed, 429 insertions(+), 16 deletions(-) create mode 100644 crates/common/types/src/aggregator.rs create mode 100644 crates/net/rpc/src/admin.rs diff --git a/CLAUDE.md b/CLAUDE.md index c1c58d95..49d8409c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -344,6 +344,13 @@ cargo test -p ethlambda-blockchain --test forkchoice_spectests -- --test-threads - The attestation pipeline: gossip → verify signature → store gossip signature (only if `is_aggregator`) → aggregate at interval 2 → promote to known → pack into blocks - **Symptom**: `justified_slot=0` and `finalized_slot=0` indefinitely despite healthy block production and attestation gossip +### Runtime Aggregator Toggle (Hot-Standby Model) +- `POST /lean/v0/admin/aggregator` with `{"enabled": bool}` toggles the aggregator role at runtime without restart (ported from leanSpec PR #636) +- `GET /lean/v0/admin/aggregator` returns `{"is_aggregator": bool}` +- The CLI `--is-aggregator` flag **seeds** the initial value; runtime toggles are in-process only (not persisted across restarts) +- Runtime toggles do NOT resubscribe gossip subnets — those are frozen at startup by `build_swarm`. Toggling ON at runtime only activates aggregation logic for subnets the node was already subscribed to +- **Operational model**: standby aggregators should boot with `--is-aggregator=true` (so subscriptions are in place), then use the admin endpoint to rotate duties. 
A node booted with `--is-aggregator=false` and toggled ON later will have no extra subnets to aggregate + ### Signature Verification - Fork choice tests use `on_block_without_verification()` to skip signature checks - Signature spec tests use `on_block()` which always verifies diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index e89b6990..3c3f816c 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -23,6 +23,7 @@ use ethlambda_network_api::{InitBlockChain, InitP2P, ToBlockChainToP2PRef, ToP2P use ethlambda_p2p::{Bootnode, P2P, SwarmConfig, build_swarm, parse_enrs}; use ethlambda_types::primitives::H256; use ethlambda_types::{ + aggregator::AggregatorController, genesis::GenesisConfig, signature::ValidatorSecretKey, state::{State, ValidatorPubkeyBytes}, @@ -64,7 +65,15 @@ struct CliOptions { /// When set, skips genesis initialization and syncs from checkpoint. #[arg(long)] checkpoint_sync_url: Option, - /// Whether this node acts as a committee aggregator + /// Whether this node acts as a committee aggregator. + /// + /// Seeds the initial value of the live aggregator flag shared by the + /// blockchain actor and the admin API. The flag can be toggled at + /// runtime via `POST /lean/v0/admin/aggregator`. Runtime toggles do + /// NOT persist across restarts and do NOT update gossip subnet + /// subscriptions, which are frozen at startup — standby aggregators + /// should boot with this flag enabled to establish subscriptions, then + /// use the admin endpoint to rotate duties (hot-standby model). 
#[arg(long, default_value = "false")] is_aggregator: bool, /// Number of attestation committees (subnets) per slot @@ -154,8 +163,19 @@ async fn main() -> eyre::Result<()> { .inspect_err(|err| error!(%err, "Failed to initialize state"))?; let validator_ids: Vec = validator_keys.keys().copied().collect(); - let blockchain = BlockChain::spawn(store.clone(), validator_keys, options.is_aggregator); + // Shared, runtime-mutable aggregator flag. Seeded from the CLI and + // threaded into both the blockchain actor (which reads on every tick) + // and the API server (which exposes GET/POST admin endpoints). + let aggregator = AggregatorController::new(options.is_aggregator); + + let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone()); + + // Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the + // AggregatorController — subnet subscriptions are decided once here and + // are not re-evaluated at runtime. Toggling via the admin API affects + // aggregation logic but not the gossip mesh. See crates/net/p2p/src/lib.rs + // for the invariant. 
let built = build_swarm(SwarmConfig { node_key: node_p2p_key, bootnodes, @@ -190,7 +210,7 @@ async fn main() -> eyre::Result<()> { .inspect_err(|err| error!(%err, "Metrics server failed")); }); tokio::spawn(async move { - let _ = ethlambda_rpc::start_api_server(api_socket, store) + let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator) .await .inspect_err(|err| error!(%err, "API server failed")); }); diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 9ee4ee8e..9c0d52df 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -6,6 +6,7 @@ use ethlambda_state_transition::is_proposer; use ethlambda_storage::{ALL_TABLES, Store}; use ethlambda_types::{ ShortRoot, + aggregator::AggregatorController, attestation::{SignedAggregatedAttestation, SignedAttestation}, block::{BlockSignatures, SignedBlock}, primitives::{H256, HashTreeRoot as _}, @@ -44,9 +45,9 @@ impl BlockChain { pub fn spawn( store: Store, validator_keys: HashMap, - is_aggregator: bool, + aggregator: AggregatorController, ) -> BlockChain { - metrics::set_is_aggregator(is_aggregator); + metrics::set_is_aggregator(aggregator.is_enabled()); metrics::set_node_sync_status(metrics::SyncStatus::Idle); let genesis_time = store.config().genesis_time; let key_manager = key_manager::KeyManager::new(validator_keys); @@ -55,7 +56,7 @@ impl BlockChain { p2p: None, key_manager, pending_blocks: HashMap::new(), - is_aggregator, + aggregator, pending_block_parents: HashMap::new(), } .start(); @@ -97,7 +98,11 @@ pub struct BlockChainServer { pending_block_parents: HashMap, /// Whether this node acts as a committee aggregator. - is_aggregator: bool, + /// + /// Read fresh on every tick and gossip event so runtime toggles via the + /// admin API take effect without a restart. Seeded from the CLI + /// `--is-aggregator` flag at spawn. 
+ aggregator: AggregatorController, } impl BlockChainServer { @@ -119,6 +124,13 @@ impl BlockChainServer { // Update current slot metric metrics::update_current_slot(slot); + // Snapshot the aggregator flag once per tick so all read sites within + // the tick see a consistent value even if the admin API toggles it + // mid-tick. Mirror it to the gauge from the actor side so + // `lean_is_aggregator` reflects the value the actor is acting on. + let is_aggregator = self.aggregator.is_enabled(); + metrics::set_is_aggregator(is_aggregator); + // At interval 0, check if we will propose (but don't build the block yet). // Tick forkchoice first to accept attestations, then build the block // using the freshly-accepted attestations. @@ -131,7 +143,7 @@ impl BlockChainServer { &mut self.store, timestamp_ms, proposer_validator_id.is_some(), - self.is_aggregator, + is_aggregator, ); if let Some(ref p2p) = self.p2p { @@ -147,9 +159,11 @@ impl BlockChainServer { self.propose_block(slot, validator_id); } - // Produce attestations at interval 1 (all validators including proposer) + // Produce attestations at interval 1 (all validators including proposer). + // Reuse the same snapshot so self-delivery decisions match the rest + // of the tick. 
if interval == 1 { - self.produce_attestations(slot); + self.produce_attestations(slot, is_aggregator); } // Update safe target slot metric (updated by store.on_tick at interval 3) @@ -169,7 +183,7 @@ impl BlockChainServer { .find(|&vid| is_proposer(vid, slot, num_validators)) } - fn produce_attestations(&mut self, slot: u64) { + fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) { // Produce attestation data once for all validators let attestation_data = store::produce_attestation_data(&self.store, slot); @@ -197,7 +211,7 @@ impl BlockChainServer { // Gossipsub does not deliver messages back to the sender, so without // this the aggregator never sees its own validator's signature in // gossip_signatures and it is excluded from aggregated proofs. - if self.is_aggregator { + if is_aggregator { let _ = store::on_gossip_attestation(&mut self.store, &signed_attestation, true) .inspect_err(|err| { warn!(%slot, %validator_id, %err, "Self-delivery of attestation failed") @@ -471,7 +485,11 @@ impl BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: &SignedAttestation) { - let _ = store::on_gossip_attestation(&mut self.store, attestation, self.is_aggregator) + // Read fresh here too: a gossip event can arrive between ticks, and + // if the admin API just toggled, the first gossip after the toggle + // should already use the new value. + let is_aggregator = self.aggregator.is_enabled(); + let _ = store::on_gossip_attestation(&mut self.store, attestation, is_aggregator) .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } diff --git a/crates/common/types/src/aggregator.rs b/crates/common/types/src/aggregator.rs new file mode 100644 index 00000000..80bacefb --- /dev/null +++ b/crates/common/types/src/aggregator.rs @@ -0,0 +1,99 @@ +//! Runtime-toggleable aggregator role. +//! +//! Tracks whether this node should act as a committee aggregator. Shared +//! 
between the blockchain actor (reads on every tick and gossip attestation) +//! and the admin API (writes when operators rotate duties). A thin wrapper +//! around [`Arc`] so reads stay cheap and writes stay atomic. +//! +//! Mirrors leanSpec's `AggregatorController` (PR #636) with the Rust analogue +//! of its asyncio lock: a single atomic cell. One flag is enough because +//! ethlambda's P2P swarm only reads `is_aggregator` at construction time; +//! runtime toggles do not (and cannot) resubscribe gossip subnets. +//! +//! # Invariants +//! +//! - The flag carries no dependent data; loads and stores use `Relaxed`. +//! - Metric updates live in the blockchain actor so the gauge reflects what +//! the actor acted on rather than what was requested. +//! - If a P2P runtime reader is ever added, it must consult this controller +//! instead of a stored bool. See `crates/net/p2p/src/lib.rs` `SwarmConfig`. + +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +/// Shared, runtime-mutable aggregator role flag. +#[derive(Clone, Debug)] +pub struct AggregatorController { + flag: Arc, +} + +impl AggregatorController { + /// Construct a controller seeded with the CLI `--is-aggregator` value. + pub fn new(initial: bool) -> Self { + Self { + flag: Arc::new(AtomicBool::new(initial)), + } + } + + pub fn is_enabled(&self) -> bool { + self.flag.load(Ordering::Relaxed) + } + + /// Update the role and return the previous value. 
+ pub fn set_enabled(&self, enabled: bool) -> bool { + self.flag.swap(enabled, Ordering::Relaxed) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Barrier; + use std::thread; + + #[test] + fn seeded_value_is_observable() { + let c = AggregatorController::new(true); + assert!(c.is_enabled()); + let c = AggregatorController::new(false); + assert!(!c.is_enabled()); + } + + #[test] + fn set_enabled_returns_previous_value() { + let c = AggregatorController::new(false); + assert!(!c.set_enabled(true)); + assert!(c.is_enabled()); + assert!(c.set_enabled(false)); + assert!(!c.is_enabled()); + } + + #[test] + fn set_enabled_is_idempotent() { + let c = AggregatorController::new(true); + assert!(c.set_enabled(true)); + assert!(c.set_enabled(true)); + assert!(c.is_enabled()); + } + + #[test] + fn updates_are_visible_across_threads() { + // Barrier provides the happens-before; the atomic itself carries no + // dependent data, hence Relaxed is sufficient. + let c = AggregatorController::new(false); + let barrier = Arc::new(Barrier::new(2)); + + let writer_barrier = barrier.clone(); + let writer = { + let c = c.clone(); + thread::spawn(move || { + c.set_enabled(true); + writer_barrier.wait(); + }) + }; + + barrier.wait(); + assert!(c.is_enabled()); + writer.join().unwrap(); + } +} diff --git a/crates/common/types/src/lib.rs b/crates/common/types/src/lib.rs index 6f9b28b9..aa180c98 100644 --- a/crates/common/types/src/lib.rs +++ b/crates/common/types/src/lib.rs @@ -1,3 +1,4 @@ +pub mod aggregator; pub mod attestation; pub mod block; pub mod checkpoint; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index e5dcff84..8f2e4a6f 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -74,6 +74,15 @@ pub(crate) struct Behaviour { } /// Configuration for building the libp2p swarm. +/// +/// INVARIANT: `is_aggregator` is consumed once during [`build_swarm`] to decide +/// subnet subscriptions and is NOT stored on [`P2PServer`]. 
Runtime toggles +/// of the aggregator role via the admin API (see +/// [`ethlambda_types::aggregator::AggregatorController`]) intentionally do +/// not resubscribe gossip subnets — this is the leanSpec PR #636 scope +/// limitation ("hot-standby model"). If a runtime reader is ever added on +/// the P2P side, it must consult the shared `AggregatorController` instead +/// of a bool captured here, or the runtime toggle will silently diverge. pub struct SwarmConfig { pub node_key: Vec, pub bootnodes: Vec, diff --git a/crates/net/rpc/src/admin.rs b/crates/net/rpc/src/admin.rs new file mode 100644 index 00000000..1f5d123c --- /dev/null +++ b/crates/net/rpc/src/admin.rs @@ -0,0 +1,243 @@ +//! Admin endpoints for runtime-toggleable node roles. +//! +//! Ported from leanSpec PR #636. The POST handler strictly rejects non-boolean +//! values (including JSON integers 0/1) to match the spec's semantics. +//! +//! # Scope +//! +//! Toggling the aggregator flag at runtime does **not** change gossip subnet +//! subscriptions, which are frozen at startup. For full parity with the CLI +//! `--is-aggregator` flag, a standby node must boot with the flag enabled so +//! that subscriptions are in place, then use this endpoint to disable/enable +//! the role (hot-standby model). See leanSpec PR #636 for the full rationale. + +use axum::{ + Extension, Json, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use ethlambda_types::aggregator::AggregatorController; +use serde::Serialize; +use serde_json::Value; +use tracing::info; + +use crate::json_response; + +#[derive(Serialize)] +struct StatusResponse { + is_aggregator: bool, +} + +#[derive(Serialize)] +struct ToggleResponse { + is_aggregator: bool, + previous: bool, +} + +/// GET /lean/v0/admin/aggregator — returns current aggregator role. +/// +/// Returns 503 when the controller is not wired. 
Kept for spec parity with +/// leanSpec, even though in ethlambda the controller is always wired when +/// the API server is started via `main.rs`. +pub async fn get_aggregator(controller: Option>) -> Response { + match controller { + Some(Extension(controller)) => json_response(StatusResponse { + is_aggregator: controller.is_enabled(), + }), + None => service_unavailable("Aggregator controller not available"), + } +} + +/// POST /lean/v0/admin/aggregator — toggles aggregator role at runtime. +/// +/// Body: `{"enabled": bool}`. Returns `{"is_aggregator": new, "previous": old}`. +/// 400 on missing/invalid body, 503 when the controller is not wired. +pub async fn post_aggregator( + controller: Option>, + body: Option>, +) -> Response { + let Some(Extension(controller)) = controller else { + return service_unavailable("Aggregator controller not available"); + }; + + // Parsing happens through `Option>` so we can distinguish + // "no body / malformed JSON" (None) from "valid JSON with wrong shape". + let Some(Json(payload)) = body else { + return bad_request("Invalid or missing JSON body"); + }; + + let Some(enabled_value) = payload.get("enabled") else { + return bad_request("Missing 'enabled' field in body"); + }; + + // Reject ints like 0/1: JSON does not distinguish them from booleans in + // loose parsers, but the spec's POST handler strictly requires a boolean. 
+ let Some(enabled) = enabled_value.as_bool() else { + return bad_request("'enabled' must be a boolean"); + }; + + let previous = controller.set_enabled(enabled); + if previous != enabled { + info!(enabled, previous, "Aggregator role toggled via admin API"); + } + + json_response(ToggleResponse { + is_aggregator: enabled, + previous, + }) +} + +fn bad_request(reason: &'static str) -> Response { + (StatusCode::BAD_REQUEST, reason).into_response() +} + +fn service_unavailable(reason: &'static str) -> Response { + (StatusCode::SERVICE_UNAVAILABLE, reason).into_response() +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::body::Body; + use axum::http::{Method, Request, StatusCode}; + use axum::routing::get; + use axum::{Extension, Router}; + use http_body_util::BodyExt; + use tower::ServiceExt; + + fn router(controller: Option) -> Router { + let mut router = Router::new().route( + "/lean/v0/admin/aggregator", + get(get_aggregator).post(post_aggregator), + ); + if let Some(controller) = controller { + router = router.layer(Extension(controller)); + } + router + } + + async fn body_json(resp: Response) -> Value { + let body = resp.into_body().collect().await.unwrap().to_bytes(); + serde_json::from_slice(&body).unwrap() + } + + #[tokio::test] + async fn get_returns_current_state() { + let controller = AggregatorController::new(true); + let resp = router(Some(controller)) + .oneshot( + Request::builder() + .uri("/lean/v0/admin/aggregator") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + body_json(resp).await, + serde_json::json!({"is_aggregator": true}) + ); + } + + #[tokio::test] + async fn get_returns_503_without_controller() { + let resp = router(None) + .oneshot( + Request::builder() + .uri("/lean/v0/admin/aggregator") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE); + } + + async fn post(controller: Option, 
body: &str) -> Response { + router(controller) + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/lean/v0/admin/aggregator") + .header("content-type", "application/json") + .body(Body::from(body.to_string())) + .unwrap(), + ) + .await + .unwrap() + } + + #[tokio::test] + async fn post_activates_and_returns_previous() { + let controller = AggregatorController::new(false); + let resp = post(Some(controller.clone()), r#"{"enabled": true}"#).await; + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + body_json(resp).await, + serde_json::json!({"is_aggregator": true, "previous": false}), + ); + assert!(controller.is_enabled()); + } + + #[tokio::test] + async fn post_deactivates_and_returns_previous() { + let controller = AggregatorController::new(true); + let resp = post(Some(controller.clone()), r#"{"enabled": false}"#).await; + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + body_json(resp).await, + serde_json::json!({"is_aggregator": false, "previous": true}), + ); + assert!(!controller.is_enabled()); + } + + #[tokio::test] + async fn post_is_idempotent() { + let controller = AggregatorController::new(true); + let _ = post(Some(controller.clone()), r#"{"enabled": true}"#).await; + let resp = post(Some(controller.clone()), r#"{"enabled": true}"#).await; + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + body_json(resp).await, + serde_json::json!({"is_aggregator": true, "previous": true}), + ); + } + + #[tokio::test] + async fn post_rejects_missing_enabled_field() { + let controller = AggregatorController::new(false); + let resp = post(Some(controller), r#"{"other": true}"#).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn post_rejects_integer_in_place_of_bool() { + // JSON parsers in other languages sometimes coerce 0/1 → bool; the + // spec explicitly rejects this, so we do too. 
+ let controller = AggregatorController::new(false); + let resp = post(Some(controller), r#"{"enabled": 1}"#).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn post_rejects_string_in_place_of_bool() { + let controller = AggregatorController::new(false); + let resp = post(Some(controller), r#"{"enabled": "true"}"#).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn post_rejects_malformed_json() { + let controller = AggregatorController::new(false); + let resp = post(Some(controller), "not json").await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn post_returns_503_without_controller() { + let resp = post(None, r#"{"enabled": true}"#).await; + assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE); + } +} diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index 5973dc62..acec7fa1 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -1,19 +1,27 @@ use std::net::SocketAddr; -use axum::{Json, Router, http::HeaderValue, http::header, response::IntoResponse, routing::get}; +use axum::{ + Extension, Json, Router, http::HeaderValue, http::header, response::IntoResponse, routing::get, +}; use ethlambda_storage::Store; +use ethlambda_types::aggregator::AggregatorController; use ethlambda_types::primitives::H256; use libssz::SszEncode; pub(crate) const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; pub(crate) const SSZ_CONTENT_TYPE: &str = "application/octet-stream"; +mod admin; mod fork_choice; mod heap_profiling; pub mod metrics; -pub async fn start_api_server(address: SocketAddr, store: Store) -> Result<(), std::io::Error> { - let api_router = build_api_router(store); +pub async fn start_api_server( + address: SocketAddr, + store: Store, + aggregator: AggregatorController, +) -> Result<(), std::io::Error> { + let api_router = build_api_router(store).layer(Extension(aggregator)); let listener 
= tokio::net::TcpListener::bind(address).await?; axum::serve(listener, api_router).await?; @@ -34,6 +42,10 @@ pub async fn start_metrics_server(address: SocketAddr) -> Result<(), std::io::Er } /// Build the API router with the given store. +/// +/// The aggregator controller is threaded in via `Extension` by the caller +/// (see `start_api_server`) so existing store-backed handlers don't need to +/// know about it and admin handlers extract it independently. fn build_api_router(store: Store) -> Router { Router::new() .route("/lean/v0/health", get(metrics::get_health)) @@ -47,6 +59,10 @@ fn build_api_router(store: Store) -> Router { "/lean/v0/fork_choice/ui", get(fork_choice::get_fork_choice_ui), ) + .route( + "/lean/v0/admin/aggregator", + get(admin::get_aggregator).post(admin::post_aggregator), + ) .with_state(store) } From 1f69f01ae1052aa1e2bbab19e6efb6d935f2ad54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 20 Apr 2026 18:42:34 -0300 Subject: [PATCH 2/6] docs: add TODO --- bin/ethlambda/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 3c3f816c..a5745900 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -176,6 +176,7 @@ async fn main() -> eyre::Result<()> { // are not re-evaluated at runtime. Toggling via the admin API affects // aggregation logic but not the gossip mesh. See crates/net/p2p/src/lib.rs // for the invariant. 
+ // TODO: update ENR with the dynamic is_aggregator value let built = build_swarm(SwarmConfig { node_key: node_p2p_key, bootnodes, From 51735c7aef220753a93a475b70c2ede4730283e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 20 Apr 2026 18:44:26 -0300 Subject: [PATCH 3/6] chore: remove useless tests --- crates/common/types/src/aggregator.rs | 53 --------------------------- 1 file changed, 53 deletions(-) diff --git a/crates/common/types/src/aggregator.rs b/crates/common/types/src/aggregator.rs index 80bacefb..3cd5d1de 100644 --- a/crates/common/types/src/aggregator.rs +++ b/crates/common/types/src/aggregator.rs @@ -44,56 +44,3 @@ impl AggregatorController { self.flag.swap(enabled, Ordering::Relaxed) } } - -#[cfg(test)] -mod tests { - use super::*; - use std::sync::Barrier; - use std::thread; - - #[test] - fn seeded_value_is_observable() { - let c = AggregatorController::new(true); - assert!(c.is_enabled()); - let c = AggregatorController::new(false); - assert!(!c.is_enabled()); - } - - #[test] - fn set_enabled_returns_previous_value() { - let c = AggregatorController::new(false); - assert!(!c.set_enabled(true)); - assert!(c.is_enabled()); - assert!(c.set_enabled(false)); - assert!(!c.is_enabled()); - } - - #[test] - fn set_enabled_is_idempotent() { - let c = AggregatorController::new(true); - assert!(c.set_enabled(true)); - assert!(c.set_enabled(true)); - assert!(c.is_enabled()); - } - - #[test] - fn updates_are_visible_across_threads() { - // Barrier provides the happens-before; the atomic itself carries no - // dependent data, hence Relaxed is sufficient. 
- let c = AggregatorController::new(false); - let barrier = Arc::new(Barrier::new(2)); - - let writer_barrier = barrier.clone(); - let writer = { - let c = c.clone(); - thread::spawn(move || { - c.set_enabled(true); - writer_barrier.wait(); - }) - }; - - barrier.wait(); - assert!(c.is_enabled()); - writer.join().unwrap(); - } -} From 4f50c6c9940643545c181d17037116a1152bc06b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 20 Apr 2026 18:50:13 -0300 Subject: [PATCH 4/6] Revert "docs: add TODO" This reverts commit 1f69f01ae1052aa1e2bbab19e6efb6d935f2ad54. --- bin/ethlambda/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index a5745900..3c3f816c 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -176,7 +176,6 @@ async fn main() -> eyre::Result<()> { // are not re-evaluated at runtime. Toggling via the admin API affects // aggregation logic but not the gossip mesh. See crates/net/p2p/src/lib.rs // for the invariant. - // TODO: update ENR with the dynamic is_aggregator value let built = build_swarm(SwarmConfig { node_key: node_p2p_key, bootnodes, From b8e8d867cb7c6f589f4a6ad42ffc429ab6090624 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 20 Apr 2026 18:51:21 -0300 Subject: [PATCH 5/6] chore: remove superfluous doc --- crates/common/types/src/aggregator.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/crates/common/types/src/aggregator.rs b/crates/common/types/src/aggregator.rs index 3cd5d1de..b003fb7c 100644 --- a/crates/common/types/src/aggregator.rs +++ b/crates/common/types/src/aggregator.rs @@ -1,23 +1,3 @@ -//! Runtime-toggleable aggregator role. -//! -//! Tracks whether this node should act as a committee aggregator. Shared -//! 
between the blockchain actor (reads on every tick and gossip attestation) -//! and the admin API (writes when operators rotate duties). A thin wrapper -//! around [`Arc`] so reads stay cheap and writes stay atomic. -//! -//! Mirrors leanSpec's `AggregatorController` (PR #636) with the Rust analogue -//! of its asyncio lock: a single atomic cell. One flag is enough because -//! ethlambda's P2P swarm only reads `is_aggregator` at construction time; -//! runtime toggles do not (and cannot) resubscribe gossip subnets. -//! -//! # Invariants -//! -//! - The flag carries no dependent data; loads and stores use `Relaxed`. -//! - Metric updates live in the blockchain actor so the gauge reflects what -//! the actor acted on rather than what was requested. -//! - If a P2P runtime reader is ever added, it must consult this controller -//! instead of a stored bool. See `crates/net/p2p/src/lib.rs` `SwarmConfig`. - use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; From 53d10687b4a27eedeaabebb04342803f93561076 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 20 Apr 2026 19:22:12 -0300 Subject: [PATCH 6/6] refactor: clean up docs and rename test --- crates/net/rpc/src/admin.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/net/rpc/src/admin.rs b/crates/net/rpc/src/admin.rs index 1f5d123c..43804d02 100644 --- a/crates/net/rpc/src/admin.rs +++ b/crates/net/rpc/src/admin.rs @@ -39,6 +39,11 @@ struct ToggleResponse { /// Returns 503 when the controller is not wired. Kept for spec parity with /// leanSpec, even though in ethlambda the controller is always wired when /// the API server is started via `main.rs`. 
+/// +/// The `Option<Extension<AggregatorController>>` wrapping makes the extractor infallible: a bare +/// `Extension` would cause axum to short-circuit with a 500 when the +/// extension is missing, whereas `Option` yields `None` and lets us return +/// a clean 503 with a useful message. pub async fn get_aggregator(controller: Option>) -> Response { match controller { Some(Extension(controller)) => json_response(StatusResponse { @@ -50,8 +55,13 @@ pub async fn get_aggregator(controller: Option>) /// POST /lean/v0/admin/aggregator — toggles aggregator role at runtime. /// -/// Body: `{"enabled": bool}`. Returns `{"is_aggregator": new, "previous": old}`. +/// Body: `{"enabled": bool}`. Returns `{"is_aggregator": <new>, "previous": <old>}`. /// 400 on missing/invalid body, 503 when the controller is not wired. +/// +/// The `Option<Extension<AggregatorController>>` wrapping makes the extractor infallible: a bare +/// `Extension` would cause axum to short-circuit with a 500 when the +/// extension is missing, whereas `Option` yields `None` and lets us return +/// a clean 503 with a useful message. pub async fn post_aggregator( controller: Option>, body: Option>, @@ -70,8 +80,6 @@ pub async fn post_aggregator( return bad_request("Missing 'enabled' field in body"); }; - // Reject ints like 0/1: JSON does not distinguish them from booleans in - // loose parsers, but the spec's POST handler strictly requires a boolean. let Some(enabled) = enabled_value.as_bool() else { return bad_request("'enabled' must be a boolean"); }; @@ -194,7 +202,7 @@ mod tests { } #[tokio::test] - async fn post_is_idempotent() { + async fn post_noop_when_value_matches_state() { let controller = AggregatorController::new(true); let _ = post(Some(controller.clone()), r#"{"enabled": true}"#).await; let resp = post(Some(controller.clone()), r#"{"enabled": true}"#).await;