feat: implement batch executor (#11753)

This commit is contained in:
Federico Gimenez
2024-10-16 11:02:23 +02:00
committed by GitHub
parent a14a9fd8b0
commit 323d8edfb9
4 changed files with 184 additions and 42 deletions

2
Cargo.lock generated
View File

@ -7359,12 +7359,14 @@ dependencies = [
"metrics", "metrics",
"parking_lot 0.12.3", "parking_lot 0.12.3",
"reth-chainspec", "reth-chainspec",
"reth-consensus",
"reth-execution-errors", "reth-execution-errors",
"reth-execution-types", "reth-execution-types",
"reth-metrics", "reth-metrics",
"reth-primitives", "reth-primitives",
"reth-primitives-traits", "reth-primitives-traits",
"reth-prune-types", "reth-prune-types",
"reth-revm",
"reth-storage-errors", "reth-storage-errors",
"revm", "revm",
"revm-primitives", "revm-primitives",

View File

@ -13,16 +13,18 @@ workspace = true
[dependencies] [dependencies]
# reth # reth
reth-chainspec.workspace = true reth-chainspec.workspace = true
reth-consensus.workspace = true
reth-execution-errors.workspace = true reth-execution-errors.workspace = true
reth-execution-types.workspace = true
reth-metrics = { workspace = true, optional = true }
reth-primitives.workspace = true reth-primitives.workspace = true
reth-primitives-traits.workspace = true reth-primitives-traits.workspace = true
revm-primitives.workspace = true
reth-prune-types.workspace = true reth-prune-types.workspace = true
reth-metrics = { workspace = true, optional = true } reth-revm.workspace = true
reth-storage-errors.workspace = true reth-storage-errors.workspace = true
reth-execution-types.workspace = true
revm.workspace = true revm.workspace = true
revm-primitives.workspace = true
# alloy # alloy
alloy-primitives.workspace = true alloy-primitives.workspace = true

View File

@ -7,15 +7,17 @@ pub use reth_execution_errors::{
pub use reth_execution_types::{BlockExecutionInput, BlockExecutionOutput, ExecutionOutcome}; pub use reth_execution_types::{BlockExecutionInput, BlockExecutionOutput, ExecutionOutcome};
pub use reth_storage_errors::provider::ProviderError; pub use reth_storage_errors::provider::ProviderError;
use alloc::{boxed::Box, vec::Vec}; use crate::system_calls::OnStateHook;
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloy_primitives::BlockNumber; use alloy_primitives::BlockNumber;
use core::{fmt::Display, marker::PhantomData}; use core::{fmt::Display, marker::PhantomData};
use reth_chainspec::ChainSpec;
use reth_consensus::ConsensusError;
use reth_primitives::{BlockWithSenders, Receipt, Request}; use reth_primitives::{BlockWithSenders, Receipt, Request};
use reth_prune_types::PruneModes; use reth_prune_types::PruneModes;
use reth_revm::batch::BlockBatchRecord;
use revm::{db::BundleState, State}; use revm::{db::BundleState, State};
use revm_primitives::db::Database; use revm_primitives::{db::Database, U256};
use crate::system_calls::OnStateHook;
/// A general purpose executor trait that executes an input (e.g. block) and produces an output /// A general purpose executor trait that executes an input (e.g. block) and produces an output
/// (e.g. state changes and receipts). /// (e.g. state changes and receipts).
@ -170,25 +172,49 @@ pub trait BlockExecutionStrategy<DB> {
type Error: From<ProviderError> + core::error::Error; type Error: From<ProviderError> + core::error::Error;
/// Applies any necessary changes before executing the block's transactions. /// Applies any necessary changes before executing the block's transactions.
fn apply_pre_execution_changes(&mut self) -> Result<(), Self::Error>; fn apply_pre_execution_changes(
&mut self,
block: &BlockWithSenders,
total_difficulty: U256,
) -> Result<(), Self::Error>;
/// Executes all transactions in the block. /// Executes all transactions in the block.
fn execute_transactions( fn execute_transactions(
&mut self, &mut self,
block: &BlockWithSenders, block: &BlockWithSenders,
total_difficulty: U256,
) -> Result<(Vec<Receipt>, u64), Self::Error>; ) -> Result<(Vec<Receipt>, u64), Self::Error>;
/// Applies any necessary changes after executing the block's transactions. /// Applies any necessary changes after executing the block's transactions.
fn apply_post_execution_changes(&mut self) -> Result<Vec<Request>, Self::Error>; fn apply_post_execution_changes(
&mut self,
block: &BlockWithSenders,
total_difficulty: U256,
receipts: &[Receipt],
) -> Result<Vec<Request>, Self::Error>;
/// Returns a reference to the current state. /// Returns a reference to the current state.
fn state_ref(&self) -> &State<DB>; fn state_ref(&self) -> &State<DB>;
/// Returns a mutable reference to the current state.
fn state_mut(&mut self) -> &mut State<DB>;
/// Sets a hook to be called after each state change during execution. /// Sets a hook to be called after each state change during execution.
fn with_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>); fn with_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>);
/// Returns the final bundle state. /// Returns the final bundle state.
fn finish(&self) -> BundleState; fn finish(&mut self) -> BundleState;
/// Returns the strategy chain spec.
fn chain_spec(&self) -> Arc<ChainSpec>;
/// Validate a block with regard to execution results.
fn validate_block_post_execution(
&self,
block: &BlockWithSenders,
receipts: &[Receipt],
requests: &[Request],
) -> Result<(), ConsensusError>;
} }
/// A strategy factory that can create block execution strategies. /// A strategy factory that can create block execution strategies.
@ -250,7 +276,8 @@ where
DB: Database<Error: Into<ProviderError> + Display>, DB: Database<Error: Into<ProviderError> + Display>,
{ {
let strategy = self.strategy_factory.create_strategy(db); let strategy = self.strategy_factory.create_strategy(db);
GenericBatchExecutor::new(strategy) let batch_record = BlockBatchRecord::default();
GenericBatchExecutor::new(strategy, batch_record)
} }
} }
@ -261,7 +288,8 @@ pub struct GenericBlockExecutor<S, DB>
where where
S: BlockExecutionStrategy<DB>, S: BlockExecutionStrategy<DB>,
{ {
strategy: S, /// Block execution strategy.
pub(crate) strategy: S,
_phantom: PhantomData<DB>, _phantom: PhantomData<DB>,
} }
@ -285,11 +313,12 @@ where
type Error = S::Error; type Error = S::Error;
fn execute(mut self, input: Self::Input<'_>) -> Result<Self::Output, Self::Error> { fn execute(mut self, input: Self::Input<'_>) -> Result<Self::Output, Self::Error> {
let BlockExecutionInput { block, total_difficulty: _ } = input; let BlockExecutionInput { block, total_difficulty } = input;
self.strategy.apply_pre_execution_changes()?; self.strategy.apply_pre_execution_changes(block, total_difficulty)?;
let (receipts, gas_used) = self.strategy.execute_transactions(block)?; let (receipts, gas_used) = self.strategy.execute_transactions(block, total_difficulty)?;
let requests = self.strategy.apply_post_execution_changes()?; let requests =
self.strategy.apply_post_execution_changes(block, total_difficulty, &receipts)?;
let state = self.strategy.finish(); let state = self.strategy.finish();
Ok(BlockExecutionOutput { state, receipts, requests, gas_used }) Ok(BlockExecutionOutput { state, receipts, requests, gas_used })
@ -303,11 +332,12 @@ where
where where
F: FnMut(&State<DB>), F: FnMut(&State<DB>),
{ {
let BlockExecutionInput { block, total_difficulty: _ } = input; let BlockExecutionInput { block, total_difficulty } = input;
self.strategy.apply_pre_execution_changes()?; self.strategy.apply_pre_execution_changes(block, total_difficulty)?;
let (receipts, gas_used) = self.strategy.execute_transactions(block)?; let (receipts, gas_used) = self.strategy.execute_transactions(block, total_difficulty)?;
let requests = self.strategy.apply_post_execution_changes()?; let requests =
self.strategy.apply_post_execution_changes(block, total_difficulty, &receipts)?;
state(self.strategy.state_ref()); state(self.strategy.state_ref());
@ -324,13 +354,14 @@ where
where where
H: OnStateHook + 'static, H: OnStateHook + 'static,
{ {
let BlockExecutionInput { block, total_difficulty: _ } = input; let BlockExecutionInput { block, total_difficulty } = input;
self.strategy.with_state_hook(Some(Box::new(state_hook))); self.strategy.with_state_hook(Some(Box::new(state_hook)));
self.strategy.apply_pre_execution_changes()?; self.strategy.apply_pre_execution_changes(block, total_difficulty)?;
let (receipts, gas_used) = self.strategy.execute_transactions(block)?; let (receipts, gas_used) = self.strategy.execute_transactions(block, total_difficulty)?;
let requests = self.strategy.apply_post_execution_changes()?; let requests =
self.strategy.apply_post_execution_changes(block, total_difficulty, &receipts)?;
let state = self.strategy.finish(); let state = self.strategy.finish();
@ -341,15 +372,24 @@ where
/// A generic batch executor that uses a [`BlockExecutionStrategy`] to /// A generic batch executor that uses a [`BlockExecutionStrategy`] to
/// execute batches. /// execute batches.
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct GenericBatchExecutor<S, DB> { pub struct GenericBatchExecutor<S, DB>
_strategy: S, where
S: BlockExecutionStrategy<DB>,
{
/// Batch execution strategy.
pub(crate) strategy: S,
/// Keeps track of batch execution receipts and requests.
batch_record: BlockBatchRecord,
_phantom: PhantomData<DB>, _phantom: PhantomData<DB>,
} }
impl<S, DB> GenericBatchExecutor<S, DB> { impl<S, DB> GenericBatchExecutor<S, DB>
where
S: BlockExecutionStrategy<DB>,
{
/// Creates a new `GenericBatchExecutor` with the given strategy. /// Creates a new `GenericBatchExecutor` with the given strategy.
pub const fn new(_strategy: S) -> Self { pub const fn new(strategy: S, batch_record: BlockBatchRecord) -> Self {
Self { _strategy, _phantom: PhantomData } Self { strategy, batch_record, _phantom: PhantomData }
} }
} }
@ -362,24 +402,52 @@ where
type Output = ExecutionOutcome; type Output = ExecutionOutcome;
type Error = BlockExecutionError; type Error = BlockExecutionError;
fn execute_and_verify_one(&mut self, _input: Self::Input<'_>) -> Result<(), Self::Error> { fn execute_and_verify_one(&mut self, input: Self::Input<'_>) -> Result<(), Self::Error> {
todo!() let BlockExecutionInput { block, total_difficulty } = input;
if self.batch_record.first_block().is_none() {
self.batch_record.set_first_block(block.number);
}
self.strategy.apply_pre_execution_changes(block, total_difficulty)?;
let (receipts, _gas_used) = self.strategy.execute_transactions(block, total_difficulty)?;
let requests =
self.strategy.apply_post_execution_changes(block, total_difficulty, &receipts)?;
self.strategy.validate_block_post_execution(block, &receipts, &requests)?;
// prepare the state according to the prune mode
let retention = self.batch_record.bundle_retention(block.number);
self.strategy.state_mut().merge_transitions(retention);
// store receipts in the set
self.batch_record.save_receipts(receipts)?;
// store requests in the set
self.batch_record.save_requests(requests);
Ok(())
} }
fn finalize(self) -> Self::Output { fn finalize(mut self) -> Self::Output {
todo!() ExecutionOutcome::new(
self.strategy.state_mut().take_bundle(),
self.batch_record.take_receipts(),
self.batch_record.first_block().unwrap_or_default(),
self.batch_record.take_requests(),
)
} }
fn set_tip(&mut self, _tip: BlockNumber) { fn set_tip(&mut self, tip: BlockNumber) {
todo!() self.batch_record.set_tip(tip);
} }
fn set_prune_modes(&mut self, _prune_modes: PruneModes) { fn set_prune_modes(&mut self, prune_modes: PruneModes) {
todo!() self.batch_record.set_prune_modes(prune_modes);
} }
fn size_hint(&self) -> Option<usize> { fn size_hint(&self) -> Option<usize> {
None Some(self.strategy.state_ref().bundle_state.size_hint())
} }
} }
@ -522,18 +590,28 @@ mod tests {
impl<DB> BlockExecutionStrategy<DB> for TestExecutorStrategy<DB, TestEvmConfig> { impl<DB> BlockExecutionStrategy<DB> for TestExecutorStrategy<DB, TestEvmConfig> {
type Error = BlockExecutionError; type Error = BlockExecutionError;
fn apply_pre_execution_changes(&mut self) -> Result<(), Self::Error> { fn apply_pre_execution_changes(
&mut self,
_block: &BlockWithSenders,
_total_difficulty: U256,
) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
fn execute_transactions( fn execute_transactions(
&mut self, &mut self,
_block: &BlockWithSenders, _block: &BlockWithSenders,
_total_difficulty: U256,
) -> Result<(Vec<Receipt>, u64), Self::Error> { ) -> Result<(Vec<Receipt>, u64), Self::Error> {
Ok(self.execute_transactions_result.clone()) Ok(self.execute_transactions_result.clone())
} }
fn apply_post_execution_changes(&mut self) -> Result<Vec<Request>, Self::Error> { fn apply_post_execution_changes(
&mut self,
_block: &BlockWithSenders,
_total_difficulty: U256,
_receipts: &[Receipt],
) -> Result<Vec<Request>, Self::Error> {
Ok(self.apply_post_execution_changes_result.clone()) Ok(self.apply_post_execution_changes_result.clone())
} }
@ -541,11 +619,28 @@ mod tests {
&self.state &self.state
} }
fn state_mut(&mut self) -> &mut State<DB> {
&mut self.state
}
fn with_state_hook(&mut self, _hook: Option<Box<dyn OnStateHook>>) {} fn with_state_hook(&mut self, _hook: Option<Box<dyn OnStateHook>>) {}
fn finish(&self) -> BundleState { fn finish(&mut self) -> BundleState {
self.finish_result.clone() self.finish_result.clone()
} }
fn chain_spec(&self) -> Arc<ChainSpec> {
MAINNET.clone()
}
fn validate_block_post_execution(
&self,
_block: &BlockWithSenders,
_receipts: &[Receipt],
_requests: &[Request],
) -> Result<(), ConsensusError> {
Ok(())
}
} }
#[derive(Clone)] #[derive(Clone)]

View File

@ -2,7 +2,8 @@
use crate::{ use crate::{
execute::{ execute::{
BatchExecutor, BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor, BatchExecutor, BlockExecutionInput, BlockExecutionOutput, BlockExecutionStrategy,
BlockExecutorProvider, Executor, GenericBatchExecutor, GenericBlockExecutor,
}, },
system_calls::OnStateHook, system_calls::OnStateHook,
}; };
@ -110,3 +111,45 @@ impl<DB> BatchExecutor<DB> for MockExecutorProvider {
None None
} }
} }
impl<S, DB> GenericBlockExecutor<S, DB>
where
S: BlockExecutionStrategy<DB>,
{
/// Provides safe read access to the state
pub fn with_state<F, R>(&self, f: F) -> R
where
F: FnOnce(&State<DB>) -> R,
{
f(self.strategy.state_ref())
}
/// Provides safe write access to the state
pub fn with_state_mut<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut State<DB>) -> R,
{
f(self.strategy.state_mut())
}
}
impl<S, DB> GenericBatchExecutor<S, DB>
where
S: BlockExecutionStrategy<DB>,
{
/// Provides safe read access to the state
pub fn with_state<F, R>(&self, f: F) -> R
where
F: FnOnce(&State<DB>) -> R,
{
f(self.strategy.state_ref())
}
/// Provides safe write access to the state
pub fn with_state_mut<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut State<DB>) -> R,
{
f(self.strategy.state_mut())
}
}