refactor(evm): set prune modes optionally for the batch executor (#9176)

This commit is contained in:
Alexey Shekhirin
2024-07-03 12:05:02 +01:00
committed by GitHub
parent 71041b06a8
commit 8e5204c119
9 changed files with 83 additions and 69 deletions

View File

@ -22,7 +22,6 @@ use reth_provider::{
BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider, LatestStateProviderRef, BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter, OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter,
}; };
use reth_prune::PruneModes;
use reth_revm::database::StateProviderDatabase; use reth_revm::database::StateProviderDatabase;
use reth_stages::{ use reth_stages::{
stages::{AccountHashingStage, MerkleStage, StorageHashingStage}, stages::{AccountHashingStage, MerkleStage, StorageHashingStage},
@ -147,13 +146,12 @@ impl Command {
provider_rw.insert_block(sealed_block.clone())?; provider_rw.insert_block(sealed_block.clone())?;
td += sealed_block.difficulty; td += sealed_block.difficulty;
let mut executor = executor_provider.batch_executor( let mut executor = executor_provider.batch_executor(StateProviderDatabase::new(
StateProviderDatabase::new(LatestStateProviderRef::new( LatestStateProviderRef::new(
provider_rw.tx_ref(), provider_rw.tx_ref(),
provider_rw.static_file_provider().clone(), provider_rw.static_file_provider().clone(),
)), ),
PruneModes::none(), ));
);
executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?; executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?;
executor.finalize().write_to_storage( executor.finalize().write_to_storage(
provider_rw.tx_ref(), provider_rw.tx_ref(),

View File

@ -93,14 +93,14 @@ where
self.eth_executor(db) self.eth_executor(db)
} }
fn batch_executor<DB>(&self, db: DB, prune_modes: PruneModes) -> Self::BatchExecutor<DB> fn batch_executor<DB>(&self, db: DB) -> Self::BatchExecutor<DB>
where where
DB: Database<Error: Into<ProviderError> + Display>, DB: Database<Error: Into<ProviderError> + Display>,
{ {
let executor = self.eth_executor(db); let executor = self.eth_executor(db);
EthBatchExecutor { EthBatchExecutor {
executor, executor,
batch_record: BlockBatchRecord::new(prune_modes), batch_record: BlockBatchRecord::default(),
stats: BlockExecutorStats::default(), stats: BlockExecutorStats::default(),
} }
} }
@ -451,6 +451,10 @@ where
self.batch_record.set_tip(tip); self.batch_record.set_tip(tip);
} }
fn set_prune_modes(&mut self, prune_modes: PruneModes) {
self.batch_record.set_prune_modes(prune_modes);
}
fn size_hint(&self) -> Option<usize> { fn size_hint(&self) -> Option<usize> {
Some(self.executor.state.bundle_state.size_hint()) Some(self.executor.state.bundle_state.size_hint())
} }
@ -634,7 +638,7 @@ mod tests {
// attempt to execute an empty block with parent beacon block root, this should not fail // attempt to execute an empty block with parent beacon block root, this should not fail
provider provider
.batch_executor(StateProviderDatabase::new(&db), PruneModes::none()) .batch_executor(StateProviderDatabase::new(&db))
.execute_and_verify_one( .execute_and_verify_one(
( (
&BlockWithSenders { &BlockWithSenders {
@ -684,8 +688,7 @@ mod tests {
..Header::default() ..Header::default()
}; };
let mut executor = let mut executor = provider.batch_executor(StateProviderDatabase::new(&db));
provider.batch_executor(StateProviderDatabase::new(&db), PruneModes::none());
// attempt to execute an empty block with parent beacon block root, this should not fail // attempt to execute an empty block with parent beacon block root, this should not fail
executor executor
@ -728,8 +731,7 @@ mod tests {
let mut header = chain_spec.genesis_header(); let mut header = chain_spec.genesis_header();
let provider = executor_provider(chain_spec); let provider = executor_provider(chain_spec);
let mut executor = let mut executor = provider.batch_executor(StateProviderDatabase::new(&db));
provider.batch_executor(StateProviderDatabase::new(&db), PruneModes::none());
// attempt to execute the genesis block with non-zero parent beacon block root, expect err // attempt to execute the genesis block with non-zero parent beacon block root, expect err
header.parent_beacon_block_root = Some(B256::with_last_byte(0x69)); header.parent_beacon_block_root = Some(B256::with_last_byte(0x69));
@ -816,8 +818,7 @@ mod tests {
let provider = executor_provider(chain_spec); let provider = executor_provider(chain_spec);
// execute header // execute header
let mut executor = let mut executor = provider.batch_executor(StateProviderDatabase::new(&db));
provider.batch_executor(StateProviderDatabase::new(&db), PruneModes::none());
// Now execute a block with the fixed header, ensure that it does not fail // Now execute a block with the fixed header, ensure that it does not fail
executor executor
@ -884,8 +885,7 @@ mod tests {
); );
let provider = executor_provider(chain_spec); let provider = executor_provider(chain_spec);
let mut executor = let mut executor = provider.batch_executor(StateProviderDatabase::new(&db));
provider.batch_executor(StateProviderDatabase::new(&db), PruneModes::none());
// construct the header for block one // construct the header for block one
let header = Header { timestamp: 1, number: 1, ..Header::default() }; let header = Header { timestamp: 1, number: 1, ..Header::default() };
@ -938,8 +938,7 @@ mod tests {
let header = chain_spec.genesis_header(); let header = chain_spec.genesis_header();
let provider = executor_provider(chain_spec); let provider = executor_provider(chain_spec);
let mut executor = let mut executor = provider.batch_executor(StateProviderDatabase::new(&db));
provider.batch_executor(StateProviderDatabase::new(&db), PruneModes::none());
// attempt to execute genesis block, this should not fail // attempt to execute genesis block, this should not fail
executor executor
@ -996,8 +995,7 @@ mod tests {
..Header::default() ..Header::default()
}; };
let provider = executor_provider(chain_spec); let provider = executor_provider(chain_spec);
let mut executor = let mut executor = provider.batch_executor(StateProviderDatabase::new(&db));
provider.batch_executor(StateProviderDatabase::new(&db), PruneModes::none());
// attempt to execute the fork activation block, this should not fail // attempt to execute the fork activation block, this should not fail
executor executor
@ -1052,8 +1050,7 @@ mod tests {
); );
let provider = executor_provider(chain_spec); let provider = executor_provider(chain_spec);
let mut executor = let mut executor = provider.batch_executor(StateProviderDatabase::new(&db));
provider.batch_executor(StateProviderDatabase::new(&db), PruneModes::none());
let header = Header { let header = Header {
parent_hash: B256::random(), parent_hash: B256::random(),
@ -1115,8 +1112,7 @@ mod tests {
let header_hash = header.hash_slow(); let header_hash = header.hash_slow();
let provider = executor_provider(chain_spec); let provider = executor_provider(chain_spec);
let mut executor = let mut executor = provider.batch_executor(StateProviderDatabase::new(&db));
provider.batch_executor(StateProviderDatabase::new(&db), PruneModes::none());
// attempt to execute the genesis block, this should not fail // attempt to execute the genesis block, this should not fail
executor executor

View File

@ -36,13 +36,13 @@ where
} }
} }
fn batch_executor<DB>(&self, db: DB, prune_modes: PruneModes) -> Self::BatchExecutor<DB> fn batch_executor<DB>(&self, db: DB) -> Self::BatchExecutor<DB>
where where
DB: Database<Error: Into<ProviderError> + Display>, DB: Database<Error: Into<ProviderError> + Display>,
{ {
match self { match self {
Self::Left(a) => Either::Left(a.batch_executor(db, prune_modes)), Self::Left(a) => Either::Left(a.batch_executor(db)),
Self::Right(b) => Either::Right(b.batch_executor(db, prune_modes)), Self::Right(b) => Either::Right(b.batch_executor(db)),
} }
} }
} }
@ -116,6 +116,13 @@ where
} }
} }
fn set_prune_modes(&mut self, prune_modes: PruneModes) {
match self {
Self::Left(a) => a.set_prune_modes(prune_modes),
Self::Right(b) => b.set_prune_modes(prune_modes),
}
}
fn size_hint(&self) -> Option<usize> { fn size_hint(&self) -> Option<usize> {
match self { match self {
Self::Left(a) => a.size_hint(), Self::Left(a) => a.size_hint(),

View File

@ -85,6 +85,11 @@ pub trait BatchExecutor<DB> {
/// This can be used to optimize state pruning during execution. /// This can be used to optimize state pruning during execution.
fn set_tip(&mut self, tip: BlockNumber); fn set_tip(&mut self, tip: BlockNumber);
/// Set the prune modes.
///
/// They are used to determine which parts of the state should be kept during execution.
fn set_prune_modes(&mut self, prune_modes: PruneModes);
/// The size hint of the batch's tracked state size. /// The size hint of the batch's tracked state size.
/// ///
/// This is used to optimize DB commits depending on the size of the state. /// This is used to optimize DB commits depending on the size of the state.
@ -169,10 +174,7 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static {
/// ///
/// Batch executor is used to execute multiple blocks in sequence and keep track of the state /// Batch executor is used to execute multiple blocks in sequence and keep track of the state
/// during historical sync which involves executing multiple blocks in sequence. /// during historical sync which involves executing multiple blocks in sequence.
/// fn batch_executor<DB>(&self, db: DB) -> Self::BatchExecutor<DB>
/// The pruning modes are used to determine which parts of the state should be kept during
/// execution.
fn batch_executor<DB>(&self, db: DB, prune_modes: PruneModes) -> Self::BatchExecutor<DB>
where where
DB: Database<Error: Into<ProviderError> + Display>; DB: Database<Error: Into<ProviderError> + Display>;
} }
@ -198,7 +200,7 @@ mod tests {
TestExecutor(PhantomData) TestExecutor(PhantomData)
} }
fn batch_executor<DB>(&self, _db: DB, _prune_modes: PruneModes) -> Self::BatchExecutor<DB> fn batch_executor<DB>(&self, _db: DB) -> Self::BatchExecutor<DB>
where where
DB: Database<Error: Into<ProviderError> + Display>, DB: Database<Error: Into<ProviderError> + Display>,
{ {
@ -235,6 +237,10 @@ mod tests {
todo!() todo!()
} }
fn set_prune_modes(&mut self, _prune_modes: PruneModes) {
todo!()
}
fn size_hint(&self) -> Option<usize> { fn size_hint(&self) -> Option<usize> {
None None
} }

View File

@ -32,7 +32,7 @@ impl BlockExecutorProvider for NoopBlockExecutorProvider {
Self Self
} }
fn batch_executor<DB>(&self, _: DB, _: PruneModes) -> Self::BatchExecutor<DB> fn batch_executor<DB>(&self, _: DB) -> Self::BatchExecutor<DB>
where where
DB: Database<Error: Into<ProviderError> + Display>, DB: Database<Error: Into<ProviderError> + Display>,
{ {
@ -65,6 +65,8 @@ impl<DB> BatchExecutor<DB> for NoopBlockExecutorProvider {
fn set_tip(&mut self, _: BlockNumber) {} fn set_tip(&mut self, _: BlockNumber) {}
fn set_prune_modes(&mut self, _: PruneModes) {}
fn size_hint(&self) -> Option<usize> { fn size_hint(&self) -> Option<usize> {
None None
} }

View File

@ -37,7 +37,7 @@ impl BlockExecutorProvider for MockExecutorProvider {
self.clone() self.clone()
} }
fn batch_executor<DB>(&self, _: DB, _: PruneModes) -> Self::BatchExecutor<DB> fn batch_executor<DB>(&self, _: DB) -> Self::BatchExecutor<DB>
where where
DB: Database<Error: Into<ProviderError> + Display>, DB: Database<Error: Into<ProviderError> + Display>,
{ {
@ -77,6 +77,8 @@ impl<DB> BatchExecutor<DB> for MockExecutorProvider {
fn set_tip(&mut self, _: BlockNumber) {} fn set_tip(&mut self, _: BlockNumber) {}
fn set_prune_modes(&mut self, _: PruneModes) {}
fn size_hint(&self) -> Option<usize> { fn size_hint(&self) -> Option<usize> {
None None
} }

View File

@ -25,8 +25,19 @@ pub struct BackfillJobFactory<E, P> {
impl<E, P> BackfillJobFactory<E, P> { impl<E, P> BackfillJobFactory<E, P> {
/// Creates a new [`BackfillJobFactory`]. /// Creates a new [`BackfillJobFactory`].
pub fn new(executor: E, provider: P, prune_modes: PruneModes) -> Self { pub fn new(executor: E, provider: P) -> Self {
Self { executor, provider, prune_modes, thresholds: ExecutionStageThresholds::default() } Self {
executor,
provider,
prune_modes: PruneModes::none(),
thresholds: ExecutionStageThresholds::default(),
}
}
/// Sets the prune modes
pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
self.prune_modes = prune_modes;
self
} }
/// Sets the thresholds /// Sets the thresholds
@ -54,12 +65,10 @@ impl BackfillJobFactory<(), ()> {
/// Creates a new [`BackfillJobFactory`] from [`FullNodeComponents`]. /// Creates a new [`BackfillJobFactory`] from [`FullNodeComponents`].
pub fn new_from_components<Node: FullNodeComponents>( pub fn new_from_components<Node: FullNodeComponents>(
components: Node, components: Node,
prune_modes: PruneModes,
) -> BackfillJobFactory<Node::Executor, Node::Provider> { ) -> BackfillJobFactory<Node::Executor, Node::Provider> {
BackfillJobFactory::<_, _>::new( BackfillJobFactory::<_, _>::new(
components.block_executor().clone(), components.block_executor().clone(),
components.provider().clone(), components.provider().clone(),
prune_modes,
) )
} }
} }
@ -73,8 +82,8 @@ pub struct BackfillJob<E, DB, P> {
executor: E, executor: E,
provider: P, provider: P,
prune_modes: PruneModes, prune_modes: PruneModes,
range: RangeInclusive<BlockNumber>,
thresholds: ExecutionStageThresholds, thresholds: ExecutionStageThresholds,
range: RangeInclusive<BlockNumber>,
_db: PhantomData<DB>, _db: PhantomData<DB>,
} }
@ -102,12 +111,10 @@ where
P: FullProvider<DB>, P: FullProvider<DB>,
{ {
fn execute_range(&mut self) -> Result<Chain, BlockExecutionError> { fn execute_range(&mut self) -> Result<Chain, BlockExecutionError> {
let mut executor = self.executor.batch_executor( let mut executor = self.executor.batch_executor(StateProviderDatabase::new(
StateProviderDatabase::new( self.provider.history_by_block_number(self.range.start().saturating_sub(1))?,
self.provider.history_by_block_number(self.range.start().saturating_sub(1))?, ));
), executor.set_prune_modes(self.prune_modes.clone());
self.prune_modes.clone(),
);
let mut fetch_block_duration = Duration::default(); let mut fetch_block_duration = Duration::default();
let mut execution_duration = Duration::default(); let mut execution_duration = Duration::default();
@ -205,7 +212,6 @@ mod tests {
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec, providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
BlockWriter, LatestStateProviderRef, BlockWriter, LatestStateProviderRef,
}; };
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase; use reth_revm::database::StateProviderDatabase;
use reth_testing_utils::generators::{self, sign_tx_with_key_pair}; use reth_testing_utils::generators::{self, sign_tx_with_key_pair};
use secp256k1::Keypair; use secp256k1::Keypair;
@ -289,24 +295,18 @@ mod tests {
let provider = provider_factory.provider()?; let provider = provider_factory.provider()?;
// Execute only the first block on top of genesis state // Execute only the first block on top of genesis state
let mut outcome_single = EthExecutorProvider::ethereum(chain_spec.clone()) let mut outcome_single = EthExecutorProvider::ethereum(chain_spec.clone())
.batch_executor( .batch_executor(StateProviderDatabase::new(LatestStateProviderRef::new(
StateProviderDatabase::new(LatestStateProviderRef::new( provider.tx_ref(),
provider.tx_ref(), provider.static_file_provider().clone(),
provider.static_file_provider().clone(), )))
)),
PruneModes::none(),
)
.execute_and_verify_batch([(&block1, U256::ZERO).into()])?; .execute_and_verify_batch([(&block1, U256::ZERO).into()])?;
outcome_single.bundle.reverts.sort(); outcome_single.bundle.reverts.sort();
// Execute both blocks on top of the genesis state // Execute both blocks on top of the genesis state
let outcome_batch = EthExecutorProvider::ethereum(chain_spec) let outcome_batch = EthExecutorProvider::ethereum(chain_spec)
.batch_executor( .batch_executor(StateProviderDatabase::new(LatestStateProviderRef::new(
StateProviderDatabase::new(LatestStateProviderRef::new( provider.tx_ref(),
provider.tx_ref(), provider.static_file_provider().clone(),
provider.static_file_provider().clone(), )))
)),
PruneModes::none(),
)
.execute_and_verify_batch([ .execute_and_verify_batch([
(&block1, U256::ZERO).into(), (&block1, U256::ZERO).into(),
(&block2, U256::ZERO).into(), (&block2, U256::ZERO).into(),
@ -327,7 +327,7 @@ mod tests {
provider_rw.commit()?; provider_rw.commit()?;
// Backfill the first block // Backfill the first block
let factory = BackfillJobFactory::new(executor, blockchain_db, PruneModes::none()); let factory = BackfillJobFactory::new(executor, blockchain_db);
let job = factory.backfill(1..=1); let job = factory.backfill(1..=1);
let chains = job.collect::<Result<Vec<_>, _>>()?; let chains = job.collect::<Result<Vec<_>, _>>()?;

View File

@ -79,14 +79,14 @@ where
self.op_executor(db) self.op_executor(db)
} }
fn batch_executor<DB>(&self, db: DB, prune_modes: PruneModes) -> Self::BatchExecutor<DB> fn batch_executor<DB>(&self, db: DB) -> Self::BatchExecutor<DB>
where where
DB: Database<Error: Into<ProviderError> + std::fmt::Display>, DB: Database<Error: Into<ProviderError> + std::fmt::Display>,
{ {
let executor = self.op_executor(db); let executor = self.op_executor(db);
OpBatchExecutor { OpBatchExecutor {
executor, executor,
batch_record: BlockBatchRecord::new(prune_modes), batch_record: BlockBatchRecord::default(),
stats: BlockExecutorStats::default(), stats: BlockExecutorStats::default(),
} }
} }
@ -435,6 +435,10 @@ where
self.batch_record.set_tip(tip); self.batch_record.set_tip(tip);
} }
fn set_prune_modes(&mut self, prune_modes: PruneModes) {
self.batch_record.set_prune_modes(prune_modes);
}
fn size_hint(&self) -> Option<usize> { fn size_hint(&self) -> Option<usize> {
Some(self.executor.state.bundle_state.size_hint()) Some(self.executor.state.bundle_state.size_hint())
} }
@ -528,8 +532,7 @@ mod tests {
); );
let provider = executor_provider(chain_spec); let provider = executor_provider(chain_spec);
let mut executor = let mut executor = provider.batch_executor(StateProviderDatabase::new(&db));
provider.batch_executor(StateProviderDatabase::new(&db), PruneModes::none());
executor.state_mut().load_cache_account(L1_BLOCK_CONTRACT).unwrap(); executor.state_mut().load_cache_account(L1_BLOCK_CONTRACT).unwrap();
@ -610,8 +613,7 @@ mod tests {
); );
let provider = executor_provider(chain_spec); let provider = executor_provider(chain_spec);
let mut executor = let mut executor = provider.batch_executor(StateProviderDatabase::new(&db));
provider.batch_executor(StateProviderDatabase::new(&db), PruneModes::none());
executor.state_mut().load_cache_account(L1_BLOCK_CONTRACT).unwrap(); executor.state_mut().load_cache_account(L1_BLOCK_CONTRACT).unwrap();

View File

@ -222,8 +222,9 @@ where
provider.tx_ref(), provider.tx_ref(),
provider.static_file_provider().clone(), provider.static_file_provider().clone(),
)); ));
let mut executor = self.executor_provider.batch_executor(db, prune_modes); let mut executor = self.executor_provider.batch_executor(db);
executor.set_tip(max_block); executor.set_tip(max_block);
executor.set_prune_modes(prune_modes);
// Progress tracking // Progress tracking
let mut stage_progress = start_block; let mut stage_progress = start_block;