feat(op): cmd init at block (#7784)

Co-authored-by: Oliver Nordbjerg <onbjerg@users.noreply.github.com>
Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
Emilia Hane
2024-04-25 22:31:01 +02:00
committed by GitHub
parent e2e5201d8a
commit adf1d25a89
10 changed files with 391 additions and 18 deletions

2
Cargo.lock generated
View File

@ -6118,6 +6118,7 @@ dependencies = [
"serde_json",
"similar-asserts",
"tempfile",
"thiserror",
"tikv-jemallocator",
"tokio",
"toml",
@ -6995,6 +6996,7 @@ dependencies = [
"reth-tasks",
"reth-tracing",
"reth-transaction-pool",
"reth-trie",
"secp256k1",
"serde",
"serde_json",

View File

@ -104,6 +104,7 @@ itertools.workspace = true
rayon.workspace = true
boyer-moore-magiclen = "0.2.16"
ahash = "0.8"
thiserror.workspace = true
# p2p
discv5.workspace = true

View File

@ -6,8 +6,8 @@ use crate::{
LogArgs,
},
commands::{
config_cmd, db, debug_cmd, dump_genesis, import, init_cmd, node, node::NoArgs, p2p,
recover, stage, test_vectors,
config_cmd, db, debug_cmd, dump_genesis, import, init_cmd, init_state, node, node::NoArgs,
p2p, recover, stage, test_vectors,
},
version::{LONG_VERSION, SHORT_VERSION},
};
@ -145,6 +145,7 @@ impl<Ext: clap::Args + fmt::Debug> Cli<Ext> {
runner.run_command_until_exit(|ctx| command.execute(ctx, launcher))
}
Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute()),
@ -176,6 +177,9 @@ pub enum Commands<Ext: clap::Args + fmt::Debug = NoArgs> {
/// Initialize the database from a genesis file.
#[command(name = "init")]
Init(init_cmd::InitCommand),
/// Initialize the database from a state dump file.
#[command(name = "init-state")]
InitState(init_state::InitStateCommand),
/// This syncs RLP encoded blocks from a file.
#[command(name = "import")]
Import(import::ImportCommand),

View File

@ -0,0 +1,107 @@
//! Command that initializes the node from a genesis file.
use crate::{
args::{
utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
DatabaseArgs,
},
dirs::{DataDirPath, MaybePlatformPath},
};
use clap::Parser;
use reth_db::{database::Database, init_db};
use reth_node_core::init::{init_from_state_dump, init_genesis};
use reth_primitives::{ChainSpec, B256};
use reth_provider::ProviderFactory;
use std::{fs::File, io::BufReader, path::PathBuf, sync::Arc};
use tracing::info;
/// Initializes the database from a state dump file, allowing chain init at a non-genesis block.
///
/// Unlike `init`, which writes the genesis state, this command can seed the database with the
/// full account state at an arbitrary (already-imported) block height.
#[derive(Debug, Parser)]
pub struct InitStateCommand {
    /// The path to the data dir for all reth files and subdirectories.
    ///
    /// Defaults to the OS-specific data directory:
    ///
    /// - Linux: `$XDG_DATA_HOME/reth/` or `$HOME/.local/share/reth/`
    /// - Windows: `{FOLDERID_RoamingAppData}/reth/`
    /// - macOS: `$HOME/Library/Application Support/reth/`
    #[arg(long, value_name = "DATA_DIR", verbatim_doc_comment, default_value_t)]
    datadir: MaybePlatformPath<DataDirPath>,
    /// The chain this node is running.
    ///
    /// Possible values are either a built-in chain or the path to a chain specification file.
    #[arg(
        long,
        value_name = "CHAIN_OR_PATH",
        long_help = chain_help(),
        default_value = SUPPORTED_CHAINS[0],
        value_parser = genesis_value_parser
    )]
    chain: Arc<ChainSpec>,
    /// JSONL file with state dump.
    ///
    /// Must contain accounts in following format, additional account fields are ignored. Can
    /// also contain { "root": \<state-root\> } as first line.
    /// {
    /// "balance": "\<balance\>",
    /// "nonce": \<nonce\>,
    /// "code": "\<bytecode\>",
    /// "storage": {
    /// "\<key\>": "\<value\>",
    /// ..
    /// },
    /// "address": "\<address\>",
    /// }
    ///
    /// Allows init at a non-genesis block. Caution! Blocks must be manually imported up until
    /// and including the non-genesis block to init chain at. See 'import' command.
    #[arg(long, value_name = "STATE_DUMP_FILE", verbatim_doc_comment, default_value = None)]
    state: Option<PathBuf>,
    /// Database configuration (page size, max size, etc.).
    #[command(flatten)]
    db: DatabaseArgs,
}
impl InitStateCommand {
    /// Execute the `init-state` command.
    ///
    /// Opens (or creates) the database for the configured chain, then initializes it either
    /// from the state dump file (when `--state` is given) or from the chain's genesis.
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be opened, or if initialization from the dump
    /// or genesis fails.
    pub async fn execute(self) -> eyre::Result<()> {
        info!(target: "reth::cli", "reth init-state starting");

        // add network name to data dir
        let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
        let db_path = data_dir.db_path();
        info!(target: "reth::cli", path = ?db_path, "Opening database");
        let db = Arc::new(init_db(&db_path, self.db.database_args())?);
        info!(target: "reth::cli", "Database opened");

        let provider_factory = ProviderFactory::new(db, self.chain, data_dir.static_files_path())?;

        info!(target: "reth::cli", "Writing genesis block");

        // init from the state dump if one was given, otherwise fall back to plain genesis init
        let hash = match self.state {
            Some(path) => init_at_state(path, provider_factory)?,
            None => init_genesis(provider_factory)?,
        };

        info!(target: "reth::cli", hash = ?hash, "Genesis block written");
        Ok(())
    }
}
/// Initialize chain with state at specific block, from a file with state dump.
///
/// Opens the dump file, wraps it in a buffered reader, and hands it off to
/// [`init_from_state_dump`], returning the hash of the block the state was initialized at.
pub fn init_at_state<DB: Database>(
    dump_path: PathBuf,
    factory: ProviderFactory<DB>,
) -> eyre::Result<B256> {
    info!(target: "reth::cli",
        path=?dump_path,
        "Opening state dump");

    // buffer reads: the dump may be many gigabytes and is consumed line by line
    let dump_reader = BufReader::new(File::open(dump_path)?);
    init_from_state_dump(dump_reader, factory)
}

View File

@ -7,6 +7,7 @@ pub mod dump_genesis;
pub mod import;
pub mod init_cmd;
pub mod init_state;
pub mod node;
pub mod p2p;

View File

@ -32,6 +32,7 @@ reth-network-api.workspace = true
reth-evm.workspace = true
reth-engine-primitives.workspace = true
reth-tasks.workspace = true
reth-trie.workspace = true
reth-consensus-common.workspace = true
reth-beacon-consensus.workspace = true
@ -71,7 +72,11 @@ hyper.workspace = true
tracing.workspace = true
# crypto
secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] }
secp256k1 = { workspace = true, features = [
"global-context",
"rand-std",
"recovery",
] }
# async
futures.workspace = true

View File

@ -13,14 +13,36 @@ use reth_primitives::{
use reth_provider::{
bundle_state::{BundleStateInit, RevertsInit},
providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, BundleStateWithReceipts, ChainSpecProvider, DatabaseProviderRW, HashingWriter,
HistoryWriter, OriginalValuesKnown, ProviderError, ProviderFactory,
BlockHashReader, BlockNumReader, BundleStateWithReceipts, ChainSpecProvider,
DatabaseProviderRW, HashingWriter, HistoryWriter, OriginalValuesKnown, ProviderError,
ProviderFactory,
};
use reth_trie::{IntermediateStateRootState, StateRoot as StateRootComputer, StateRootProgress};
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, HashMap},
io::BufRead,
ops::DerefMut,
sync::Arc,
};
use tracing::debug;
use tracing::{debug, error, info, trace};
/// Default soft limit for number of bytes to read from state dump file, before inserting into
/// database.
///
/// Default is 1 GB.
pub const DEFAULT_SOFT_LIMIT_BYTE_LEN_ACCOUNTS_CHUNK: usize = 1_000_000_000;
/// Approximate number of accounts per 1 GB of state dump file; one account is approximately
/// 3.5 KB.
///
/// This works out to roughly 285,228 accounts per GB.
//
// (14.05 GB OP mainnet state dump at Bedrock block / 4 007 565 accounts in file > 3.5 KB per
// account)
pub const AVERAGE_COUNT_ACCOUNTS_PER_GB_STATE_DUMP: usize = 285_228;
/// Soft limit for the number of flushed updates after which to log progress summary.
const SOFT_LIMIT_COUNT_FLUSHED_UPDATES: usize = 1_000_000;
/// Database initialization error type.
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
@ -34,10 +56,19 @@ pub enum InitDatabaseError {
/// Actual genesis hash.
database_hash: B256,
},
/// Provider error.
#[error(transparent)]
Provider(#[from] ProviderError),
/// Computed state root doesn't match state root in state dump file.
#[error(
"state root mismatch, state dump: {expected_state_root}, computed: {computed_state_root}"
)]
SateRootMismatch {
/// Expected state root.
expected_state_root: B256,
/// Actual state root.
computed_state_root: B256,
},
}
impl From<DatabaseError> for InitDatabaseError {
@ -102,6 +133,16 @@ pub fn insert_genesis_state<'a, 'b, DB: Database>(
tx: &<DB as Database>::TXMut,
capacity: usize,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
) -> ProviderResult<()> {
insert_state::<DB>(tx, capacity, alloc, 0)
}
/// Inserts state at given block into database.
pub fn insert_state<'a, 'b, DB: Database>(
tx: &<DB as Database>::TXMut,
capacity: usize,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
block: u64,
) -> ProviderResult<()> {
let mut state_init: BundleStateInit = HashMap::with_capacity(capacity);
let mut reverts_init = HashMap::with_capacity(capacity);
@ -149,18 +190,20 @@ pub fn insert_genesis_state<'a, 'b, DB: Database>(
),
);
}
let all_reverts_init: RevertsInit = HashMap::from([(0, reverts_init)]);
let all_reverts_init: RevertsInit = HashMap::from([(block, reverts_init)]);
let bundle = BundleStateWithReceipts::new_init(
state_init,
all_reverts_init,
contracts.into_iter().collect(),
Receipts::new(),
0,
block,
);
bundle.write_to_storage(tx, None, OriginalValuesKnown::Yes)?;
trace!(target: "reth::cli", "Inserted state");
Ok(())
}
@ -174,6 +217,8 @@ pub fn insert_genesis_hashes<'a, 'b, DB: Database>(
alloc.clone().map(|(addr, account)| (*addr, Some(Account::from_genesis_account(account))));
provider.insert_account_for_hashing(alloc_accounts)?;
trace!(target: "reth::cli", "Inserted account hashes");
let alloc_storage = alloc.filter_map(|(addr, account)| {
// only return Some if there is storage
account.storage.as_ref().map(|storage| {
@ -188,6 +233,8 @@ pub fn insert_genesis_hashes<'a, 'b, DB: Database>(
});
provider.insert_storage_for_hashing(alloc_storage)?;
trace!(target: "reth::cli", "Inserted storage hashes");
Ok(())
}
@ -195,17 +242,30 @@ pub fn insert_genesis_hashes<'a, 'b, DB: Database>(
/// Inserts history indices for genesis accounts and storage (i.e. at block 0).
pub fn insert_genesis_history<'a, 'b, DB: Database>(
    provider: &DatabaseProviderRW<DB>,
    alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
) -> ProviderResult<()> {
    // delegate to the general-purpose writer with the genesis block number
    insert_history::<DB>(provider, alloc, 0)
}
/// Inserts history indices for genesis accounts and storage.
///
/// Records, for every account in `alloc` (and every storage slot it carries), that the account
/// or slot transitioned at `block`.
pub fn insert_history<'a, 'b, DB: Database>(
    provider: &DatabaseProviderRW<DB>,
    alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
    block: u64,
) -> ProviderResult<()> {
    // every account transitions at `block`
    let mut account_indices = BTreeMap::new();
    for (addr, _) in alloc.clone() {
        account_indices.insert(*addr, vec![block]);
    }
    provider.insert_account_history_index(account_indices)?;

    trace!(target: "reth::cli", "Inserted account history");

    // every storage slot of every account with storage transitions at `block`
    let mut storage_indices = BTreeMap::new();
    for (addr, account) in alloc {
        if let Some(storage) = account.storage.as_ref() {
            for key in storage.keys() {
                storage_indices.insert((*addr, *key), vec![block]);
            }
        }
    }
    provider.insert_storage_history_index(storage_indices)?;

    trace!(target: "reth::cli", "Inserted storage history");

    Ok(())
}
@ -233,6 +293,182 @@ pub fn insert_genesis_header<DB: Database>(
Ok(())
}
/// Initialize chain with state at specific block, from reader of state dump.
///
/// The first line of the dump must contain `{ "root": <state-root> }`; it is used to verify the
/// state root computed after all accounts have been inserted. Each remaining line is one
/// JSON-encoded account. Accounts are buffered and flushed to the database in chunks of roughly
/// [`DEFAULT_SOFT_LIMIT_BYTE_LEN_ACCOUNTS_CHUNK`] bytes.
///
/// Returns the hash of the block the state was initialized at.
///
/// # Errors
///
/// Returns an error if the block hash for the latest imported block is missing, if reading or
/// parsing the dump fails, or if the computed state root does not match the root in the dump.
pub fn init_from_state_dump<DB: Database>(
    mut reader: impl BufRead,
    factory: ProviderFactory<DB>,
) -> eyre::Result<B256> {
    let block = factory.last_block_number()?;
    // the target block must already have been imported (see `import` command), so its hash must
    // be present; fail cleanly instead of panicking if it is not
    let hash = factory
        .block_hash(block)?
        .ok_or_else(|| eyre::eyre!("no hash found for block #{}", block))?;

    debug!(target: "reth::cli",
        block,
        chain=%factory.chain_spec().chain,
        "Initializing state at block"
    );

    let mut total_inserted_accounts = 0;
    let mut accounts = Vec::with_capacity(AVERAGE_COUNT_ACCOUNTS_PER_GB_STATE_DUMP);
    let mut chunk_total_byte_len = 0;
    let mut line = String::new();

    // first line can be state root, then it can be used for verifying against computed state root
    reader.read_line(&mut line)?;
    let expected_state_root = serde_json::from_str::<StateRoot>(&line)?.root;

    trace!(target: "reth::cli",
        root=%expected_state_root,
        "Read state root from file"
    );

    line.clear();

    // remaining lines are accounts
    let mut provider_rw = factory.provider_rw()?;
    loop {
        // propagate read errors instead of silently stopping mid-file, which would previously
        // have verified the state root against a partially-inserted state
        let n = reader.read_line(&mut line)?;
        chunk_total_byte_len += n;
        // flush on soft byte limit or at EOF (n == 0)
        if DEFAULT_SOFT_LIMIT_BYTE_LEN_ACCOUNTS_CHUNK <= chunk_total_byte_len || n == 0 {
            // acc
            total_inserted_accounts += accounts.len();

            info!(target: "reth::cli",
                chunk_total_byte_len,
                parsed_new_accounts=accounts.len(),
                total_inserted_accounts,
                "Writing accounts to db"
            );

            // reset
            chunk_total_byte_len = 0;

            // use transaction to insert genesis header
            insert_genesis_hashes(
                &provider_rw,
                accounts.iter().map(|(address, account)| (address, account)),
            )?;
            insert_history(
                &provider_rw,
                accounts.iter().map(|(address, account)| (address, account)),
                block,
            )?;

            // block is already written to static files
            let tx = provider_rw.deref_mut().tx_mut();
            insert_state::<DB>(
                tx,
                accounts.len(),
                accounts.iter().map(|(address, account)| (address, account)),
                block,
            )?;

            accounts.clear();
        }

        if n == 0 {
            // EOF: the remaining buffered accounts were flushed above
            break
        }

        let GenesisAccountWithAddress { genesis_account, address } = serde_json::from_str(&line)?;
        accounts.push((address, genesis_account));

        line.clear();
    }

    // compute and compare state root. this advances the stage checkpoints.
    let computed_state_root = compute_state_root(&provider_rw)?;
    if computed_state_root != expected_state_root {
        error!(target: "reth::cli",
            ?computed_state_root,
            ?expected_state_root,
            "Computed state root does not match state root in state dump"
        );

        Err(InitDatabaseError::SateRootMismatch { expected_state_root, computed_state_root })?
    } else {
        info!(target: "reth::cli",
            ?computed_state_root,
            "Computed state root matches state root in state dump"
        );
    }

    provider_rw.commit()?;

    Ok(hash)
}
/// Computes the state root (from scratch) based on the accounts and storages present in the
/// database.
///
/// The computation runs in resumable chunks via [`StateRootComputer::root_with_progress`]:
/// each `Progress` step yields an intermediate checkpoint and a batch of trie updates that are
/// flushed to the transaction before resuming, so memory usage stays bounded for large states.
fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::Result<B256> {
    trace!(target: "reth::cli", "Computing state root");
    let tx = provider.tx_ref();
    // checkpoint carried between iterations; `None` means start from scratch
    let mut intermediate_state: Option<IntermediateStateRootState> = None;
    let mut total_flushed_updates = 0;
    loop {
        match StateRootComputer::from_tx(tx)
            .with_intermediate_state(intermediate_state)
            .root_with_progress()?
        {
            StateRootProgress::Progress(state, _, updates) => {
                let updates_len = updates.len();
                trace!(target: "reth::cli",
                    last_account_key = %state.last_account_key,
                    updates_len,
                    total_flushed_updates,
                    "Flushing trie updates"
                );
                // save the checkpoint so the next iteration resumes where this chunk stopped
                intermediate_state = Some(*state);
                updates.flush(tx)?;
                total_flushed_updates += updates_len;
                // NOTE(review): this only logs when the running total lands exactly on a
                // multiple of the soft limit; batches of more than one update can step over
                // the boundary and skip a log line — confirm this is intended
                if total_flushed_updates % SOFT_LIMIT_COUNT_FLUSHED_UPDATES == 0 {
                    info!(target: "reth::cli",
                        total_flushed_updates,
                        "Flushing trie updates"
                    );
                }
            }
            StateRootProgress::Complete(root, _, updates) => {
                // final batch of updates, then the computed root is returned
                let updates_len = updates.len();
                updates.flush(tx)?;
                total_flushed_updates += updates_len;
                trace!(target: "reth::cli",
                    %root,
                    updates_len = updates_len,
                    total_flushed_updates,
                    "State root has been computed"
                );
                return Ok(root)
            }
        }
    }
}
/// Type to deserialize state root from state dump file.
///
/// Expected as the first line of the dump: `{ "root": <state-root> }`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct StateRoot {
    // the state root the dump was produced at; verified against the computed root after init
    root: B256,
}
/// An account as in the state dump file. This contains a [`GenesisAccount`] and the account's
/// address.
///
/// Each line of the dump (after the optional state-root line) deserializes into one of these;
/// `#[serde(flatten)]` means the account fields sit at the same JSON level as `address`.
#[derive(Debug, Serialize, Deserialize)]
struct GenesisAccountWithAddress {
    /// The account's balance, nonce, code, and storage.
    #[serde(flatten)]
    genesis_account: GenesisAccount,
    /// The account's address.
    address: Address,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -297,8 +297,8 @@ impl BundleStateWithReceipts {
/// files if `static_file_producer` is `Some`. It should be none if there is any kind of
/// pruning/filtering over the receipts.
///
/// `omit_changed_check` should be set to true of bundle has some of it data
/// detached, This would make some original values not known.
/// `omit_changed_check` should be set to true if bundle has some of its data detached. This
/// would make some original values not known.
pub fn write_to_storage<TX>(
self,
tx: &TX,

View File

@ -77,6 +77,7 @@ impl StateChanges {
}
}
}
Ok(())
}
}

View File

@ -1,6 +1,6 @@
use rayon::slice::ParallelSliceMut;
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRO, DbDupCursorRW},
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
models::{AccountBeforeTx, BlockNumberAddress},
tables,
transaction::{DbTx, DbTxMut},
@ -74,15 +74,31 @@ impl StateReverts {
// Write account changes
tracing::trace!(target: "provider::reverts", "Writing account changes");
let mut account_changeset_cursor = tx.cursor_dup_write::<tables::AccountChangeSets>()?;
// append entries if key is new
let should_append_accounts =
account_changeset_cursor.last()?.map_or(true, |(block_number, _)| {
block_number < first_block || block_number == first_block && block_number == 0
});
for (block_index, mut account_block_reverts) in self.0.accounts.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
// Sort accounts by address.
account_block_reverts.par_sort_by_key(|a| a.0);
for (address, info) in account_block_reverts {
if should_append_accounts {
account_changeset_cursor.append_dup(
block_number,
AccountBeforeTx { address, info: info.map(into_reth_acc) },
)?;
} else {
// upsert on dupsort tables will append to subkey. see implementation of
// DbCursorRW::upsert for reth_db::implementation::mdbx::cursor::Cursor<RW, _>
account_changeset_cursor.upsert(
block_number,
AccountBeforeTx { address, info: info.map(into_reth_acc) },
)?;
}
}
}