Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions docs/graphman.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- [Drop](#drop)
- [Chain Check Blocks](#check-blocks)
- [Chain Call Cache Remove](#chain-call-cache-remove)
- [Chain Rebuild Storage](#chain-rebuild-storage)

<a id="info"></a>
# ⌘ Info
Expand Down Expand Up @@ -439,3 +440,71 @@ Remove stale contracts from the call cache that have not been accessed in the la
Remove stale contracts from the call cache that have not been accessed in the last 7 days, limiting the removal to a maximum of 100 contracts:
graphman --config config.toml chain call-cache ethereum remove --ttl-days 7 --ttl-max-contracts 100

<a id="chain-rebuild-storage"></a>
# ⌘ Chain Rebuild Storage

### SYNOPSIS

Rebuild a chain's storage schema and reset head metadata

If the storage schema is missing, rebuilds it silently.
If the storage already exists, prompts for confirmation before
dropping and rebuilding it (use --force to skip the prompt).

USAGE:
graphman --config <CONFIG> chain rebuild-storage [OPTIONS] <CHAIN_NAME>

ARGS:
<CHAIN_NAME> Chain name (must be an existing chain, see 'chain list')

OPTIONS:
-f, --force Skip confirmation prompt when storage already exists
-h, --help Print help information

### DESCRIPTION

The `chain rebuild-storage` command recovers from a situation where a chain's storage schema
(e.g. `chain42`) has been dropped or corrupted on the shard but the chain's metadata in
`public.chains` still exists. This can happen after manual database operations or partial failures.

> **Operational requirement:** Stop graph-node before running this command.

The command behaves differently depending on the state of the storage:

**Storage missing** (non-destructive): the command rebuilds the schema and resets
head metadata without asking for confirmation.

**Storage exists** (destructive): the command prompts for confirmation before dropping the
existing schema and rebuilding it from scratch. Use `--force` to skip the prompt.

In both cases, the command performs the following steps in a single transaction:

1. Drops the existing storage schema if present (`DROP SCHEMA ... CASCADE`).
2. Upserts the chain's row in `ethereum_networks` on the shard: inserts if missing, or repairs
identity metadata (`namespace`, `net_version`, `genesis_block_hash`) and resets head tracking
columns (`head_block_hash`, `head_block_number`, `head_block_cursor`) to `NULL` if the row exists.
3. Rebuilds the storage schema with empty `blocks`, `call_cache`, and `call_meta` tables.

After this, graph-node will treat the chain as freshly added and begin syncing from scratch.

The `public.chains` metadata row is never modified by this command.

### CONSTRAINTS

- The chain must already exist in `public.chains`. This command does not create new chains.
- Chains using shared storage (`public`) are not supported.

### EXAMPLES

Rebuild missing storage for a chain:

graphman --config config.toml chain rebuild-storage mainnet

Force-rebuild existing storage (skips confirmation):

graphman --config config.toml chain rebuild-storage mainnet --force

List the chains that are available:

graphman --config config.toml chain list

18 changes: 18 additions & 0 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,20 @@ pub enum ChainCommand {
/// The block number to ingest
number: BlockNumber,
},

// NOTE: the `///` doc comments below are emitted by clap derive as the
// command's `--help` text, so they are user-facing strings, not just
// documentation.
/// Rebuild a chain's storage schema and reset head metadata.
///
/// If the storage schema is missing, rebuilds it silently.
/// If the storage already exists, prompts for confirmation before
/// dropping and rebuilding it (use --force to skip the prompt).
RebuildStorage {
    // Rejects empty strings at parse time so the command never reaches
    // the store with a blank chain name.
    /// Chain name (must be an existing chain, see 'chain list')
    #[clap(value_parser = clap::builder::NonEmptyStringValueParser::new())]
    chain_name: String,
    /// Skip confirmation prompt when storage already exists
    #[clap(long, short)]
    force: bool,
},
}

#[derive(Clone, Debug, Subcommand)]
Expand Down Expand Up @@ -1586,6 +1600,10 @@ async fn main() -> anyhow::Result<()> {
ctx.chain_store_and_adapter(&name).await?;
commands::chain::ingest(&logger, chain_store, ethereum_adapter, number).await
}
RebuildStorage { chain_name, force } => {
    // Needs both stores: the primary pool is where `find_chain` reads
    // `public.chains`, while the block store owns the per-shard schema.
    let (block_store, primary) = ctx.block_store_and_primary_pool().await;
    commands::chain::rebuild_storage(primary, block_store, chain_name, force).await
}
}
}
Stats(cmd) => {
Expand Down
56 changes: 56 additions & 0 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use graph::components::store::StoreError;
use graph::prelude::BlockNumber;
use graph::prelude::ChainStore as _;
use graph::prelude::LightEthereumBlock;
use graph::prelude::anyhow::Context as _;
use graph::prelude::{anyhow, anyhow::bail};
use graph::slog::Logger;
use graph::{
Expand All @@ -27,11 +28,13 @@ use graph_store_postgres::ChainStore;
use graph_store_postgres::PoolCoordinator;
use graph_store_postgres::ScopedFutureExt;
use graph_store_postgres::Shard;
use graph_store_postgres::Storage;
use graph_store_postgres::add_chain;
use graph_store_postgres::find_chain;
use graph_store_postgres::update_chain_name;
use graph_store_postgres::{ConnectionPool, command_support::catalog::block_store};

use crate::manager::prompt::prompt_for_confirmation;
use crate::network_setup::Networks;

pub async fn list(primary: ConnectionPool, store: BlockStore) -> Result<(), Error> {
Expand Down Expand Up @@ -329,3 +332,56 @@ pub async fn ingest(
}
Ok(())
}

/// Rebuild the storage schema for chain `name` and reset its head metadata.
///
/// The chain must already be registered in `public.chains`; chains that use
/// the shared `public` storage are rejected. When the storage schema already
/// exists, the rebuild is destructive, so the user is asked to confirm unless
/// `force` is set. The actual drop/upsert/create work is delegated to
/// [`BlockStore::rebuild_chain_storage`].
pub async fn rebuild_storage(
    primary: ConnectionPool,
    store: BlockStore,
    name: String,
    force: bool,
) -> Result<(), Error> {
    let mut conn = primary.get().await?;

    // This command only repairs storage for known chains; it never creates
    // new `public.chains` entries.
    let chain = block_store::find_chain(&mut conn, &name)
        .await?
        .ok_or_else(|| {
            anyhow!(
                "Chain {} not found in public.chains.\n\
                 This command only supports chains already present in metadata.",
                name
            )
        })?;

    // Shared storage lives in `public` alongside other data; dropping and
    // recreating it wholesale is not safe.
    if matches!(chain.storage, Storage::Shared) {
        bail!(
            "Chain {} uses shared storage public and cannot be rebuilt with this command.",
            name
        );
    }

    let namespace = chain.storage.to_string();
    let shard = &chain.shard;
    let ident = chain.network_identifier()?;

    // An existing schema must be dropped first, which is destructive.
    // Build the confirmation prompt lazily: with `--force` (or when the
    // schema is missing) it is never shown, so don't format it eagerly.
    let drop_schema = store.has_namespace(&chain).await?;
    if drop_schema && !force {
        let prompt = format!(
            "Storage {namespace} for chain {name} already exists on shard {shard}.\n\
             This will drop and rebuild chain storage. All cached blocks and call cache \
             data in that namespace will be permanently deleted.\n\
             Proceed?"
        );
        if !prompt_for_confirmation(&prompt)? {
            println!("Aborting.");
            return Ok(());
        }
    }

    store
        .rebuild_chain_storage(&name, &ident, drop_schema)
        .await
        .with_context(|| format!("Failed to rebuild storage {namespace} for chain {name}"))?;

    println!("Successfully rebuilt storage {namespace} for chain {name} on shard {shard}.");

    Ok(())
}
24 changes: 24 additions & 0 deletions store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,30 @@ impl BlockStore {
Ok(())
}

/// Report whether the storage schema recorded for `chain` actually
/// exists on the chain's shard.
pub async fn has_namespace(&self, chain: &primary::Chain) -> Result<bool, StoreError> {
    // A chain always names a shard in its metadata, so a missing pool
    // means our shard configuration is inconsistent.
    let shard_pool = match self.pools.get(&chain.shard) {
        Some(pool) => pool,
        None => return Err(internal_error!("no pool for shard {}", chain.shard)),
    };
    let namespace = crate::primary::Namespace::special(chain.storage.to_string());
    let mut conn = shard_pool.get_permitted().await?;
    crate::catalog::has_namespace(&mut conn, &namespace).await
}

/// Rebuild the storage for `chain`, delegating the drop/reset/create
/// work to the chain's `ChainStore`.
pub async fn rebuild_chain_storage(
    &self,
    chain: &str,
    ident: &ChainIdentifier,
    drop_schema: bool,
) -> Result<(), StoreError> {
    // Resolve the per-chain store; every known chain must have one.
    let target = match self.store(chain).await {
        Some(store) => store,
        None => return Err(internal_error!("No chain store found for {}", chain)),
    };

    target.rebuild_storage(ident, drop_schema).await?;
    Ok(())
}

// Helper to clone the list of chain stores to avoid holding the lock
// while awaiting
fn stores(&self) -> Vec<Arc<ChainStore>> {
Expand Down
56 changes: 55 additions & 1 deletion store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use graph::parking_lot::RwLock;
use graph::prelude::MetricsRegistry;
use graph::prelude::alloy::primitives::B256;
use graph::prometheus::{CounterVec, GaugeVec};
use graph::slog::{Logger, info, o};
use graph::slog::{Logger, debug, info, o};
use graph::stable_hash::crypto_stable_hash;
use graph::util::herd_cache::HerdCache;

Expand Down Expand Up @@ -2479,6 +2479,60 @@ impl ChainStore {
.await
}

/// Drop the chain's storage schema (if it exists), reset head
/// metadata in `ethereum_networks`, and rebuild the schema with
/// empty tables. If the `ethereum_networks` row is missing, it is
/// created from the provided `ident`.
///
/// All steps run inside a single transaction, so a failure at any
/// point leaves the previous database state untouched.
pub(crate) async fn rebuild_storage(
    &self,
    ident: &ChainIdentifier,
    drop_schema: bool,
) -> Result<(), Error> {
    use public::ethereum_networks as n;

    let nsp = self.storage.to_string();

    debug!(&self.logger, "Rebuilding storage for chain"; "chain" => &self.chain, "namespace" => &nsp);

    let mut conn = self.pool.get_permitted().await?;
    conn.transaction(|conn| {
        async {
            // The caller decides whether an existing schema has to go;
            // when it is already missing there is nothing to drop.
            if drop_schema {
                debug!(&self.logger, "Dropping existing schema"; "namespace" => &nsp);
                self.storage.drop_storage(conn, &self.chain).await?;
            }

            // Upsert the chain's row: insert it if missing, otherwise
            // repair the identity columns and NULL out the head
            // tracking columns so the chain is treated as unsynced.
            debug!(&self.logger, "Upserting ethereum_networks row"; "chain" => &self.chain);
            insert_into(n::table)
                .values((
                    n::name.eq(&self.chain),
                    n::namespace.eq(&self.storage),
                    n::net_version.eq(&ident.net_version),
                    n::genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()),
                ))
                .on_conflict(n::name)
                .do_update()
                .set((
                    n::namespace.eq(&self.storage),
                    n::net_version.eq(&ident.net_version),
                    n::genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()),
                    n::head_block_hash.eq(None::<String>),
                    n::head_block_number.eq(None::<i64>),
                    n::head_block_cursor.eq(None::<String>),
                ))
                .execute(conn)
                .await?;

            // Recreate the schema with empty tables.
            debug!(&self.logger, "Creating storage schema and tables"; "namespace" => &nsp);
            self.storage.create(conn).await?;

            Ok(())
        }
        // Boxing scopes the async closure's borrows to the transaction.
        .scope_boxed()
    })
    .await
}

pub async fn chain_head_pointers(
conn: &mut AsyncPgConnection,
) -> Result<HashMap<String, BlockPtr>, StoreError> {
Expand Down
Loading
Loading