feat(exex): backfill executor (#9123)

Alexey Shekhirin
2024-06-28 16:22:42 +01:00
committed by GitHub
parent 9a2cfe5a5c
commit 9129b97c5b
25 changed files with 633 additions and 140 deletions
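This commit adds a backfill executor to the ExEx crate: a `BackfillJobFactory` that creates `BackfillJob`s, each an `Iterator` that re-executes a range of historical blocks in batches and yields every batch as a `Chain`. To support it, `ExecutionStageThresholds` moves from `reth-stages` into `reth-stages-types` (re-exported through `reth-stages-api`), `format_gas_throughput` moves into the stages-api metrics module, and `BlockReader` gains a `sealed_block_with_senders` method.

A minimal usage sketch against the new API (assuming a block executor, a full provider, and an `eyre::Result` return type are already in scope):

    let factory = BackfillJobFactory::new(executor, provider, PruneModes::none());
    for chain in factory.backfill(1..=100) {
        let chain = chain?;
        // Each yielded `Chain` is one executed batch of blocks plus its execution outcome.
    }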

Cargo.lock (generated)

@@ -6636,6 +6636,7 @@ dependencies = [
"humantime-serde",
"reth-network-types",
"reth-prune-types",
"reth-stages-types",
"serde",
"tempfile",
"toml",
@@ -7216,7 +7217,13 @@ version = "1.0.0"
dependencies = [
"eyre",
"metrics",
"reth-blockchain-tree",
"reth-chainspec",
"reth-config",
"reth-db-api",
"reth-db-common",
"reth-evm",
"reth-evm-ethereum",
"reth-exex-types",
"reth-metrics",
"reth-network",
@@ -7225,8 +7232,13 @@ dependencies = [
"reth-payload-builder",
"reth-primitives",
"reth-provider",
"reth-prune-types",
"reth-revm",
"reth-stages-api",
"reth-tasks",
"reth-testing-utils",
"reth-tracing",
"secp256k1",
"serde",
"tokio",
"tokio-util",


@@ -28,9 +28,8 @@ use reth_provider::{
};
use reth_prune::PruneModes;
use reth_stages::{
sets::DefaultStages,
stages::{ExecutionStage, ExecutionStageThresholds},
Pipeline, StageId, StageSet,
sets::DefaultStages, stages::ExecutionStage, ExecutionStageThresholds, Pipeline, StageId,
StageSet,
};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;


@@ -12,10 +12,10 @@ use reth_provider::{providers::StaticFileProvider, ProviderFactory};
use reth_prune::PruneModes;
use reth_stages::{
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage,
StorageHashingStage, MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage,
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
},
Stage, StageCheckpoint, UnwindInput,
ExecutionStageThresholds, Stage, StageCheckpoint, UnwindInput,
};
use tracing::info;


@@ -20,11 +20,11 @@ use reth_provider::{
};
use reth_stages::{
stages::{
AccountHashingStage, BodyStage, ExecutionStage, ExecutionStageThresholds,
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TransactionLookupStage,
AccountHashingStage, BodyStage, ExecutionStage, IndexAccountHistoryStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
TransactionLookupStage,
},
ExecInput, ExecOutput, Stage, StageExt, UnwindInput, UnwindOutput,
ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput,
};
use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
use tracing::*;


@@ -16,8 +16,8 @@ use reth_provider::{
use reth_prune::PruneModes;
use reth_stages::{
sets::{DefaultStages, OfflineStages},
stages::{ExecutionStage, ExecutionStageThresholds},
Pipeline, StageSet,
stages::ExecutionStage,
ExecutionStageThresholds, Pipeline, StageSet,
};
use reth_static_file::StaticFileProducer;
use std::{ops::RangeInclusive, sync::Arc};


@@ -14,6 +14,7 @@ workspace = true
# reth
reth-network-types = { workspace = true, features = ["serde"] }
reth-prune-types.workspace = true
reth-stages-types.workspace = true
# serde
serde.workspace = true


@@ -2,6 +2,7 @@
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_prune_types::PruneModes;
use reth_stages_types::ExecutionStageThresholds;
use serde::{Deserialize, Deserializer, Serialize};
use std::{
ffi::OsStr,
@@ -216,6 +217,17 @@ impl Default for ExecutionConfig {
}
}
impl From<ExecutionConfig> for ExecutionStageThresholds {
fn from(config: ExecutionConfig) -> Self {
Self {
max_blocks: config.max_blocks,
max_changes: config.max_changes,
max_cumulative_gas: config.max_cumulative_gas,
max_duration: config.max_duration,
}
}
}
/// Hashing stage configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default)]
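The `From` impl above wires the node's TOML execution settings into the relocated thresholds type. A sketch of the conversion, with module paths inferred from the dependency changes above (treat them as assumptions):

    use reth_config::config::ExecutionConfig; // assumed path
    use reth_stages_types::ExecutionStageThresholds;

    // Field-for-field conversion; real values come from the loaded reth.toml.
    let thresholds: ExecutionStageThresholds = ExecutionConfig::default().into();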


@@ -94,6 +94,11 @@ impl Chain {
&self.execution_outcome
}
/// Get the mutable execution outcome of this chain.
pub fn execution_outcome_mut(&mut self) -> &mut ExecutionOutcome {
&mut self.execution_outcome
}
/// Prepends the given state to the current state.
pub fn prepend_state(&mut self, state: BundleState) {
self.execution_outcome.prepend_state(state);


@@ -24,6 +24,11 @@ reth-tasks.workspace = true
reth-tracing.workspace = true
reth-network.workspace = true
reth-payload-builder.workspace = true
reth-evm.workspace = true
reth-prune-types.workspace = true
reth-revm.workspace = true
reth-stages-api.workspace = true
reth-db-api.workspace = true
## async
tokio.workspace = true
@@ -34,6 +39,17 @@ eyre.workspace = true
metrics.workspace = true
serde = { workspace = true, optional = true }
[dev-dependencies]
reth-chainspec.workspace = true
reth-evm-ethereum.workspace = true
reth-testing-utils.workspace = true
reth-blockchain-tree.workspace = true
reth-db-common.workspace = true
reth-node-api.workspace = true
reth-provider = { workspace = true, features = ["test-utils"] }
secp256k1.workspace = true
[features]
default = []
serde = ["dep:serde", "reth-provider/serde"]


@@ -0,0 +1,344 @@
use reth_db_api::database::Database;
use reth_evm::execute::{BatchExecutor, BlockExecutionError, BlockExecutorProvider};
use reth_node_api::FullNodeComponents;
use reth_primitives::{Block, BlockNumber};
use reth_provider::{Chain, FullProvider, ProviderError, TransactionVariant};
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::{format_gas_throughput, ExecutionStageThresholds};
use reth_tracing::tracing::{debug, trace};
use std::{
marker::PhantomData,
ops::RangeInclusive,
time::{Duration, Instant},
};
/// Factory for creating new backfill jobs.
#[derive(Debug, Clone)]
pub struct BackfillJobFactory<E, P> {
executor: E,
provider: P,
prune_modes: PruneModes,
thresholds: ExecutionStageThresholds,
}
impl<E, P> BackfillJobFactory<E, P> {
/// Creates a new [`BackfillJobFactory`].
pub fn new(executor: E, provider: P, prune_modes: PruneModes) -> Self {
Self { executor, provider, prune_modes, thresholds: ExecutionStageThresholds::default() }
}
/// Sets the thresholds.
pub const fn with_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self {
self.thresholds = thresholds;
self
}
}
impl<E: Clone, P: Clone> BackfillJobFactory<E, P> {
/// Creates a new backfill job for the given range.
pub fn backfill<DB>(&self, range: RangeInclusive<BlockNumber>) -> BackfillJob<E, DB, P> {
BackfillJob {
executor: self.executor.clone(),
provider: self.provider.clone(),
prune_modes: self.prune_modes.clone(),
range,
thresholds: self.thresholds.clone(),
_db: PhantomData,
}
}
}
impl BackfillJobFactory<(), ()> {
/// Creates a new [`BackfillJobFactory`] from [`FullNodeComponents`].
pub fn new_from_components<Node: FullNodeComponents>(
components: Node,
prune_modes: PruneModes,
) -> BackfillJobFactory<Node::Executor, Node::Provider> {
BackfillJobFactory::<_, _>::new(
components.block_executor().clone(),
components.provider().clone(),
prune_modes,
)
}
}
/// Backfill job started for a specific range.
///
/// It implements [`Iterator`], executing blocks in batches according to the provided thresholds
/// and yielding a [`Chain`] per batch.
#[derive(Debug)]
pub struct BackfillJob<E, DB, P> {
executor: E,
provider: P,
prune_modes: PruneModes,
range: RangeInclusive<BlockNumber>,
thresholds: ExecutionStageThresholds,
_db: PhantomData<DB>,
}
impl<E, DB, P> Iterator for BackfillJob<E, DB, P>
where
E: BlockExecutorProvider,
DB: Database,
P: FullProvider<DB>,
{
type Item = Result<Chain, BlockExecutionError>;
fn next(&mut self) -> Option<Self::Item> {
if self.range.is_empty() {
return None
}
Some(self.execute_range())
}
}
impl<E, DB, P> BackfillJob<E, DB, P>
where
E: BlockExecutorProvider,
DB: Database,
P: FullProvider<DB>,
{
fn execute_range(&mut self) -> Result<Chain, BlockExecutionError> {
let mut executor = self.executor.batch_executor(
StateProviderDatabase::new(
self.provider.history_by_block_number(self.range.start().saturating_sub(1))?,
),
self.prune_modes.clone(),
);
let mut fetch_block_duration = Duration::default();
let mut execution_duration = Duration::default();
let mut cumulative_gas = 0;
let batch_start = Instant::now();
let mut blocks = Vec::new();
for block_number in self.range.clone() {
// Fetch the block
let fetch_block_start = Instant::now();
let td = self
.provider
.header_td_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
// we need the block's transactions along with their hashes
let block = self
.provider
.sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
fetch_block_duration += fetch_block_start.elapsed();
cumulative_gas += block.gas_used;
// Configure the executor to use the current state.
trace!(target: "exex::backfill", number = block_number, txs = block.body.len(), "Executing block");
// Execute the block
let execute_start = Instant::now();
// Unseal the block for execution
let (block, senders) = block.into_components();
let (unsealed_header, hash) = block.header.split();
let block = Block {
header: unsealed_header,
body: block.body,
ommers: block.ommers,
withdrawals: block.withdrawals,
requests: block.requests,
}
.with_senders_unchecked(senders);
executor.execute_and_verify_one((&block, td).into())?;
execution_duration += execute_start.elapsed();
// TODO(alexey): report gas metrics using `block.header.gas_used`
// Seal the block back and save it
blocks.push(block.seal(hash));
// Check if we should commit now
let bundle_size_hint = executor.size_hint().unwrap_or_default() as u64;
if self.thresholds.is_end_of_batch(
block_number - *self.range.start(),
bundle_size_hint,
cumulative_gas,
batch_start.elapsed(),
) {
break
}
}
let last_block_number = blocks.last().expect("blocks should not be empty").number;
debug!(
target: "exex::backfill",
range = ?*self.range.start()..=last_block_number,
block_fetch = ?fetch_block_duration,
execution = ?execution_duration,
throughput = format_gas_throughput(cumulative_gas, execution_duration),
"Finished executing block range"
);
self.range = last_block_number + 1..=*self.range.end();
let chain = Chain::new(blocks, executor.finalize(), None);
Ok(chain)
}
}
#[cfg(test)]
mod tests {
use crate::BackfillJobFactory;
use eyre::OptionExt;
use reth_blockchain_tree::noop::NoopBlockchainTree;
use reth_chainspec::{ChainSpecBuilder, EthereumHardfork, MAINNET};
use reth_db_common::init::init_genesis;
use reth_evm::execute::{BatchExecutor, BlockExecutorProvider};
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::{
b256, constants::ETH_TO_WEI, public_key_to_address, Address, Block, Genesis,
GenesisAccount, Header, Transaction, TxEip2930, TxKind, U256,
};
use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
BlockWriter, LatestStateProviderRef,
};
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
use reth_testing_utils::generators::{self, sign_tx_with_key_pair};
use secp256k1::Keypair;
use std::sync::Arc;
#[tokio::test]
async fn test_backfill() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
// Create a key pair for the sender
let key_pair = Keypair::new_global(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());
// Create a chain spec with a genesis state that contains the sender
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(Genesis {
alloc: [(
address,
GenesisAccount { balance: U256::from(ETH_TO_WEI), ..Default::default() },
)]
.into(),
..MAINNET.genesis.clone()
})
.paris_activated()
.build(),
);
let executor = EthExecutorProvider::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(provider_factory.clone())?;
let blockchain_db = BlockchainProvider::new(
provider_factory.clone(),
Arc::new(NoopBlockchainTree::default()),
)?;
// The first block has a transaction that transfers some ETH to the zero address
let block1 = Block {
header: Header {
parent_hash: chain_spec.genesis_hash(),
receipts_root: b256!(
"d3a6acf9a244d78b33831df95d472c4128ea85bf079a1d41e32ed0b7d2244c9e"
),
difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"),
number: 1,
gas_limit: 21000,
gas_used: 21000,
..Default::default()
},
body: vec![sign_tx_with_key_pair(
key_pair,
Transaction::Eip2930(TxEip2930 {
chain_id: chain_spec.chain.id(),
nonce: 0,
gas_limit: 21000,
gas_price: 1_500_000_000,
to: TxKind::Call(Address::ZERO),
value: U256::from(0.1 * ETH_TO_WEI as f64),
..Default::default()
}),
)],
..Default::default()
}
.with_recovered_senders()
.ok_or_eyre("failed to recover senders")?;
// Second block has no state changes
let block2 = Block {
header: Header {
parent_hash: block1.hash_slow(),
difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"),
number: 2,
..Default::default()
},
..Default::default()
}
.with_recovered_senders()
.ok_or_eyre("failed to recover senders")?;
let provider = provider_factory.provider()?;
// Execute only the first block on top of genesis state
let mut outcome_single = EthExecutorProvider::ethereum(chain_spec.clone())
.batch_executor(
StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
)),
PruneModes::none(),
)
.execute_and_verify_batch([(&block1, U256::ZERO).into()])?;
outcome_single.bundle.reverts.sort();
// Execute both blocks on top of the genesis state
let outcome_batch = EthExecutorProvider::ethereum(chain_spec)
.batch_executor(
StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
)),
PruneModes::none(),
)
.execute_and_verify_batch([
(&block1, U256::ZERO).into(),
(&block2, U256::ZERO).into(),
])?;
drop(provider);
let block1 = block1.seal_slow();
let block2 = block2.seal_slow();
// Update the state with the execution results of both blocks
let provider_rw = provider_factory.provider_rw()?;
provider_rw.append_blocks_with_state(
vec![block1.clone(), block2],
outcome_batch,
Default::default(),
Default::default(),
None,
)?;
provider_rw.commit()?;
// Backfill the first block
let factory = BackfillJobFactory::new(executor, blockchain_db, PruneModes::none());
let job = factory.backfill(1..=1);
let chains = job.collect::<Result<Vec<_>, _>>()?;
// Assert that the backfill job produced the same chain as we got before when we were
// executing only the first block
assert_eq!(chains.len(), 1);
let mut chain = chains.into_iter().next().unwrap();
chain.execution_outcome_mut().bundle.reverts.sort();
assert_eq!(chain.blocks(), &[(1, block1)].into());
assert_eq!(chain.execution_outcome(), &outcome_single);
Ok(())
}
}
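Batch boundaries come from `ExecutionStageThresholds::is_end_of_batch`, so callers can bound the size of each yielded chain. A hedged sketch with a hypothetical 100-block cap (same executor/provider setup as the test above):

    let job = BackfillJobFactory::new(executor, provider, PruneModes::none())
        .with_thresholds(ExecutionStageThresholds { max_blocks: Some(100), ..Default::default() })
        .backfill(1..=1_000);
    for chain in job {
        let chain = chain?; // the job cuts a batch once roughly 100 blocks have been executed
    }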


@@ -34,6 +34,9 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
mod backfill;
pub use backfill::*;
mod context;
pub use context::*;


@@ -0,0 +1,20 @@
use std::time::Duration;
use reth_primitives_traits::constants::gas_units::{GIGAGAS, KILOGAS, MEGAGAS};
/// Returns a formatted gas throughput log, showing one of:
/// * "Kgas/s", i.e. 1,000 gas per second
/// * "Mgas/s", i.e. 1,000,000 gas per second
/// * "Ggas/s", i.e. 1,000,000,000 gas per second
///
/// depending on the magnitude of the gas throughput.
pub fn format_gas_throughput(gas: u64, execution_duration: Duration) -> String {
let gas_per_second = gas as f64 / execution_duration.as_secs_f64();
if gas_per_second < MEGAGAS as f64 {
format!("{:.} Kgas/second", gas_per_second / KILOGAS as f64)
} else if gas_per_second < GIGAGAS as f64 {
format!("{:.} Mgas/second", gas_per_second / MEGAGAS as f64)
} else {
format!("{:.} Ggas/second", gas_per_second / GIGAGAS as f64)
}
}
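As a quick check of the bucketing above: one 30M-gas block executed in a second falls in the Mgas range (the `{:.}` format spec applies f64's default `Display` formatting, so 30.0 renders as "30"):

    use std::time::Duration;

    let log = format_gas_throughput(30_000_000, Duration::from_secs(1));
    assert_eq!(log, "30 Mgas/second");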


@@ -1,5 +1,7 @@
mod execution;
mod listener;
mod sync_metrics;
pub use execution::format_gas_throughput;
pub use listener::{MetricEvent, MetricEventsSender, MetricsListener};
use sync_metrics::*;


@@ -6,10 +6,7 @@ use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_evm::execute::{BatchExecutor, BlockExecutorProvider};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex::{ExExManagerHandle, ExExNotification};
use reth_primitives::{
constants::gas_units::{GIGAGAS, KILOGAS, MEGAGAS},
BlockNumber, Header, StaticFileSegment,
};
use reth_primitives::{BlockNumber, Header, StaticFileSegment};
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
BlockReader, DatabaseProviderRW, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
@@ -18,9 +15,9 @@ use reth_provider::{
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::{
BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput,
ExecutionCheckpoint, MetricEvent, MetricEventsSender, Stage, StageCheckpoint, StageError,
StageId, UnwindInput, UnwindOutput,
format_gas_throughput, BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput,
ExecOutput, ExecutionCheckpoint, ExecutionStageThresholds, MetricEvent, MetricEventsSender,
Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use std::{
cmp::Ordering,
@@ -546,83 +543,6 @@ fn calculate_gas_used_from_headers(
Ok(gas_total)
}
/// The thresholds at which the execution stage writes state changes to the database.
///
/// If either of the thresholds (`max_blocks` and `max_changes`) are hit, then the execution stage
/// commits all pending changes to the database.
///
/// A third threshold, `max_changesets`, can be set to periodically write changesets to the
/// current database transaction, which frees up memory.
#[derive(Debug, Clone)]
pub struct ExecutionStageThresholds {
/// The maximum number of blocks to execute before the execution stage commits.
pub max_blocks: Option<u64>,
/// The maximum number of state changes to keep in memory before the execution stage commits.
pub max_changes: Option<u64>,
/// The maximum cumulative amount of gas to process before the execution stage commits.
pub max_cumulative_gas: Option<u64>,
/// The maximum spent on blocks processing before the execution stage commits.
pub max_duration: Option<Duration>,
}
impl Default for ExecutionStageThresholds {
fn default() -> Self {
Self {
max_blocks: Some(500_000),
max_changes: Some(5_000_000),
// 50k full blocks of 30M gas
max_cumulative_gas: Some(30_000_000 * 50_000),
// 10 minutes
max_duration: Some(Duration::from_secs(10 * 60)),
}
}
}
impl ExecutionStageThresholds {
/// Check if the batch thresholds have been hit.
#[inline]
pub fn is_end_of_batch(
&self,
blocks_processed: u64,
changes_processed: u64,
cumulative_gas_used: u64,
elapsed: Duration,
) -> bool {
blocks_processed >= self.max_blocks.unwrap_or(u64::MAX) ||
changes_processed >= self.max_changes.unwrap_or(u64::MAX) ||
cumulative_gas_used >= self.max_cumulative_gas.unwrap_or(u64::MAX) ||
elapsed >= self.max_duration.unwrap_or(Duration::MAX)
}
}
impl From<ExecutionConfig> for ExecutionStageThresholds {
fn from(config: ExecutionConfig) -> Self {
Self {
max_blocks: config.max_blocks,
max_changes: config.max_changes,
max_cumulative_gas: config.max_cumulative_gas,
max_duration: config.max_duration,
}
}
}
/// Returns a formatted gas throughput log, showing either:
/// * "Kgas/s", or 1,000 gas per second
/// * "Mgas/s", or 1,000,000 gas per second
/// * "Ggas/s", or 1,000,000,000 gas per second
///
/// Depending on the magnitude of the gas throughput.
pub fn format_gas_throughput(gas: u64, execution_duration: Duration) -> String {
let gas_per_second = gas as f64 / execution_duration.as_secs_f64();
if gas_per_second < MEGAGAS as f64 {
format!("{:.} Kgas/second", gas_per_second / KILOGAS as f64)
} else if gas_per_second < GIGAGAS as f64 {
format!("{:.} Mgas/second", gas_per_second / MEGAGAS as f64)
} else {
format!("{:.} Ggas/second", gas_per_second / GIGAGAS as f64)
}
}
/// Returns a `StaticFileProviderRWRefMut` static file producer after performing a consistency
/// check.
///
@@ -720,7 +640,7 @@ mod tests {
StaticFileProviderFactory,
};
use reth_prune_types::{PruneMode, ReceiptsLogPruneConfig};
use reth_stages_api::StageUnitCheckpoint;
use reth_stages_api::{format_gas_throughput, StageUnitCheckpoint};
use std::collections::BTreeMap;
fn stage() -> ExecutionStage<EthExecutorProvider> {


@@ -66,7 +66,9 @@ mod tests {
StaticFileProviderFactory, StorageReader,
};
use reth_prune_types::{PruneMode, PruneModes};
use reth_stages_api::{ExecInput, PipelineTarget, Stage, StageCheckpoint, StageId};
use reth_stages_api::{
ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
};
use reth_testing_utils::generators::{self, random_block, random_block_range, random_receipt};
use std::{io::Write, sync::Arc};


@@ -0,0 +1,50 @@
use std::time::Duration;
/// The thresholds at which the execution stage writes state changes to the database.
///
/// If any of the thresholds (`max_blocks`, `max_changes`, `max_cumulative_gas` or `max_duration`)
/// is hit, then the execution stage commits all pending changes to the database.
#[derive(Debug, Clone)]
pub struct ExecutionStageThresholds {
/// The maximum number of blocks to execute before the execution stage commits.
pub max_blocks: Option<u64>,
/// The maximum number of state changes to keep in memory before the execution stage commits.
pub max_changes: Option<u64>,
/// The maximum cumulative amount of gas to process before the execution stage commits.
pub max_cumulative_gas: Option<u64>,
/// The maximum time spent processing blocks before the execution stage commits.
pub max_duration: Option<Duration>,
}
impl Default for ExecutionStageThresholds {
fn default() -> Self {
Self {
max_blocks: Some(500_000),
max_changes: Some(5_000_000),
// 50k full blocks of 30M gas
max_cumulative_gas: Some(30_000_000 * 50_000),
// 10 minutes
max_duration: Some(Duration::from_secs(10 * 60)),
}
}
}
impl ExecutionStageThresholds {
/// Check if the batch thresholds have been hit.
#[inline]
pub fn is_end_of_batch(
&self,
blocks_processed: u64,
changes_processed: u64,
cumulative_gas_used: u64,
elapsed: Duration,
) -> bool {
blocks_processed >= self.max_blocks.unwrap_or(u64::MAX) ||
changes_processed >= self.max_changes.unwrap_or(u64::MAX) ||
cumulative_gas_used >= self.max_cumulative_gas.unwrap_or(u64::MAX) ||
elapsed >= self.max_duration.unwrap_or(Duration::MAX)
}
}
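With the defaults above, a batch ends as soon as any single limit trips. For example, the cumulative-gas ceiling alone (values taken straight from the `Default` impl):

    use std::time::Duration;

    let thresholds = ExecutionStageThresholds::default();
    // 50k blocks of 30M gas hit max_cumulative_gas exactly.
    assert!(thresholds.is_end_of_batch(0, 0, 30_000_000 * 50_000, Duration::ZERO));
    // Nothing processed yet: no threshold is hit.
    assert!(!thresholds.is_end_of_batch(0, 0, 0, Duration::ZERO));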


@@ -19,6 +19,9 @@ pub use checkpoints::{
StageUnitCheckpoint, StorageHashingCheckpoint,
};
mod execution;
pub use execution::*;
/// Direction and target block for pipeline operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineTarget {


@@ -330,6 +330,14 @@ impl<DB: Database> BlockReader for ProviderFactory<DB> {
self.provider()?.block_with_senders(id, transaction_kind)
}
fn sealed_block_with_senders(
&self,
id: BlockHashOrNumber,
transaction_kind: TransactionVariant,
) -> ProviderResult<Option<SealedBlockWithSenders>> {
self.provider()?.sealed_block_with_senders(id, transaction_kind)
}
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
self.provider()?.block_range(range)
}


@@ -357,6 +357,65 @@ impl<TX: DbTx> DatabaseProvider<TX> {
)
}
fn block_with_senders<H, HF, B, BF>(
&self,
id: BlockHashOrNumber,
transaction_kind: TransactionVariant,
header_by_number: HF,
construct_block: BF,
) -> ProviderResult<Option<B>>
where
H: AsRef<Header>,
HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
BF: FnOnce(
H,
Vec<TransactionSigned>,
Vec<Address>,
Vec<Header>,
Option<Withdrawals>,
Option<Requests>,
) -> ProviderResult<Option<B>>,
{
let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
let Some(header) = header_by_number(block_number)? else { return Ok(None) };
let ommers = self.ommers(block_number.into())?.unwrap_or_default();
let withdrawals =
self.withdrawals_by_block(block_number.into(), header.as_ref().timestamp)?;
let requests = self.requests_by_block(block_number.into(), header.as_ref().timestamp)?;
// Get the block body
//
// If the body indices are not found, this means that the transactions either do not exist
// in the database yet, or they do exist but are not indexed. If they exist but are not
// indexed, we don't have enough information to return the block anyway, so we return
// `None`.
let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
let tx_range = body.tx_num_range();
let (transactions, senders) = if tx_range.is_empty() {
(vec![], vec![])
} else {
(self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
};
let body = transactions
.into_iter()
.map(|tx| match transaction_kind {
TransactionVariant::NoHash => TransactionSigned {
// Caller explicitly asked for no hash, so we don't calculate it
hash: B256::ZERO,
signature: tx.signature,
transaction: tx.transaction,
},
TransactionVariant::WithHash => tx.with_hash(),
})
.collect();
construct_block(header, body, senders, ommers, withdrawals, requests)
}
/// Returns a range of blocks from the database.
///
/// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
@@ -1550,48 +1609,41 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
id: BlockHashOrNumber,
transaction_kind: TransactionVariant,
) -> ProviderResult<Option<BlockWithSenders>> {
let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
let Some(header) = self.header_by_number(block_number)? else { return Ok(None) };
let ommers = self.ommers(block_number.into())?.unwrap_or_default();
let withdrawals = self.withdrawals_by_block(block_number.into(), header.timestamp)?;
let requests = self.requests_by_block(block_number.into(), header.timestamp)?;
// Get the block body
//
// If the body indices are not found, this means that the transactions either do not exist
// in the database yet, or they do exit but are not indexed. If they exist but are not
// indexed, we don't have enough information to return the block anyways, so we return
// `None`.
let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
let tx_range = body.tx_num_range();
let (transactions, senders) = if tx_range.is_empty() {
(vec![], vec![])
} else {
(self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
};
let body = transactions
.into_iter()
.map(|tx| match transaction_kind {
TransactionVariant::NoHash => TransactionSigned {
// Caller explicitly asked for no hash, so we don't calculate it
hash: B256::ZERO,
signature: tx.signature,
transaction: tx.transaction,
},
TransactionVariant::WithHash => tx.with_hash(),
})
.collect();
self.block_with_senders(
id,
transaction_kind,
|block_number| self.header_by_number(block_number),
|header, body, senders, ommers, withdrawals, requests| {
Block { header, body, ommers, withdrawals, requests }
// Note: we're using unchecked here because we know the block contains valid txs
// wrt its height, and can ignore the s value check so pre-EIP-2 txs are allowed
.try_with_senders_unchecked(senders)
.map(Some)
.map_err(|_| ProviderError::SenderRecoveryError)
},
)
}
fn sealed_block_with_senders(
&self,
id: BlockHashOrNumber,
transaction_kind: TransactionVariant,
) -> ProviderResult<Option<SealedBlockWithSenders>> {
self.block_with_senders(
id,
transaction_kind,
|block_number| self.sealed_header(block_number),
|header, body, senders, ommers, withdrawals, requests| {
SealedBlock { header, body, ommers, withdrawals, requests }
// Note: we're using unchecked here because we know the block contains valid txs
// wrt its height, and can ignore the s value check so pre-EIP-2 txs are allowed
.try_with_senders_unchecked(senders)
.map(Some)
.map_err(|_| ProviderError::SenderRecoveryError)
},
)
}
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {


@@ -328,6 +328,14 @@ where
self.database.block_with_senders(id, transaction_kind)
}
fn sealed_block_with_senders(
&self,
id: BlockHashOrNumber,
transaction_kind: TransactionVariant,
) -> ProviderResult<Option<SealedBlockWithSenders>> {
self.database.sealed_block_with_senders(id, transaction_kind)
}
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
self.database.block_range(range)
}


@@ -1457,6 +1457,15 @@ impl BlockReader for StaticFileProvider {
Err(ProviderError::UnsupportedProvider)
}
fn sealed_block_with_senders(
&self,
_id: BlockHashOrNumber,
_transaction_kind: TransactionVariant,
) -> ProviderResult<Option<SealedBlockWithSenders>> {
// Required data not present in static_files
Err(ProviderError::UnsupportedProvider)
}
fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
// Required data not present in static_files
Err(ProviderError::UnsupportedProvider)


@@ -474,6 +474,14 @@ impl BlockReader for MockEthProvider {
Ok(None)
}
fn sealed_block_with_senders(
&self,
_id: BlockHashOrNumber,
_transaction_kind: TransactionVariant,
) -> ProviderResult<Option<SealedBlockWithSenders>> {
Ok(None)
}
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
let lock = self.blocks.lock();


@@ -113,6 +113,14 @@ impl BlockReader for NoopProvider {
Ok(None)
}
fn sealed_block_with_senders(
&self,
_id: BlockHashOrNumber,
_transaction_kind: TransactionVariant,
) -> ProviderResult<Option<SealedBlockWithSenders>> {
Ok(None)
}
fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
Ok(vec![])
}


@@ -118,6 +118,17 @@ pub trait BlockReader:
transaction_kind: TransactionVariant,
) -> ProviderResult<Option<BlockWithSenders>>;
/// Returns the sealed block with senders with matching number or hash from the database.
///
/// Returns the block's transactions in the requested variant.
///
/// Returns `None` if the block is not found.
fn sealed_block_with_senders(
&self,
id: BlockHashOrNumber,
transaction_kind: TransactionVariant,
) -> ProviderResult<Option<SealedBlockWithSenders>>;
/// Returns all blocks in the given inclusive range.
///
/// Note: returns only available blocks
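The backfill job is the first consumer of the new trait method; a call-site sketch mirroring its fetch loop (`provider` is any `BlockReader`, `block_number` a `BlockNumber`):

    use reth_provider::{ProviderError, TransactionVariant};

    let block = provider
        .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)?
        .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;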