diff --git a/Cargo.lock b/Cargo.lock index 901d836cd..edc487bad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6118,6 +6118,7 @@ dependencies = [ "serde_json", "similar-asserts", "tempfile", + "thiserror", "tikv-jemallocator", "tokio", "toml", @@ -6995,6 +6996,7 @@ dependencies = [ "reth-tasks", "reth-tracing", "reth-transaction-pool", + "reth-trie", "secp256k1", "serde", "serde_json", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index ff2515464..5e47506db 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -104,6 +104,7 @@ itertools.workspace = true rayon.workspace = true boyer-moore-magiclen = "0.2.16" ahash = "0.8" +thiserror.workspace = true # p2p discv5.workspace = true diff --git a/bin/reth/src/cli/mod.rs b/bin/reth/src/cli/mod.rs index 34fd09456..9c81b0aec 100644 --- a/bin/reth/src/cli/mod.rs +++ b/bin/reth/src/cli/mod.rs @@ -6,8 +6,8 @@ use crate::{ LogArgs, }, commands::{ - config_cmd, db, debug_cmd, dump_genesis, import, init_cmd, node, node::NoArgs, p2p, - recover, stage, test_vectors, + config_cmd, db, debug_cmd, dump_genesis, import, init_cmd, init_state, node, node::NoArgs, + p2p, recover, stage, test_vectors, }, version::{LONG_VERSION, SHORT_VERSION}, }; @@ -145,6 +145,7 @@ impl Cli { runner.run_command_until_exit(|ctx| command.execute(ctx, launcher)) } Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute()), + Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute()), @@ -176,6 +177,9 @@ pub enum Commands { /// Initialize the database from a genesis file. #[command(name = "init")] Init(init_cmd::InitCommand), + /// Initialize the database from a state dump file. 
+ #[command(name = "init-state")] + InitState(init_state::InitStateCommand), /// This syncs RLP encoded blocks from a file. #[command(name = "import")] Import(import::ImportCommand), diff --git a/bin/reth/src/commands/init_state.rs b/bin/reth/src/commands/init_state.rs new file mode 100644 index 000000000..c05f064b3 --- /dev/null +++ b/bin/reth/src/commands/init_state.rs @@ -0,0 +1,107 @@
+//! Command that initializes the node from a state dump file.
+
+use crate::{
+    args::{
+        utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
+        DatabaseArgs,
+    },
+    dirs::{DataDirPath, MaybePlatformPath},
+};
+use clap::Parser;
+use reth_db::{database::Database, init_db};
+use reth_node_core::init::{init_from_state_dump, init_genesis};
+use reth_primitives::{ChainSpec, B256};
+use reth_provider::ProviderFactory;
+
+use std::{fs::File, io::BufReader, path::PathBuf, sync::Arc};
+use tracing::info;
+
+/// Initializes the database from a state dump file, or with the genesis block if none is given.
+#[derive(Debug, Parser)]
+pub struct InitStateCommand {
+    /// The path to the data dir for all reth files and subdirectories.
+    ///
+    /// Defaults to the OS-specific data directory:
+    ///
+    /// - Linux: `$XDG_DATA_HOME/reth/` or `$HOME/.local/share/reth/`
+    /// - Windows: `{FOLDERID_RoamingAppData}/reth/`
+    /// - macOS: `$HOME/Library/Application Support/reth/`
+    #[arg(long, value_name = "DATA_DIR", verbatim_doc_comment, default_value_t)]
+    datadir: MaybePlatformPath<DataDirPath>,
+
+    /// The chain this node is running.
+    ///
+    /// Possible values are either a built-in chain or the path to a chain specification file.
+    #[arg(
+        long,
+        value_name = "CHAIN_OR_PATH",
+        long_help = chain_help(),
+        default_value = SUPPORTED_CHAINS[0],
+        value_parser = genesis_value_parser
+    )]
+    chain: Arc<ChainSpec>,
+
+    /// JSONL file with state dump.
+    ///
+    /// The first line must be { "root": \<state-root\> }. Every following line must contain one
+    /// account in the following format, additional account fields are ignored.
+    /// {
+    ///     "balance": "\<u256\>",
+    ///     "nonce": \<u64\>,
+    ///     "code": "\<bytecode\>",
+    ///     "storage": {
+    ///         "\<key\>": "\<value\>",
+    ///         ..
+    ///     },
+    ///     "address": "\<address\>",
+    /// }
+    ///
+    /// Allows init at a non-genesis block. Caution! Blocks must be manually imported up until
+    /// and including the non-genesis block to init chain at. See 'import' command.
+    #[arg(long, value_name = "STATE_DUMP_FILE", verbatim_doc_comment, default_value = None)]
+    state: Option<PathBuf>,
+
+    #[command(flatten)]
+    db: DatabaseArgs,
+}
+
+impl InitStateCommand {
+    /// Execute the `init-state` command
+    pub async fn execute(self) -> eyre::Result<()> {
+        info!(target: "reth::cli", "reth init-state starting");
+
+        // add network name to data dir
+        let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
+        let db_path = data_dir.db_path();
+        info!(target: "reth::cli", path = ?db_path, "Opening database");
+        let db = Arc::new(init_db(&db_path, self.db.database_args())?);
+        info!(target: "reth::cli", "Database opened");
+
+        let provider_factory = ProviderFactory::new(db, self.chain, data_dir.static_files_path())?;
+
+        info!(target: "reth::cli", "Writing genesis block");
+
+        let hash = match self.state {
+            Some(path) => init_at_state(path, provider_factory)?,
+            None => init_genesis(provider_factory)?,
+        };
+
+        info!(target: "reth::cli", hash = ?hash, "Genesis block written");
+        Ok(())
+    }
+}
+
+/// Initialize chain with state at specific block, from a file with state dump.
+pub fn init_at_state<DB: Database>(
+    state_dump_path: PathBuf,
+    factory: ProviderFactory<DB>,
+) -> eyre::Result<B256> {
+    info!(target: "reth::cli",
+        path=?state_dump_path,
+        "Opening state dump");
+
+    // buffer reads, the dump is consumed line by line by `init_from_state_dump`
+    let reader = BufReader::new(File::open(state_dump_path)?);
+
+    init_from_state_dump(reader, factory)
+}
diff --git a/bin/reth/src/commands/mod.rs b/bin/reth/src/commands/mod.rs index 278531f71..03d5a8287 100644 --- a/bin/reth/src/commands/mod.rs +++ b/bin/reth/src/commands/mod.rs @@ -7,6 +7,7 @@ pub mod dump_genesis; pub mod import; pub mod init_cmd; +pub mod init_state; pub mod node; pub mod p2p; diff --git a/crates/node-core/Cargo.toml b/crates/node-core/Cargo.toml index d6df37f09..4bce2908d 100644 --- a/crates/node-core/Cargo.toml +++ b/crates/node-core/Cargo.toml @@ -32,6 +32,7 @@ reth-network-api.workspace = true reth-evm.workspace = true reth-engine-primitives.workspace = true reth-tasks.workspace = true +reth-trie.workspace = true reth-consensus-common.workspace = true reth-beacon-consensus.workspace = true @@ -71,7 +72,11 @@ hyper.workspace = true tracing.workspace = true # crypto -secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] } +secp256k1 = { workspace = true, features = [ + "global-context", + "rand-std", + "recovery", +] } # async futures.workspace = true diff --git a/crates/node-core/src/init.rs b/crates/node-core/src/init.rs index 7f529c2b0..eb513cc40 100644 --- a/crates/node-core/src/init.rs +++ b/crates/node-core/src/init.rs @@ -13,14 +13,36 @@ use reth_primitives::{ use reth_provider::{ bundle_state::{BundleStateInit, RevertsInit}, providers::{StaticFileProvider, StaticFileWriter}, - BlockHashReader, BundleStateWithReceipts, ChainSpecProvider, DatabaseProviderRW, HashingWriter, - HistoryWriter, OriginalValuesKnown, ProviderError, ProviderFactory, + BlockHashReader, BlockNumReader, BundleStateWithReceipts, ChainSpecProvider, + DatabaseProviderRW, HashingWriter, HistoryWriter, OriginalValuesKnown, ProviderError, +
ProviderFactory, }; +use reth_trie::{IntermediateStateRootState, StateRoot as StateRootComputer, StateRootProgress}; +use serde::{Deserialize, Serialize}; use std::{ collections::{BTreeMap, HashMap}, + io::BufRead, + ops::DerefMut, sync::Arc, }; -use tracing::debug; +use tracing::{debug, error, info, trace}; + +/// Default soft limit for number of bytes to read from state dump file, before inserting into +/// database. +/// +/// Default is 1 GB. +pub const DEFAULT_SOFT_LIMIT_BYTE_LEN_ACCOUNTS_CHUNK: usize = 1_000_000_000; + +/// Approximate number of accounts per 1 GB of state dump file. One account is approximately 3.5 KB. +/// +/// Approximately 285 228 accounts. +// +// (14.05 GB OP mainnet state dump at Bedrock block / 4 007 565 accounts in file => ~3.5 KB per +// account) +pub const AVERAGE_COUNT_ACCOUNTS_PER_GB_STATE_DUMP: usize = 285_228; + +/// Soft limit for the number of flushed updates after which to log progress summary. +const SOFT_LIMIT_COUNT_FLUSHED_UPDATES: usize = 1_000_000; /// Database initialization error type. #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] @@ -34,10 +56,19 @@ pub enum InitDatabaseError { /// Actual genesis hash. database_hash: B256, }, - /// Provider error. #[error(transparent)] Provider(#[from] ProviderError), + /// Computed state root doesn't match state root in state dump file. + #[error( + "state root mismatch, state dump: {expected_state_root}, computed: {computed_state_root}" + )] + SateRootMismatch { + /// State root read from the state dump file. + expected_state_root: B256, + /// State root computed from the state written to the database. + computed_state_root: B256, + }, } impl From for InitDatabaseError { @@ -102,6 +133,16 @@ pub fn insert_genesis_state<'a, 'b, DB: Database>( tx: &::TXMut, capacity: usize, alloc: impl Iterator, +) -> ProviderResult<()> { + insert_state::(tx, capacity, alloc, 0) +} + +/// Inserts state at given block into database.
+pub fn insert_state<'a, 'b, DB: Database>( + tx: &::TXMut, + capacity: usize, + alloc: impl Iterator, + block: u64, ) -> ProviderResult<()> { let mut state_init: BundleStateInit = HashMap::with_capacity(capacity); let mut reverts_init = HashMap::with_capacity(capacity); @@ -149,18 +190,20 @@ pub fn insert_genesis_state<'a, 'b, DB: Database>( ), ); } - let all_reverts_init: RevertsInit = HashMap::from([(0, reverts_init)]); + let all_reverts_init: RevertsInit = HashMap::from([(block, reverts_init)]); let bundle = BundleStateWithReceipts::new_init( state_init, all_reverts_init, contracts.into_iter().collect(), Receipts::new(), - 0, + block, ); bundle.write_to_storage(tx, None, OriginalValuesKnown::Yes)?; + trace!(target: "reth::cli", "Inserted state"); + Ok(()) } @@ -174,6 +217,8 @@ pub fn insert_genesis_hashes<'a, 'b, DB: Database>( alloc.clone().map(|(addr, account)| (*addr, Some(Account::from_genesis_account(account)))); provider.insert_account_for_hashing(alloc_accounts)?; + trace!(target: "reth::cli", "Inserted account hashes"); + let alloc_storage = alloc.filter_map(|(addr, account)| { // only return Some if there is storage account.storage.as_ref().map(|storage| { @@ -188,6 +233,8 @@ pub fn insert_genesis_hashes<'a, 'b, DB: Database>( }); provider.insert_storage_for_hashing(alloc_storage)?; + trace!(target: "reth::cli", "Inserted storage hashes"); + Ok(()) } @@ -195,17 +242,30 @@ pub fn insert_genesis_hashes<'a, 'b, DB: Database>( pub fn insert_genesis_history<'a, 'b, DB: Database>( provider: &DatabaseProviderRW, alloc: impl Iterator + Clone, +) -> ProviderResult<()> { + insert_history::(provider, alloc, 0) +} + +/// Inserts history indices for the given accounts and their storage at the given block.
+pub fn insert_history<'a, 'b, DB: Database>( + provider: &DatabaseProviderRW, + alloc: impl Iterator + Clone, + block: u64, ) -> ProviderResult<()> { let account_transitions = - alloc.clone().map(|(addr, _)| (*addr, vec![0])).collect::>(); + alloc.clone().map(|(addr, _)| (*addr, vec![block])).collect::>(); provider.insert_account_history_index(account_transitions)?; + trace!(target: "reth::cli", "Inserted account history"); + let storage_transitions = alloc .filter_map(|(addr, account)| account.storage.as_ref().map(|storage| (addr, storage))) - .flat_map(|(addr, storage)| storage.iter().map(|(key, _)| ((*addr, *key), vec![0]))) + .flat_map(|(addr, storage)| storage.iter().map(|(key, _)| ((*addr, *key), vec![block]))) .collect::>(); provider.insert_storage_history_index(storage_transitions)?; + trace!(target: "reth::cli", "Inserted storage history"); + Ok(()) } @@ -233,6 +293,182 @@ pub fn insert_genesis_header( Ok(()) } +/// Initializes chain state at the last block in the database, from a reader of a state dump.
+///
+/// The dump is read line by line: the first line must be a JSON object holding the expected
+/// state root (`{ "root": ... }`), every following line holds one account. Accounts are
+/// buffered and written in chunks of roughly [`DEFAULT_SOFT_LIMIT_BYTE_LEN_ACCOUNTS_CHUNK`]
+/// bytes of input. State is written at the last block number reported by `factory`, and the
+/// hash of that block is returned.
+///
+/// # Errors
+///
+/// Returns an error if the hash of the last block is missing, if reading from `reader` or
+/// parsing a line fails, if a database write fails, or if the state root computed from the
+/// written state differs from the one in the dump. The provider is only committed after the
+/// state root check has passed.
+pub fn init_from_state_dump<DB: Database>(
+    mut reader: impl BufRead,
+    factory: ProviderFactory<DB>,
+) -> eyre::Result<B256> {
+    let block = factory.last_block_number()?;
+    // a missing hash for the last block is reported as an error instead of panicking
+    let hash = factory
+        .block_hash(block)?
+        .ok_or_else(|| eyre::eyre!("no block hash found for last block {}", block))?;
+
+    debug!(target: "reth::cli",
+        block,
+        chain=%factory.chain_spec().chain,
+        "Initializing state at block"
+    );
+
+    let mut total_inserted_accounts = 0;
+    let mut accounts = Vec::with_capacity(AVERAGE_COUNT_ACCOUNTS_PER_GB_STATE_DUMP);
+    let mut chunk_total_byte_len = 0;
+    let mut line = String::new();
+
+    // the first line must hold the state root, it is verified against the computed state root
+    // once all accounts are written
+    reader.read_line(&mut line)?;
+    let expected_state_root = serde_json::from_str::<StateRoot>(&line)?.root;
+
+    trace!(target: "reth::cli",
+        root=%expected_state_root,
+        "Read state root from file"
+    );
+
+    line.clear();
+
+    // remaining lines are accounts
+    let mut provider_rw = factory.provider_rw()?;
+    loop {
+        // propagate read errors: silently stopping here would import a truncated state and
+        // only surface later as a state root mismatch
+        let n = reader.read_line(&mut line)?;
+        let reached_eof = n == 0;
+
+        chunk_total_byte_len += n;
+        // soft limit: the line that crosses the limit is parsed below, into the next chunk
+        if reached_eof || DEFAULT_SOFT_LIMIT_BYTE_LEN_ACCOUNTS_CHUNK <= chunk_total_byte_len {
+            // acc
+            total_inserted_accounts += accounts.len();
+
+            info!(target: "reth::cli",
+                chunk_total_byte_len,
+                parsed_new_accounts=accounts.len(),
+                total_inserted_accounts,
+                "Writing accounts to db"
+            );
+
+            // reset
+            chunk_total_byte_len = 0;
+
+            // write account/storage hashes and history indices for the buffered accounts
+            insert_genesis_hashes(
+                &provider_rw,
+                accounts.iter().map(|(address, account)| (address, account)),
+            )?;
+            insert_history(
+                &provider_rw,
+                accounts.iter().map(|(address, account)| (address, account)),
+                block,
+            )?;
+
+            // block is already written to static files
+            let tx = provider_rw.deref_mut().tx_mut();
+            insert_state::<DB>(
+                tx,
+                accounts.len(),
+                accounts.iter().map(|(address, account)| (address, account)),
+                block,
+            )?;
+
+            accounts.clear();
+        }
+
+        if reached_eof {
+            break;
+        }
+
+        let GenesisAccountWithAddress { genesis_account, address } = serde_json::from_str(&line)?;
+        accounts.push((address, genesis_account));
+
+        line.clear();
+    }
+
+    // compute and compare state root. this advances the stage checkpoints.
+    let computed_state_root = compute_state_root(&provider_rw)?;
+    if computed_state_root != expected_state_root {
+        error!(target: "reth::cli",
+            ?computed_state_root,
+            ?expected_state_root,
+            "Computed state root does not match state root in state dump"
+        );
+
+        return Err(
+            InitDatabaseError::SateRootMismatch { expected_state_root, computed_state_root }.into()
+        )
+    }
+
+    info!(target: "reth::cli",
+        ?computed_state_root,
+        "Computed state root matches state root in state dump"
+    );
+
+    provider_rw.commit()?;
+
+    Ok(hash)
+}
+
+/// Computes the state root (from scratch) based on the accounts and storages present in the
+/// database.
+///
+/// Trie updates are flushed through the provider's transaction as they are produced, and the
+/// computation is resumed from the returned intermediate state until it completes.
+fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::Result<B256> {
+    trace!(target: "reth::cli", "Computing state root");
+
+    let tx = provider.tx_ref();
+    let mut intermediate_state: Option<IntermediateStateRootState> = None;
+    let mut total_flushed_updates = 0;
+    // number of flushed updates at which the next progress summary is logged
+    let mut next_progress_log = SOFT_LIMIT_COUNT_FLUSHED_UPDATES;
+
+    loop {
+        match StateRootComputer::from_tx(tx)
+            .with_intermediate_state(intermediate_state)
+            .root_with_progress()?
+        {
+            StateRootProgress::Progress(state, _, updates) => {
+                let updates_len = updates.len();
+
+                trace!(target: "reth::cli",
+                    last_account_key = %state.last_account_key,
+                    updates_len,
+                    total_flushed_updates,
+                    "Flushing trie updates"
+                );
+
+                intermediate_state = Some(*state);
+                updates.flush(tx)?;
+
+                total_flushed_updates += updates_len;
+
+                // batch sizes rarely sum up to an exact multiple of the limit, so compare
+                // against a moving threshold instead of testing divisibility
+                if total_flushed_updates >= next_progress_log {
+                    info!(target: "reth::cli",
+                        total_flushed_updates,
+                        "Flushing trie updates"
+                    );
+                    next_progress_log = total_flushed_updates + SOFT_LIMIT_COUNT_FLUSHED_UPDATES;
+                }
+            }
+            StateRootProgress::Complete(root, _, updates) => {
+                let updates_len = updates.len();
+
+                updates.flush(tx)?;
+
+                total_flushed_updates += updates_len;
+
+                trace!(target: "reth::cli",
+                    %root,
+                    updates_len = updates_len,
+                    total_flushed_updates,
+                    "State root has been computed"
+                );
+
+                return Ok(root)
+            }
+        }
+    }
+}
+
+/// Type to deserialize state root from state dump file.
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +struct StateRoot { + root: B256, +} + +/// An account as in the state dump file. This contains a [`GenesisAccount`] and the account's +/// address. +#[derive(Debug, Serialize, Deserialize)] +struct GenesisAccountWithAddress { + /// The account's balance, nonce, code and storage, flattened next to `address` in the JSON. + #[serde(flatten)] + genesis_account: GenesisAccount, + /// The account's address. + address: Address, +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs b/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs index 1153464f7..baf5fa597 100644 --- a/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs +++ b/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs @@ -297,8 +297,8 @@ impl BundleStateWithReceipts { /// files if `static_file_producer` is `Some`. It should be none if there is any kind of /// pruning/filtering over the receipts. /// - /// `omit_changed_check` should be set to true of bundle has some of it data - /// detached, This would make some original values not known. + /// `omit_changed_check` should be set to true if bundle has some of its data detached. This + /// would make some original values not known.
pub fn write_to_storage( self, tx: &TX, diff --git a/crates/storage/provider/src/bundle_state/state_changes.rs b/crates/storage/provider/src/bundle_state/state_changes.rs index a62606ded..7f7bde79e 100644 --- a/crates/storage/provider/src/bundle_state/state_changes.rs +++ b/crates/storage/provider/src/bundle_state/state_changes.rs @@ -77,6 +77,7 @@ impl StateChanges { } } } + Ok(()) } } diff --git a/crates/storage/provider/src/bundle_state/state_reverts.rs b/crates/storage/provider/src/bundle_state/state_reverts.rs index 63c5595c5..e61572cf5 100644 --- a/crates/storage/provider/src/bundle_state/state_reverts.rs +++ b/crates/storage/provider/src/bundle_state/state_reverts.rs @@ -1,6 +1,6 @@ use rayon::slice::ParallelSliceMut; use reth_db::{ - cursor::{DbCursorRO, DbDupCursorRO, DbDupCursorRW}, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, models::{AccountBeforeTx, BlockNumberAddress}, tables, transaction::{DbTx, DbTxMut}, @@ -74,15 +74,31 @@ impl StateReverts { // Write account changes tracing::trace!(target: "provider::reverts", "Writing account changes"); let mut account_changeset_cursor = tx.cursor_dup_write::()?; + + // append entries if key is new + let should_append_accounts = + account_changeset_cursor.last()?.map_or(true, |(block_number, _)| { + block_number < first_block || block_number == first_block && block_number == 0 + }); for (block_index, mut account_block_reverts) in self.0.accounts.into_iter().enumerate() { let block_number = first_block + block_index as BlockNumber; // Sort accounts by address. account_block_reverts.par_sort_by_key(|a| a.0); + for (address, info) in account_block_reverts { - account_changeset_cursor.append_dup( - block_number, - AccountBeforeTx { address, info: info.map(into_reth_acc) }, - )?; + if should_append_accounts { + account_changeset_cursor.append_dup( + block_number, + AccountBeforeTx { address, info: info.map(into_reth_acc) }, + )?; + } else { + // upsert on dupsort tables will append to subkey. 
see implementation of + // DbCursorRW::upsert for reth_db::implementation::mdbx::cursor::Cursor + account_changeset_cursor.upsert( + block_number, + AccountBeforeTx { address, info: info.map(into_reth_acc) }, + )?; + } } }