Skip to content
7 changes: 7 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,13 @@ cargo test -p ethlambda-blockchain --test forkchoice_spectests -- --test-threads
- The attestation pipeline: gossip → verify signature → store gossip signature (only if `is_aggregator`) → aggregate at interval 2 → promote to known → pack into blocks
- **Symptom**: `justified_slot=0` and `finalized_slot=0` indefinitely despite healthy block production and attestation gossip

### Runtime Aggregator Toggle (Hot-Standby Model)
- `POST /lean/v0/admin/aggregator` with `{"enabled": bool}` toggles the aggregator role at runtime without restart (ported from leanSpec PR #636)
- `GET /lean/v0/admin/aggregator` returns `{"is_aggregator": bool}`
- The CLI `--is-aggregator` flag **seeds** the initial value; runtime toggles are in-process only (not persisted across restarts)
- Runtime toggles do NOT resubscribe gossip subnets — those are frozen at startup by `build_swarm`. Toggling ON at runtime only activates aggregation logic for subnets the node was already subscribed to
- **Operational model**: standby aggregators should boot with `--is-aggregator=true` (so subscriptions are in place), then use the admin endpoint to rotate duties. A node booted with `--is-aggregator=false` and toggled ON later will have no extra subnets to aggregate

### Signature Verification
- Fork choice tests use `on_block_without_verification()` to skip signature checks
- Signature spec tests use `on_block()` which always verifies
Expand Down
26 changes: 23 additions & 3 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use ethlambda_network_api::{InitBlockChain, InitP2P, ToBlockChainToP2PRef, ToP2P
use ethlambda_p2p::{Bootnode, P2P, SwarmConfig, build_swarm, parse_enrs};
use ethlambda_types::primitives::H256;
use ethlambda_types::{
aggregator::AggregatorController,
genesis::GenesisConfig,
signature::ValidatorSecretKey,
state::{State, ValidatorPubkeyBytes},
Expand Down Expand Up @@ -64,7 +65,15 @@ struct CliOptions {
/// When set, skips genesis initialization and syncs from checkpoint.
#[arg(long)]
checkpoint_sync_url: Option<String>,
/// Whether this node acts as a committee aggregator
/// Whether this node acts as a committee aggregator.
///
/// Seeds the initial value of the live aggregator flag shared by the
/// blockchain actor and the admin API. The flag can be toggled at
/// runtime via `POST /lean/v0/admin/aggregator`. Runtime toggles do
/// NOT persist across restarts and do NOT update gossip subnet
/// subscriptions, which are frozen at startup — standby aggregators
/// should boot with this flag enabled to establish subscriptions, then
/// use the admin endpoint to rotate duties (hot-standby model).
#[arg(long, default_value = "false")]
is_aggregator: bool,
/// Number of attestation committees (subnets) per slot
Expand Down Expand Up @@ -154,8 +163,19 @@ async fn main() -> eyre::Result<()> {
.inspect_err(|err| error!(%err, "Failed to initialize state"))?;

let validator_ids: Vec<u64> = validator_keys.keys().copied().collect();
let blockchain = BlockChain::spawn(store.clone(), validator_keys, options.is_aggregator);

// Shared, runtime-mutable aggregator flag. Seeded from the CLI and
// threaded into both the blockchain actor (which reads on every tick)
// and the API server (which exposes GET/POST admin endpoints).
let aggregator = AggregatorController::new(options.is_aggregator);

let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone());

// Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the
// AggregatorController — subnet subscriptions are decided once here and
// are not re-evaluated at runtime. Toggling via the admin API affects
// aggregation logic but not the gossip mesh. See crates/net/p2p/src/lib.rs
// for the invariant.
Comment thread
MegaRedHand marked this conversation as resolved.
let built = build_swarm(SwarmConfig {
node_key: node_p2p_key,
bootnodes,
Expand Down Expand Up @@ -190,7 +210,7 @@ async fn main() -> eyre::Result<()> {
.inspect_err(|err| error!(%err, "Metrics server failed"));
});
tokio::spawn(async move {
let _ = ethlambda_rpc::start_api_server(api_socket, store)
let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator)
.await
.inspect_err(|err| error!(%err, "API server failed"));
});
Expand Down
38 changes: 28 additions & 10 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ethlambda_state_transition::is_proposer;
use ethlambda_storage::{ALL_TABLES, Store};
use ethlambda_types::{
ShortRoot,
aggregator::AggregatorController,
attestation::{SignedAggregatedAttestation, SignedAttestation},
block::{BlockSignatures, SignedBlock},
primitives::{H256, HashTreeRoot as _},
Expand Down Expand Up @@ -50,9 +51,9 @@ impl BlockChain {
pub fn spawn(
store: Store,
validator_keys: HashMap<u64, ValidatorKeyPair>,
is_aggregator: bool,
aggregator: AggregatorController,
) -> BlockChain {
metrics::set_is_aggregator(is_aggregator);
metrics::set_is_aggregator(aggregator.is_enabled());
metrics::set_node_sync_status(metrics::SyncStatus::Idle);
let genesis_time = store.config().genesis_time;
let key_manager = key_manager::KeyManager::new(validator_keys);
Expand All @@ -61,7 +62,7 @@ impl BlockChain {
p2p: None,
key_manager,
pending_blocks: HashMap::new(),
is_aggregator,
aggregator,
pending_block_parents: HashMap::new(),
current_aggregation: None,
}
Expand Down Expand Up @@ -104,7 +105,11 @@ pub struct BlockChainServer {
pending_block_parents: HashMap<H256, H256>,

/// Whether this node acts as a committee aggregator.
is_aggregator: bool,
///
/// Read fresh on every tick and gossip event so runtime toggles via the
/// admin API take effect without a restart. Seeded from the CLI
/// `--is-aggregator` flag at spawn.
aggregator: AggregatorController,

/// In-flight committee-signature aggregation, if any. Present only while a
/// worker started at the most recent interval 2 is still running or until
Expand All @@ -131,6 +136,13 @@ impl BlockChainServer {
// Update current slot metric
metrics::update_current_slot(slot);

// Snapshot the aggregator flag once per tick so all read sites within
// the tick see a consistent value even if the admin API toggles it
// mid-tick. Mirror it to the gauge from the actor side so
// `lean_is_aggregator` reflects the value the actor is acting on.
let is_aggregator = self.aggregator.is_enabled();
metrics::set_is_aggregator(is_aggregator);

// At interval 0, check if we will propose (but don't build the block yet).
// Tick forkchoice first to accept attestations, then build the block
// using the freshly-accepted attestations.
Expand All @@ -145,7 +157,7 @@ impl BlockChainServer {
proposer_validator_id.is_some(),
);

if interval == 2 && self.is_aggregator {
if interval == 2 && is_aggregator {
self.start_aggregation_session(slot, ctx).await;
}

Expand All @@ -154,9 +166,11 @@ impl BlockChainServer {
self.propose_block(slot, validator_id);
}

// Produce attestations at interval 1 (all validators including proposer)
// Produce attestations at interval 1 (all validators including proposer).
// Reuse the same snapshot so self-delivery decisions match the rest
// of the tick.
if interval == 1 {
self.produce_attestations(slot);
self.produce_attestations(slot, is_aggregator);
}

// Update safe target slot metric (updated by store.on_tick at interval 3)
Expand Down Expand Up @@ -232,7 +246,7 @@ impl BlockChainServer {
.find(|&vid| is_proposer(vid, slot, num_validators))
}

fn produce_attestations(&mut self, slot: u64) {
fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) {
let _timing = metrics::time_attestations_production();

// Produce attestation data once for all validators
Expand Down Expand Up @@ -262,7 +276,7 @@ impl BlockChainServer {
// Gossipsub does not deliver messages back to the sender, so without
// this the aggregator never sees its own validator's signature in
// gossip_signatures and it is excluded from aggregated proofs.
if self.is_aggregator {
if is_aggregator {
let _ = store::on_gossip_attestation(&mut self.store, &signed_attestation, true)
.inspect_err(|err| {
warn!(%slot, %validator_id, %err, "Self-delivery of attestation failed")
Expand Down Expand Up @@ -536,7 +550,11 @@ impl BlockChainServer {
}

fn on_gossip_attestation(&mut self, attestation: &SignedAttestation) {
let _ = store::on_gossip_attestation(&mut self.store, attestation, self.is_aggregator)
// Read fresh here too: a gossip event can arrive between ticks, and
// if the admin API just toggled, the first gossip after the toggle
// should already use the new value.
let is_aggregator = self.aggregator.is_enabled();
let _ = store::on_gossip_attestation(&mut self.store, attestation, is_aggregator)
.inspect_err(|err| warn!(%err, "Failed to process gossiped attestation"));
}

Expand Down
26 changes: 26 additions & 0 deletions crates/common/types/src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

/// Shared, runtime-mutable aggregator role flag.
///
/// Cloning is cheap: every clone shares the same underlying atomic, so a
/// toggle performed through one handle (e.g. the admin API) is immediately
/// observable through every other handle (e.g. the blockchain actor).
#[derive(Clone, Debug)]
pub struct AggregatorController {
    enabled: Arc<AtomicBool>,
}

impl AggregatorController {
    /// Construct a controller seeded with the CLI `--is-aggregator` value.
    pub fn new(initial: bool) -> Self {
        let enabled = Arc::new(AtomicBool::new(initial));
        Self { enabled }
    }

    /// Current value of the role flag.
    ///
    /// Relaxed ordering suffices: the flag is an independent boolean with no
    /// other memory it must synchronize with.
    pub fn is_enabled(&self) -> bool {
        self.enabled.load(Ordering::Relaxed)
    }

    /// Update the role and return the previous value.
    pub fn set_enabled(&self, enabled: bool) -> bool {
        self.enabled.swap(enabled, Ordering::Relaxed)
    }
}
Comment thread
MegaRedHand marked this conversation as resolved.
1 change: 1 addition & 0 deletions crates/common/types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod aggregator;
pub mod attestation;
pub mod block;
pub mod checkpoint;
Expand Down
9 changes: 9 additions & 0 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ pub(crate) struct Behaviour {
}

/// Configuration for building the libp2p swarm.
///
/// INVARIANT: `is_aggregator` is consumed once during [`build_swarm`] to decide
/// subnet subscriptions and is NOT stored on [`P2PServer`]. Runtime toggles
/// of the aggregator role via the admin API (see
/// [`ethlambda_types::aggregator::AggregatorController`]) intentionally do
/// not resubscribe gossip subnets — this is the leanSpec PR #636 scope
/// limitation ("hot-standby model"). If a runtime reader is ever added on
/// the P2P side, it must consult the shared `AggregatorController` instead
/// of a bool captured here, or the runtime toggle will silently diverge.
pub struct SwarmConfig {
pub node_key: Vec<u8>,
pub bootnodes: Vec<Bootnode>,
Expand Down
Loading
Loading