diff --git a/Cargo.lock b/Cargo.lock
index 07cd06f18..556f8da69 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6636,6 +6636,7 @@ dependencies = [
  "humantime-serde",
  "reth-network-types",
  "reth-prune-types",
+ "reth-stages-types",
  "serde",
  "tempfile",
  "toml",
@@ -7216,7 +7217,13 @@ version = "1.0.0"
 dependencies = [
  "eyre",
  "metrics",
+ "reth-blockchain-tree",
+ "reth-chainspec",
  "reth-config",
+ "reth-db-api",
+ "reth-db-common",
+ "reth-evm",
+ "reth-evm-ethereum",
  "reth-exex-types",
  "reth-metrics",
  "reth-network",
@@ -7225,8 +7232,13 @@ dependencies = [
  "reth-payload-builder",
  "reth-primitives",
  "reth-provider",
+ "reth-prune-types",
+ "reth-revm",
+ "reth-stages-api",
  "reth-tasks",
+ "reth-testing-utils",
  "reth-tracing",
+ "secp256k1",
  "serde",
  "tokio",
  "tokio-util",
diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs
index 2df188c73..2172a2b4c 100644
--- a/bin/reth/src/commands/debug_cmd/execution.rs
+++ b/bin/reth/src/commands/debug_cmd/execution.rs
@@ -28,9 +28,8 @@ use reth_provider::{
 };
 use reth_prune::PruneModes;
 use reth_stages::{
-    sets::DefaultStages,
-    stages::{ExecutionStage, ExecutionStageThresholds},
-    Pipeline, StageId, StageSet,
+    sets::DefaultStages, stages::ExecutionStage, ExecutionStageThresholds, Pipeline, StageId,
+    StageSet,
 };
 use reth_static_file::StaticFileProducer;
 use reth_tasks::TaskExecutor;
diff --git a/bin/reth/src/commands/stage/dump/merkle.rs b/bin/reth/src/commands/stage/dump/merkle.rs
index 85fd0bfca..4e2541b60 100644
--- a/bin/reth/src/commands/stage/dump/merkle.rs
+++ b/bin/reth/src/commands/stage/dump/merkle.rs
@@ -12,10 +12,10 @@
 use reth_provider::{providers::StaticFileProvider, ProviderFactory};
 use reth_prune::PruneModes;
 use reth_stages::{
     stages::{
-        AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage,
-        StorageHashingStage, MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
+        AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage,
+        MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
     },
-    Stage, StageCheckpoint, UnwindInput,
+    ExecutionStageThresholds, Stage, StageCheckpoint, UnwindInput,
 };
 use tracing::info;
diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs
index b8dcc7c91..55824bd79 100644
--- a/bin/reth/src/commands/stage/run.rs
+++ b/bin/reth/src/commands/stage/run.rs
@@ -20,11 +20,11 @@ use reth_provider::{
 };
 use reth_stages::{
     stages::{
-        AccountHashingStage, BodyStage, ExecutionStage, ExecutionStageThresholds,
-        IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
-        StorageHashingStage, TransactionLookupStage,
+        AccountHashingStage, BodyStage, ExecutionStage, IndexAccountHistoryStage,
+        IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
+        TransactionLookupStage,
     },
-    ExecInput, ExecOutput, Stage, StageExt, UnwindInput, UnwindOutput,
+    ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput,
 };
 use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
 use tracing::*;
diff --git a/bin/reth/src/commands/stage/unwind.rs b/bin/reth/src/commands/stage/unwind.rs
index 57088eaf2..e3cb0cc8f 100644
--- a/bin/reth/src/commands/stage/unwind.rs
+++ b/bin/reth/src/commands/stage/unwind.rs
@@ -16,8 +16,8 @@ use reth_provider::{
 use reth_prune::PruneModes;
 use reth_stages::{
     sets::{DefaultStages, OfflineStages},
-    stages::{ExecutionStage, ExecutionStageThresholds},
-    Pipeline, StageSet,
+    stages::ExecutionStage,
+    ExecutionStageThresholds, Pipeline, StageSet,
 };
 use reth_static_file::StaticFileProducer;
 use std::{ops::RangeInclusive, sync::Arc};
diff --git a/crates/config/Cargo.toml b/crates/config/Cargo.toml
index 1468fccd3..91dcfc772 100644
--- a/crates/config/Cargo.toml
+++ b/crates/config/Cargo.toml
@@ -14,6 +14,7 @@ workspace = true
 # reth
 reth-network-types = { workspace = true, features = ["serde"] }
 reth-prune-types.workspace = true
+reth-stages-types.workspace = true
 
 # serde
 serde.workspace = true
diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs
index c3ee8bf3d..fa498f66c 100644
--- a/crates/config/src/config.rs
+++ b/crates/config/src/config.rs
@@ -2,6 +2,7 @@
 use reth_network_types::{PeersConfig, SessionsConfig};
 use reth_prune_types::PruneModes;
+use reth_stages_types::ExecutionStageThresholds;
 use serde::{Deserialize, Deserializer, Serialize};
 use std::{
     ffi::OsStr,
@@ -216,6 +217,17 @@ impl Default for ExecutionConfig {
     }
 }
 
+impl From<ExecutionConfig> for ExecutionStageThresholds {
+    fn from(config: ExecutionConfig) -> Self {
+        Self {
+            max_blocks: config.max_blocks,
+            max_changes: config.max_changes,
+            max_cumulative_gas: config.max_cumulative_gas,
+            max_duration: config.max_duration,
+        }
+    }
+}
+
 /// Hashing stage configuration.
 #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
 #[serde(default)]
diff --git a/crates/evm/execution-types/src/chain.rs b/crates/evm/execution-types/src/chain.rs
index 8833615bb..8ccf0f480 100644
--- a/crates/evm/execution-types/src/chain.rs
+++ b/crates/evm/execution-types/src/chain.rs
@@ -94,6 +94,11 @@ impl Chain {
         &self.execution_outcome
     }
 
+    /// Get mutable execution outcome of this chain
+    pub fn execution_outcome_mut(&mut self) -> &mut ExecutionOutcome {
+        &mut self.execution_outcome
+    }
+
     /// Prepends the given state to the current state.
     pub fn prepend_state(&mut self, state: BundleState) {
         self.execution_outcome.prepend_state(state);
diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml
index 5bbf177d0..a911f4599 100644
--- a/crates/exex/exex/Cargo.toml
+++ b/crates/exex/exex/Cargo.toml
@@ -24,6 +24,11 @@ reth-tasks.workspace = true
 reth-tracing.workspace = true
 reth-network.workspace = true
 reth-payload-builder.workspace = true
+reth-evm.workspace = true
+reth-prune-types.workspace = true
+reth-revm.workspace = true
+reth-stages-api.workspace = true
+reth-db-api.workspace = true
 
 ## async
 tokio.workspace = true
@@ -34,6 +39,17 @@ eyre.workspace = true
 metrics.workspace = true
 serde = { workspace = true, optional = true }
 
+[dev-dependencies]
+reth-chainspec.workspace = true
+reth-evm-ethereum.workspace = true
+reth-testing-utils.workspace = true
+reth-blockchain-tree.workspace = true
+reth-db-common.workspace = true
+reth-node-api.workspace = true
+reth-provider = { workspace = true, features = ["test-utils"] }
+
+secp256k1.workspace = true
+
 [features]
 default = []
 serde = ["dep:serde", "reth-provider/serde"]
diff --git a/crates/exex/exex/src/backfill.rs b/crates/exex/exex/src/backfill.rs
new file mode 100644
index 000000000..f46c82d24
--- /dev/null
+++ b/crates/exex/exex/src/backfill.rs
@@ -0,0 +1,344 @@
+use reth_db_api::database::Database;
+use reth_evm::execute::{BatchExecutor, BlockExecutionError, BlockExecutorProvider};
+use reth_node_api::FullNodeComponents;
+use reth_primitives::{Block, BlockNumber};
+use reth_provider::{Chain, FullProvider, ProviderError, TransactionVariant};
+use reth_prune_types::PruneModes;
+use reth_revm::database::StateProviderDatabase;
+use reth_stages_api::{format_gas_throughput, ExecutionStageThresholds};
+use reth_tracing::tracing::{debug, trace};
+use std::{
+    marker::PhantomData,
+    ops::RangeInclusive,
+    time::{Duration, Instant},
+};
+
+/// Factory for creating new backfill jobs.
+#[derive(Debug, Clone)]
+pub struct BackfillJobFactory<E, P> {
+    executor: E,
+    provider: P,
+    prune_modes: PruneModes,
+    thresholds: ExecutionStageThresholds,
+}
+
+impl<E, P> BackfillJobFactory<E, P> {
+    /// Creates a new [`BackfillJobFactory`].
+    pub fn new(executor: E, provider: P, prune_modes: PruneModes) -> Self {
+        Self { executor, provider, prune_modes, thresholds: ExecutionStageThresholds::default() }
+    }
+
+    /// Sets the thresholds.
+    pub const fn with_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self {
+        self.thresholds = thresholds;
+        self
+    }
+}
+
+impl<E: Clone, P: Clone> BackfillJobFactory<E, P> {
+    /// Creates a new backfill job for the given range.
+    pub fn backfill<DB>(&self, range: RangeInclusive<BlockNumber>) -> BackfillJob<E, DB, P> {
+        BackfillJob {
+            executor: self.executor.clone(),
+            provider: self.provider.clone(),
+            prune_modes: self.prune_modes.clone(),
+            range,
+            thresholds: self.thresholds.clone(),
+            _db: PhantomData,
+        }
+    }
+}
+
+impl BackfillJobFactory<(), ()> {
+    /// Creates a new [`BackfillJobFactory`] from [`FullNodeComponents`].
+    pub fn new_from_components<Node: FullNodeComponents>(
+        components: Node,
+        prune_modes: PruneModes,
+    ) -> BackfillJobFactory<Node::Executor, Node::Provider> {
+        BackfillJobFactory::<_, _>::new(
+            components.block_executor().clone(),
+            components.provider().clone(),
+            prune_modes,
+        )
+    }
+}
+
+/// Backfill job started for a specific range.
+///
+/// It implements [`Iterator`], executing blocks in batches according to the provided
+/// thresholds and yielding one [`Chain`] per batch.
+#[derive(Debug)]
+pub struct BackfillJob<E, DB, P> {
+    executor: E,
+    provider: P,
+    prune_modes: PruneModes,
+    range: RangeInclusive<BlockNumber>,
+    thresholds: ExecutionStageThresholds,
+    _db: PhantomData<DB>,
+}
+
+impl<E, DB, P> Iterator for BackfillJob<E, DB, P>
+where
+    E: BlockExecutorProvider,
+    DB: Database,
+    P: FullProvider<DB>,
+{
+    type Item = Result<Chain, BlockExecutionError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.range.is_empty() {
+            return None
+        }
+
+        Some(self.execute_range())
+    }
+}
+
+impl<E, DB, P> BackfillJob<E, DB, P>
+where
+    E: BlockExecutorProvider,
+    DB: Database,
+    P: FullProvider<DB>,
+{
+    fn execute_range(&mut self) -> Result<Chain, BlockExecutionError> {
+        let mut executor = self.executor.batch_executor(
+            StateProviderDatabase::new(
+                self.provider.history_by_block_number(self.range.start().saturating_sub(1))?,
+            ),
+            self.prune_modes.clone(),
+        );
+
+        let mut fetch_block_duration = Duration::default();
+        let mut execution_duration = Duration::default();
+        let mut cumulative_gas = 0;
+        let batch_start = Instant::now();
+
+        let mut blocks = Vec::new();
+        for block_number in self.range.clone() {
+            // Fetch the block
+            let fetch_block_start = Instant::now();
+
+            let td = self
+                .provider
+                .header_td_by_number(block_number)?
+                .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
+
+            // we need the block's transactions along with their hashes
+            let block = self
+                .provider
+                .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)?
+                .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
+
+            fetch_block_duration += fetch_block_start.elapsed();
+
+            cumulative_gas += block.gas_used;
+
+            // Configure the executor to use the current state.
+            trace!(target: "exex::backfill", number = block_number, txs = block.body.len(), "Executing block");
+
+            // Execute the block
+            let execute_start = Instant::now();
+
+            // Unseal the block for execution
+            let (block, senders) = block.into_components();
+            let (unsealed_header, hash) = block.header.split();
+            let block = Block {
+                header: unsealed_header,
+                body: block.body,
+                ommers: block.ommers,
+                withdrawals: block.withdrawals,
+                requests: block.requests,
+            }
+            .with_senders_unchecked(senders);
+
+            executor.execute_and_verify_one((&block, td).into())?;
+            execution_duration += execute_start.elapsed();
+
+            // TODO(alexey): report gas metrics using `block.header.gas_used`
+
+            // Seal the block back and save it
+            blocks.push(block.seal(hash));
+
+            // Check if we should commit now
+            let bundle_size_hint = executor.size_hint().unwrap_or_default() as u64;
+            if self.thresholds.is_end_of_batch(
+                block_number - *self.range.start(),
+                bundle_size_hint,
+                cumulative_gas,
+                batch_start.elapsed(),
+            ) {
+                break
+            }
+        }
+
+        let last_block_number = blocks.last().expect("blocks should not be empty").number;
+        debug!(
+            target: "exex::backfill",
+            range = ?*self.range.start()..=last_block_number,
+            block_fetch = ?fetch_block_duration,
+            execution = ?execution_duration,
+            throughput = format_gas_throughput(cumulative_gas, execution_duration),
+            "Finished executing block range"
+        );
+        self.range = last_block_number + 1..=*self.range.end();
+
+        let chain = Chain::new(blocks, executor.finalize(), None);
+        Ok(chain)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::BackfillJobFactory;
+    use eyre::OptionExt;
+    use reth_blockchain_tree::noop::NoopBlockchainTree;
+    use reth_chainspec::{ChainSpecBuilder, EthereumHardfork, MAINNET};
+    use reth_db_common::init::init_genesis;
+    use reth_evm::execute::{BatchExecutor, BlockExecutorProvider};
+    use reth_evm_ethereum::execute::EthExecutorProvider;
+    use reth_primitives::{
+        b256, constants::ETH_TO_WEI, public_key_to_address, Address, Block, Genesis,
+        GenesisAccount, Header, Transaction, TxEip2930, TxKind, U256,
+    };
+    use reth_provider::{
+        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
+        BlockWriter, LatestStateProviderRef,
+    };
+    use reth_prune_types::PruneModes;
+    use reth_revm::database::StateProviderDatabase;
+    use reth_testing_utils::generators::{self, sign_tx_with_key_pair};
+    use secp256k1::Keypair;
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn test_backfill() -> eyre::Result<()> {
+        reth_tracing::init_test_tracing();
+
+        // Create a key pair for the sender
+        let key_pair = Keypair::new_global(&mut generators::rng());
+        let address = public_key_to_address(key_pair.public_key());
+
+        // Create a chain spec with a genesis state that contains the sender
+        let chain_spec = Arc::new(
+            ChainSpecBuilder::default()
+                .chain(MAINNET.chain)
+                .genesis(Genesis {
+                    alloc: [(
+                        address,
+                        GenesisAccount { balance: U256::from(ETH_TO_WEI), ..Default::default() },
+                    )]
+                    .into(),
+                    ..MAINNET.genesis.clone()
+                })
+                .paris_activated()
+                .build(),
+        );
+
+        let executor = EthExecutorProvider::ethereum(chain_spec.clone());
+        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
+        init_genesis(provider_factory.clone())?;
+        let blockchain_db = BlockchainProvider::new(
+            provider_factory.clone(),
+            Arc::new(NoopBlockchainTree::default()),
+        )?;
+
+        // First block has a transaction that transfers some ETH to zero address
+        let block1 = Block {
+            header: Header {
+                parent_hash: chain_spec.genesis_hash(),
+                receipts_root: b256!(
+                    "d3a6acf9a244d78b33831df95d472c4128ea85bf079a1d41e32ed0b7d2244c9e"
+                ),
+                difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"),
+                number: 1,
+                gas_limit: 21000,
+                gas_used: 21000,
+                ..Default::default()
+            },
+            body: vec![sign_tx_with_key_pair(
+                key_pair,
+                Transaction::Eip2930(TxEip2930 {
+                    chain_id: chain_spec.chain.id(),
+                    nonce: 0,
+                    gas_limit: 21000,
+                    gas_price: 1_500_000_000,
+                    to: TxKind::Call(Address::ZERO),
+                    value: U256::from(0.1 * ETH_TO_WEI as f64),
+                    ..Default::default()
+                }),
+            )],
+            ..Default::default()
+        }
+        .with_recovered_senders()
+        .ok_or_eyre("failed to recover senders")?;
+
+        // Second block has no state changes
+        let block2 = Block {
+            header: Header {
+                parent_hash: block1.hash_slow(),
+                difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"),
+                number: 2,
+                ..Default::default()
+            },
+            ..Default::default()
+        }
+        .with_recovered_senders()
+        .ok_or_eyre("failed to recover senders")?;
+
+        let provider = provider_factory.provider()?;
+        // Execute only the first block on top of genesis state
+        let mut outcome_single = EthExecutorProvider::ethereum(chain_spec.clone())
+            .batch_executor(
+                StateProviderDatabase::new(LatestStateProviderRef::new(
+                    provider.tx_ref(),
+                    provider.static_file_provider().clone(),
+                )),
+                PruneModes::none(),
+            )
+            .execute_and_verify_batch([(&block1, U256::ZERO).into()])?;
+        outcome_single.bundle.reverts.sort();
+        // Execute both blocks on top of the genesis state
+        let outcome_batch = EthExecutorProvider::ethereum(chain_spec)
+            .batch_executor(
+                StateProviderDatabase::new(LatestStateProviderRef::new(
+                    provider.tx_ref(),
+                    provider.static_file_provider().clone(),
+                )),
+                PruneModes::none(),
+            )
+            .execute_and_verify_batch([
+                (&block1, U256::ZERO).into(),
+                (&block2, U256::ZERO).into(),
+            ])?;
+        drop(provider);
+
+        let block1 = block1.seal_slow();
+        let block2 = block2.seal_slow();
+
+        // Update the state with the execution results of both blocks
+        let provider_rw = provider_factory.provider_rw()?;
+        provider_rw.append_blocks_with_state(
+            vec![block1.clone(), block2],
+            outcome_batch,
+            Default::default(),
+            Default::default(),
+            None,
+        )?;
+        provider_rw.commit()?;
+
+        // Backfill the first block
+        let factory = BackfillJobFactory::new(executor, blockchain_db, PruneModes::none());
+        let job = factory.backfill(1..=1);
+        let chains = job.collect::<Result<Vec<_>, _>>()?;
+
+        // Assert that the backfill job produced the same chain as we got before when we were
+        // executing only the first block
+        assert_eq!(chains.len(), 1);
+        let mut chain = chains.into_iter().next().unwrap();
+        chain.execution_outcome_mut().bundle.reverts.sort();
+        assert_eq!(chain.blocks(), &[(1, block1)].into());
+        assert_eq!(chain.execution_outcome(), &outcome_single);
+
+        Ok(())
+    }
+}
diff --git a/crates/exex/exex/src/lib.rs b/crates/exex/exex/src/lib.rs
index a7661d855..5f859accc 100644
--- a/crates/exex/exex/src/lib.rs
+++ b/crates/exex/exex/src/lib.rs
@@ -34,6 +34,9 @@
 #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
 #![cfg_attr(not(test), warn(unused_crate_dependencies))]
 
+mod backfill;
+pub use backfill::*;
+
 mod context;
 pub use context::*;
diff --git a/crates/exex/types/Cargo.toml b/crates/exex/types/Cargo.toml
index 8797376da..e03b63342 100644
--- a/crates/exex/types/Cargo.toml
+++ b/crates/exex/types/Cargo.toml
@@ -12,4 +12,4 @@ description = "Commonly used types for exex usage in reth."
 workspace = true
 
 [dependencies]
-alloy-primitives.workspace = true
\ No newline at end of file
+alloy-primitives.workspace = true
diff --git a/crates/stages/api/src/metrics/execution.rs b/crates/stages/api/src/metrics/execution.rs
new file mode 100644
index 000000000..b54ed02f6
--- /dev/null
+++ b/crates/stages/api/src/metrics/execution.rs
@@ -0,0 +1,32 @@
+use std::time::Duration;
+
+use reth_primitives_traits::constants::gas_units::{GIGAGAS, KILOGAS, MEGAGAS};
+
+/// Returns a formatted gas throughput log, showing either:
+/// * "Kgas/s", or 1,000 gas per second
+/// * "Mgas/s", or 1,000,000 gas per second
+/// * "Ggas/s", or 1,000,000,000 gas per second
+///
+/// Depending on the magnitude of the gas throughput.
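+///
+/// # Example
+///
+/// Illustrative usage (an editorial sketch, not part of the original change):
+///
+/// ```ignore
+/// use std::time::Duration;
+///
+/// // 30M gas executed over 2 seconds is 15 Mgas/second.
+/// let throughput = format_gas_throughput(30_000_000, Duration::from_secs(2));
+/// assert!(throughput.contains("Mgas/second"));
+/// ```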
+pub fn format_gas_throughput(gas: u64, execution_duration: Duration) -> String {
+    let gas_per_second = gas as f64 / execution_duration.as_secs_f64();
+    if gas_per_second < MEGAGAS as f64 {
+        format!("{:.} Kgas/second", gas_per_second / KILOGAS as f64)
+    } else if gas_per_second < GIGAGAS as f64 {
+        format!("{:.} Mgas/second", gas_per_second / MEGAGAS as f64)
+    } else {
+        format!("{:.} Ggas/second", gas_per_second / GIGAGAS as f64)
+    }
+}
diff --git a/crates/stages/api/src/metrics/mod.rs b/crates/stages/api/src/metrics/mod.rs
index bed2742c2..983247ae9 100644
--- a/crates/stages/api/src/metrics/mod.rs
+++ b/crates/stages/api/src/metrics/mod.rs
@@ -1,5 +1,7 @@
+mod execution;
 mod listener;
 mod sync_metrics;
 
+pub use execution::format_gas_throughput;
 pub use listener::{MetricEvent, MetricEventsSender, MetricsListener};
 use sync_metrics::*;
diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs
index a37c081b4..26db6f50f 100644
--- a/crates/stages/stages/src/stages/execution.rs
+++ b/crates/stages/stages/src/stages/execution.rs
@@ -6,10 +6,7 @@ use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
 use reth_evm::execute::{BatchExecutor, BlockExecutorProvider};
 use reth_execution_types::{Chain, ExecutionOutcome};
 use reth_exex::{ExExManagerHandle, ExExNotification};
-use reth_primitives::{
-    constants::gas_units::{GIGAGAS, KILOGAS, MEGAGAS},
-    BlockNumber, Header, StaticFileSegment,
-};
+use reth_primitives::{BlockNumber, Header, StaticFileSegment};
 use reth_provider::{
     providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
     BlockReader, DatabaseProviderRW, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
@@ -18,9 +15,9 @@ use reth_provider::{
 use reth_prune_types::PruneModes;
 use reth_revm::database::StateProviderDatabase;
 use reth_stages_api::{
-    BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput,
-    ExecutionCheckpoint, MetricEvent, MetricEventsSender, Stage, StageCheckpoint, StageError,
-    StageId, UnwindInput, UnwindOutput,
+    format_gas_throughput, BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput,
+    ExecOutput, ExecutionCheckpoint, ExecutionStageThresholds, MetricEvent, MetricEventsSender,
+    Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
 };
 use std::{
     cmp::Ordering,
@@ -546,83 +543,6 @@ fn calculate_gas_used_from_headers(
     Ok(gas_total)
 }
 
-/// The thresholds at which the execution stage writes state changes to the database.
-///
-/// If either of the thresholds (`max_blocks` and `max_changes`) are hit, then the execution stage
-/// commits all pending changes to the database.
-///
-/// A third threshold, `max_changesets`, can be set to periodically write changesets to the
-/// current database transaction, which frees up memory.
-#[derive(Debug, Clone)]
-pub struct ExecutionStageThresholds {
-    /// The maximum number of blocks to execute before the execution stage commits.
-    pub max_blocks: Option<u64>,
-    /// The maximum number of state changes to keep in memory before the execution stage commits.
-    pub max_changes: Option<u64>,
-    /// The maximum cumulative amount of gas to process before the execution stage commits.
-    pub max_cumulative_gas: Option<u64>,
-    /// The maximum spent on blocks processing before the execution stage commits.
-    pub max_duration: Option<Duration>,
-}
-
-impl Default for ExecutionStageThresholds {
-    fn default() -> Self {
-        Self {
-            max_blocks: Some(500_000),
-            max_changes: Some(5_000_000),
-            // 50k full blocks of 30M gas
-            max_cumulative_gas: Some(30_000_000 * 50_000),
-            // 10 minutes
-            max_duration: Some(Duration::from_secs(10 * 60)),
-        }
-    }
-}
-
-impl ExecutionStageThresholds {
-    /// Check if the batch thresholds have been hit.
-    #[inline]
-    pub fn is_end_of_batch(
-        &self,
-        blocks_processed: u64,
-        changes_processed: u64,
-        cumulative_gas_used: u64,
-        elapsed: Duration,
-    ) -> bool {
-        blocks_processed >= self.max_blocks.unwrap_or(u64::MAX) ||
-            changes_processed >= self.max_changes.unwrap_or(u64::MAX) ||
-            cumulative_gas_used >= self.max_cumulative_gas.unwrap_or(u64::MAX) ||
-            elapsed >= self.max_duration.unwrap_or(Duration::MAX)
-    }
-}
-
-impl From<ExecutionConfig> for ExecutionStageThresholds {
-    fn from(config: ExecutionConfig) -> Self {
-        Self {
-            max_blocks: config.max_blocks,
-            max_changes: config.max_changes,
-            max_cumulative_gas: config.max_cumulative_gas,
-            max_duration: config.max_duration,
-        }
-    }
-}
-
-/// Returns a formatted gas throughput log, showing either:
-/// * "Kgas/s", or 1,000 gas per second
-/// * "Mgas/s", or 1,000,000 gas per second
-/// * "Ggas/s", or 1,000,000,000 gas per second
-///
-/// Depending on the magnitude of the gas throughput.
-pub fn format_gas_throughput(gas: u64, execution_duration: Duration) -> String {
-    let gas_per_second = gas as f64 / execution_duration.as_secs_f64();
-    if gas_per_second < MEGAGAS as f64 {
-        format!("{:.} Kgas/second", gas_per_second / KILOGAS as f64)
-    } else if gas_per_second < GIGAGAS as f64 {
-        format!("{:.} Mgas/second", gas_per_second / MEGAGAS as f64)
-    } else {
-        format!("{:.} Ggas/second", gas_per_second / GIGAGAS as f64)
-    }
-}
-
 /// Returns a `StaticFileProviderRWRefMut` static file producer after performing a consistency
 /// check.
 ///
@@ -720,7 +640,7 @@ mod tests {
         StaticFileProviderFactory,
     };
     use reth_prune_types::{PruneMode, ReceiptsLogPruneConfig};
-    use reth_stages_api::StageUnitCheckpoint;
+    use reth_stages_api::{format_gas_throughput, StageUnitCheckpoint};
     use std::collections::BTreeMap;
 
     fn stage() -> ExecutionStage<MockExecutorProvider> {
diff --git a/crates/stages/stages/src/stages/mod.rs b/crates/stages/stages/src/stages/mod.rs
index 8d850b8ba..9c96f4b30 100644
--- a/crates/stages/stages/src/stages/mod.rs
+++ b/crates/stages/stages/src/stages/mod.rs
@@ -66,7 +66,9 @@ mod tests {
         StaticFileProviderFactory, StorageReader,
     };
     use reth_prune_types::{PruneMode, PruneModes};
-    use reth_stages_api::{ExecInput, PipelineTarget, Stage, StageCheckpoint, StageId};
+    use reth_stages_api::{
+        ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
+    };
     use reth_testing_utils::generators::{self, random_block, random_block_range, random_receipt};
     use std::{io::Write, sync::Arc};
diff --git a/crates/stages/types/src/execution.rs b/crates/stages/types/src/execution.rs
new file mode 100644
index 000000000..61f7313a3
--- /dev/null
+++ b/crates/stages/types/src/execution.rs
@@ -0,0 +1,67 @@
+use std::time::Duration;
+
+/// The thresholds at which the execution stage writes state changes to the database.
+///
+/// If any of the thresholds (`max_blocks`, `max_changes`, `max_cumulative_gas` or
+/// `max_duration`) is hit, then the execution stage commits all pending changes to the
+/// database.
+///
+/// Thresholds that are `None` are ignored when checking whether a batch is complete.
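+///
+/// # Example
+///
+/// Illustrative configuration (an editorial sketch, not part of the original change):
+///
+/// ```ignore
+/// use std::time::Duration;
+///
+/// // Commit after 10k blocks or 30 seconds, whichever is hit first.
+/// let thresholds = ExecutionStageThresholds {
+///     max_blocks: Some(10_000),
+///     max_changes: None,
+///     max_cumulative_gas: None,
+///     max_duration: Some(Duration::from_secs(30)),
+/// };
+/// assert!(thresholds.is_end_of_batch(10_000, 0, 0, Duration::ZERO));
+/// ```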
+#[derive(Debug, Clone)]
+pub struct ExecutionStageThresholds {
+    /// The maximum number of blocks to execute before the execution stage commits.
+    pub max_blocks: Option<u64>,
+    /// The maximum number of state changes to keep in memory before the execution stage commits.
+    pub max_changes: Option<u64>,
+    /// The maximum cumulative amount of gas to process before the execution stage commits.
+    pub max_cumulative_gas: Option<u64>,
+    /// The maximum time spent on blocks processing before the execution stage commits.
+    pub max_duration: Option<Duration>,
+}
+
+impl Default for ExecutionStageThresholds {
+    fn default() -> Self {
+        Self {
+            max_blocks: Some(500_000),
+            max_changes: Some(5_000_000),
+            // 50k full blocks of 30M gas
+            max_cumulative_gas: Some(30_000_000 * 50_000),
+            // 10 minutes
+            max_duration: Some(Duration::from_secs(10 * 60)),
+        }
+    }
+}
+
+impl ExecutionStageThresholds {
+    /// Check if the batch thresholds have been hit.
+    #[inline]
+    pub fn is_end_of_batch(
+        &self,
+        blocks_processed: u64,
+        changes_processed: u64,
+        cumulative_gas_used: u64,
+        elapsed: Duration,
+    ) -> bool {
+        blocks_processed >= self.max_blocks.unwrap_or(u64::MAX) ||
+            changes_processed >= self.max_changes.unwrap_or(u64::MAX) ||
+            cumulative_gas_used >= self.max_cumulative_gas.unwrap_or(u64::MAX) ||
+            elapsed >= self.max_duration.unwrap_or(Duration::MAX)
+    }
+}
diff --git a/crates/stages/types/src/lib.rs b/crates/stages/types/src/lib.rs
index 00355b023..0132c8b41 100644
--- a/crates/stages/types/src/lib.rs
+++ b/crates/stages/types/src/lib.rs
@@ -19,6 +19,9 @@ pub use checkpoints::{
     StageUnitCheckpoint, StorageHashingCheckpoint,
 };
 
+mod execution;
+pub use execution::*;
+
 /// Direction and target block for pipeline operations.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum PipelineTarget {
diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs
index 31332377d..9c65103a4 100644
--- a/crates/storage/provider/src/providers/database/mod.rs
+++ b/crates/storage/provider/src/providers/database/mod.rs
@@ -330,6 +330,14 @@ impl<DB: Database> BlockReader for ProviderFactory<DB> {
         self.provider()?.block_with_senders(id, transaction_kind)
     }
 
+    fn sealed_block_with_senders(
+        &self,
+        id: BlockHashOrNumber,
+        transaction_kind: TransactionVariant,
+    ) -> ProviderResult<Option<SealedBlockWithSenders>> {
+        self.provider()?.sealed_block_with_senders(id, transaction_kind)
+    }
+
     fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
         self.provider()?.block_range(range)
     }
diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs
index e5fa31642..32b666c50 100644
--- a/crates/storage/provider/src/providers/database/provider.rs
+++ b/crates/storage/provider/src/providers/database/provider.rs
@@ -357,6 +357,65 @@ impl<TX: DbTx> DatabaseProvider<TX> {
         )
     }
 
+    fn block_with_senders<H, HF, B, BF>(
+        &self,
+        id: BlockHashOrNumber,
+        transaction_kind: TransactionVariant,
+        header_by_number: HF,
+        construct_block: BF,
+    ) -> ProviderResult<Option<B>>
+    where
+        H: AsRef<Header>,
+        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
+        BF: FnOnce(
+            H,
+            Vec<TransactionSigned>,
+            Vec<Address>,
+            Vec<Header>,
+            Option<Withdrawals>,
+            Option<Requests>,
+        ) -> ProviderResult<Option<B>>,
+    {
+        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
+        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
+
+        let ommers = self.ommers(block_number.into())?.unwrap_or_default();
+        let withdrawals =
+            self.withdrawals_by_block(block_number.into(), header.as_ref().timestamp)?;
+        let requests = self.requests_by_block(block_number.into(), header.as_ref().timestamp)?;
+
+        // Get the block body
+        //
+        // If the body indices are not found, this means that the transactions either do not exist
+        // in the database yet, or they do exist but are not indexed. If they exist but are not
+        // indexed, we don't have enough information to return the block anyways, so we return
+        // `None`.
+        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
+
+        let tx_range = body.tx_num_range();
+
+        let (transactions, senders) = if tx_range.is_empty() {
+            (vec![], vec![])
+        } else {
+            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
+        };
+
+        let body = transactions
+            .into_iter()
+            .map(|tx| match transaction_kind {
+                TransactionVariant::NoHash => TransactionSigned {
+                    // Caller explicitly asked for no hash, so we don't calculate it
+                    hash: B256::ZERO,
+                    signature: tx.signature,
+                    transaction: tx.transaction,
+                },
+                TransactionVariant::WithHash => tx.with_hash(),
+            })
+            .collect();
+
+        construct_block(header, body, senders, ommers, withdrawals, requests)
+    }
+
     /// Returns a range of blocks from the database.
     ///
     /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
@@ -1550,48 +1609,41 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
         id: BlockHashOrNumber,
         transaction_kind: TransactionVariant,
     ) -> ProviderResult<Option<BlockWithSenders>> {
-        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
-        let Some(header) = self.header_by_number(block_number)? else { return Ok(None) };
+        self.block_with_senders(
+            id,
+            transaction_kind,
+            |block_number| self.header_by_number(block_number),
+            |header, body, senders, ommers, withdrawals, requests| {
+                Block { header, body, ommers, withdrawals, requests }
+                    // Note: we're using unchecked here because we know the block contains valid txs
+                    // wrt to its height and can ignore the s value check so pre
+                    // EIP-2 txs are allowed
+                    .try_with_senders_unchecked(senders)
+                    .map(Some)
+                    .map_err(|_| ProviderError::SenderRecoveryError)
+            },
+        )
+    }
-
-        let ommers = self.ommers(block_number.into())?.unwrap_or_default();
-        let withdrawals = self.withdrawals_by_block(block_number.into(), header.timestamp)?;
-        let requests = self.requests_by_block(block_number.into(), header.timestamp)?;
-
-        // Get the block body
-        //
-        // If the body indices are not found, this means that the transactions either do not exist
-        // in the database yet, or they do exit but are not indexed. If they exist but are not
-        // indexed, we don't have enough information to return the block anyways, so we return
-        // `None`.
-        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
-
-        let tx_range = body.tx_num_range();
-
-        let (transactions, senders) = if tx_range.is_empty() {
-            (vec![], vec![])
-        } else {
-            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
-        };
-
-        let body = transactions
-            .into_iter()
-            .map(|tx| match transaction_kind {
-                TransactionVariant::NoHash => TransactionSigned {
-                    // Caller explicitly asked for no hash, so we don't calculate it
-                    hash: B256::ZERO,
-                    signature: tx.signature,
-                    transaction: tx.transaction,
-                },
-                TransactionVariant::WithHash => tx.with_hash(),
-            })
-            .collect();
-
-        Block { header, body, ommers, withdrawals, requests }
-            // Note: we're using unchecked here because we know the block contains valid txs wrt to
-            // its height and can ignore the s value check so pre EIP-2 txs are allowed
-            .try_with_senders_unchecked(senders)
-            .map(Some)
-            .map_err(|_| ProviderError::SenderRecoveryError)
+
+    fn sealed_block_with_senders(
+        &self,
+        id: BlockHashOrNumber,
+        transaction_kind: TransactionVariant,
+    ) -> ProviderResult<Option<SealedBlockWithSenders>> {
+        self.block_with_senders(
+            id,
+            transaction_kind,
+            |block_number| self.sealed_header(block_number),
+            |header, body, senders, ommers, withdrawals, requests| {
+                SealedBlock { header, body, ommers, withdrawals, requests }
+                    // Note: we're using unchecked here because we know the block contains valid txs
+                    // wrt to its height and can ignore the s value check so pre
+                    // EIP-2 txs are allowed
+                    .try_with_senders_unchecked(senders)
+                    .map(Some)
+                    .map_err(|_| ProviderError::SenderRecoveryError)
+            },
+        )
     }
 
     fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs
index 69116517f..4788db247 100644
--- a/crates/storage/provider/src/providers/mod.rs
+++ b/crates/storage/provider/src/providers/mod.rs
@@ -328,6 +328,14 @@ where
         self.database.block_with_senders(id, transaction_kind)
     }
 
+    fn sealed_block_with_senders(
+        &self,
+        id: BlockHashOrNumber,
+        transaction_kind: TransactionVariant,
+    ) -> ProviderResult<Option<SealedBlockWithSenders>> {
+        self.database.sealed_block_with_senders(id, transaction_kind)
+    }
+
     fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
         self.database.block_range(range)
     }
diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs
index 39e588c7f..19d6b0684 100644
--- a/crates/storage/provider/src/providers/static_file/manager.rs
+++ b/crates/storage/provider/src/providers/static_file/manager.rs
@@ -1457,6 +1457,15 @@ impl BlockReader for StaticFileProvider {
         Err(ProviderError::UnsupportedProvider)
     }
 
+    fn sealed_block_with_senders(
+        &self,
+        _id: BlockHashOrNumber,
+        _transaction_kind: TransactionVariant,
+    ) -> ProviderResult<Option<SealedBlockWithSenders>> {
+        // Required data not present in static_files
+        Err(ProviderError::UnsupportedProvider)
+    }
+
     fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
         // Required data not present in static_files
         Err(ProviderError::UnsupportedProvider)
diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs
index 73ee53eec..0184f5755 100644
--- a/crates/storage/provider/src/test_utils/mock.rs
+++ b/crates/storage/provider/src/test_utils/mock.rs
@@ -474,6 +474,14 @@ impl BlockReader for MockEthProvider {
         Ok(None)
     }
 
+    fn sealed_block_with_senders(
+        &self,
+        _id: BlockHashOrNumber,
+        _transaction_kind: TransactionVariant,
+    ) -> ProviderResult<Option<SealedBlockWithSenders>> {
+        Ok(None)
+    }
+
     fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
         let lock = self.blocks.lock();
diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs
index 4f7bdabce..d52da187f 100644
--- a/crates/storage/provider/src/test_utils/noop.rs
+++ b/crates/storage/provider/src/test_utils/noop.rs
@@ -113,6 +113,14 @@ impl BlockReader for NoopProvider {
         Ok(None)
     }
 
+    fn sealed_block_with_senders(
+        &self,
+        _id: BlockHashOrNumber,
+        _transaction_kind: TransactionVariant,
+    ) -> ProviderResult<Option<SealedBlockWithSenders>> {
+        Ok(None)
+    }
+
     fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
         Ok(vec![])
     }
diff --git a/crates/storage/storage-api/src/block.rs b/crates/storage/storage-api/src/block.rs
index 42ab05f22..3dc22de8a 100644
--- a/crates/storage/storage-api/src/block.rs
+++ b/crates/storage/storage-api/src/block.rs
@@ -118,6 +118,17 @@ pub trait BlockReader:
         transaction_kind: TransactionVariant,
     ) -> ProviderResult<Option<BlockWithSenders>>;
 
+    /// Returns the sealed block with senders with matching number or hash from database.
+    ///
+    /// Returns the block's transactions in the requested variant.
+    ///
+    /// Returns `None` if block is not found.
+    fn sealed_block_with_senders(
+        &self,
+        id: BlockHashOrNumber,
+        transaction_kind: TransactionVariant,
+    ) -> ProviderResult<Option<SealedBlockWithSenders>>;
+
     /// Returns all blocks in the given inclusive range.
     ///
     /// Note: returns only available blocks