refactor: use BlockWithSenders in executors (#5771)

This commit is contained in:
Bjerg
2023-12-15 14:57:41 +02:00
committed by GitHub
parent 1eaa3ed5a5
commit faa9a22a71
12 changed files with 183 additions and 186 deletions

View File

@ -269,11 +269,8 @@ impl Command {
let executor_factory = EvmProcessorFactory::new(self.chain.clone());
let mut executor = executor_factory.with_state(blockchain_db.latest()?);
executor.execute_and_verify_receipt(
&block_with_senders.block.clone().unseal(),
U256::MAX,
None,
)?;
executor
.execute_and_verify_receipt(&block_with_senders.clone().unseal(), U256::MAX)?;
let state = executor.take_output_state();
debug!(target: "reth::cli", ?state, "Executed block");

View File

@ -13,6 +13,7 @@ use backon::{ConstantBuilder, Retryable};
use clap::Parser;
use reth_config::Config;
use reth_db::{init_db, DatabaseEnv};
use reth_interfaces::executor::BlockValidationError;
use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo;
use reth_primitives::{fs, stage::StageId, BlockHashOrNumber, ChainSpec};
@ -166,9 +167,12 @@ impl Command {
let merkle_block_td =
provider.header_td_by_number(merkle_block_number)?.unwrap_or_default();
executor.execute_and_verify_receipt(
&block.clone().unseal(),
&block
.clone()
.unseal()
.with_recovered_senders()
.ok_or(BlockValidationError::SenderRecoveryError)?,
merkle_block_td + block.difficulty,
None,
)?;
let block_state = executor.take_output_state();
@ -185,7 +189,7 @@ impl Command {
if in_memory_state_root == block.state_root {
info!(target: "reth::cli", state_root = ?in_memory_state_root, "Computed in-memory state root matches");
return Ok(())
return Ok(());
}
let provider_rw = factory.provider_rw()?;

View File

@ -202,9 +202,6 @@ impl AppendableChain {
// some checks are done before blocks comes here.
externals.consensus.validate_header_against_parent(&block, parent_block)?;
let (block, senders) = block.into_components();
let block = block.unseal();
// get the state provider.
let canonical_fork = bundle_state_data_provider.canonical_fork();
let state_provider =
@ -213,7 +210,8 @@ impl AppendableChain {
let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);
let mut executor = externals.executor_factory.with_state(&provider);
executor.execute_and_verify_receipt(&block, U256::MAX, Some(senders))?;
let block = block.unseal();
executor.execute_and_verify_receipt(&block, U256::MAX)?;
let bundle_state = executor.take_output_state();
// check state root if the block extends the canonical chain __and__ if state root
@ -225,7 +223,7 @@ impl AppendableChain {
return Err(ConsensusError::BodyStateRootDiff(
GotExpected { got: state_root, expected: block.state_root }.into(),
)
.into())
.into());
}
}

View File

@ -23,8 +23,8 @@ use reth_interfaces::{
};
use reth_primitives::{
constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS, ETHEREUM_BLOCK_GAS_LIMIT},
proofs, Address, Block, BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, Bloom, ChainSpec,
Header, ReceiptWithBloom, SealedBlock, SealedHeader, TransactionSigned, B256,
proofs, Block, BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders, Bloom,
ChainSpec, Header, ReceiptWithBloom, SealedBlock, SealedHeader, TransactionSigned, B256,
EMPTY_OMMER_ROOT_HASH, U256,
};
use reth_provider::{
@ -299,9 +299,8 @@ impl StorageInner {
/// This returns the poststate from execution and post-block changes, as well as the gas used.
pub(crate) fn execute(
&mut self,
block: &Block,
block: &BlockWithSenders,
executor: &mut EVMProcessor<'_>,
senders: Vec<Address>,
) -> Result<(BundleStateWithReceipts, u64), BlockExecutionError> {
trace!(target: "consensus::auto", transactions=?&block.body, "executing transactions");
// TODO: there isn't really a parent beacon block root here, so not sure whether or not to
@ -310,8 +309,7 @@ impl StorageInner {
// set the first block to find the correct index in bundle state
executor.set_first_block(block.number);
let (receipts, gas_used) =
executor.execute_transactions(block, U256::ZERO, Some(senders))?;
let (receipts, gas_used) = executor.execute_transactions(block, U256::ZERO)?;
// Save receipts.
executor.save_receipts(receipts)?;
@ -379,9 +377,8 @@ impl StorageInner {
) -> Result<(SealedHeader, BundleStateWithReceipts), BlockExecutionError> {
let header = self.build_header_template(&transactions, chain_spec.clone());
let block = Block { header, body: transactions, ommers: vec![], withdrawals: None };
let senders = TransactionSigned::recover_signers(&block.body, block.body.len())
let block = Block { header, body: transactions, ommers: vec![], withdrawals: None }
.with_recovered_senders()
.ok_or(BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError))?;
trace!(target: "consensus::auto", transactions=?&block.body, "executing transactions");
@ -393,9 +390,9 @@ impl StorageInner {
.build();
let mut executor = EVMProcessor::new_with_state(chain_spec.clone(), db);
let (bundle_state, gas_used) = self.execute(&block, &mut executor, senders)?;
let (bundle_state, gas_used) = self.execute(&block, &mut executor)?;
let Block { header, body, .. } = block;
let Block { header, body, .. } = block.block;
let body = BlockBody { transactions: body, ommers: vec![], withdrawals: None };
trace!(target: "consensus::auto", ?bundle_state, ?header, ?body, "executed block, calculating state root and completing header");

View File

@ -88,7 +88,7 @@ impl<DB> TestEnv<DB> {
loop {
let result = self.send_new_payload(payload.clone(), cancun_fields.clone()).await?;
if !result.is_syncing() {
return Ok(result)
return Ok(result);
}
}
}
@ -109,7 +109,7 @@ impl<DB> TestEnv<DB> {
loop {
let result = self.engine_handle.fork_choice_updated(state, None).await?;
if !result.is_syncing() {
return Ok(result)
return Ok(result);
}
}
}
@ -182,45 +182,34 @@ where
{
fn execute(
&mut self,
block: &reth_primitives::Block,
block: &reth_primitives::BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<reth_primitives::Address>>,
) -> Result<(), BlockExecutionError> {
match self {
EitherBlockExecutor::Left(a) => a.execute(block, total_difficulty, senders),
EitherBlockExecutor::Right(b) => b.execute(block, total_difficulty, senders),
EitherBlockExecutor::Left(a) => a.execute(block, total_difficulty),
EitherBlockExecutor::Right(b) => b.execute(block, total_difficulty),
}
}
fn execute_and_verify_receipt(
&mut self,
block: &reth_primitives::Block,
block: &reth_primitives::BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<reth_primitives::Address>>,
) -> Result<(), BlockExecutionError> {
match self {
EitherBlockExecutor::Left(a) => {
a.execute_and_verify_receipt(block, total_difficulty, senders)
}
EitherBlockExecutor::Right(b) => {
b.execute_and_verify_receipt(block, total_difficulty, senders)
}
EitherBlockExecutor::Left(a) => a.execute_and_verify_receipt(block, total_difficulty),
EitherBlockExecutor::Right(b) => b.execute_and_verify_receipt(block, total_difficulty),
}
}
fn execute_transactions(
&mut self,
block: &reth_primitives::Block,
block: &reth_primitives::BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<reth_primitives::Address>>,
) -> Result<(Vec<Receipt>, u64), BlockExecutionError> {
match self {
EitherBlockExecutor::Left(a) => {
a.execute_transactions(block, total_difficulty, senders)
}
EitherBlockExecutor::Right(b) => {
b.execute_transactions(block, total_difficulty, senders)
}
EitherBlockExecutor::Left(a) => a.execute_transactions(block, total_difficulty),
EitherBlockExecutor::Right(b) => b.execute_transactions(block, total_difficulty),
}
}

View File

@ -53,6 +53,11 @@ impl Block {
}
}
/// Expensive operation that recovers transaction signer. See [SealedBlockWithSenders].
pub fn senders(&self) -> Option<Vec<Address>> {
TransactionSigned::recover_signers(&self.body, self.body.len())
}
/// Transform into a [`BlockWithSenders`].
///
/// # Panics
@ -71,6 +76,15 @@ impl Block {
BlockWithSenders { block: self, senders }
}
/// **Expensive**. Transform into a [`BlockWithSenders`] by recovering senders in the contained
/// transactions.
///
/// Returns `None` if a transaction is invalid.
pub fn with_recovered_senders(self) -> Option<BlockWithSenders> {
let senders = self.senders()?;
Some(BlockWithSenders { block: self, senders })
}
/// Returns whether or not the block contains any blob transactions.
pub fn has_blob_transactions(&self) -> bool {
self.body.iter().any(|tx| tx.is_eip4844())

View File

@ -3,7 +3,7 @@ use reth_interfaces::executor::{
BlockExecutionError, BlockValidationError, OptimismBlockExecutionError,
};
use reth_primitives::{
revm::compat::into_reth_log, revm_primitives::ResultAndState, Address, Block, Hardfork,
revm::compat::into_reth_log, revm_primitives::ResultAndState, BlockWithSenders, Hardfork,
Receipt, U256,
};
use reth_provider::{BlockExecutor, BlockExecutorStats, BundleStateWithReceipts};
@ -14,22 +14,20 @@ use tracing::{debug, trace};
impl<'a> BlockExecutor for EVMProcessor<'a> {
fn execute(
&mut self,
block: &Block,
block: &BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<(), BlockExecutionError> {
let receipts = self.execute_inner(block, total_difficulty, senders)?;
let receipts = self.execute_inner(block, total_difficulty)?;
self.save_receipts(receipts)
}
fn execute_and_verify_receipt(
&mut self,
block: &Block,
block: &BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<(), BlockExecutionError> {
// execute block
let receipts = self.execute_inner(block, total_difficulty, senders)?;
let receipts = self.execute_inner(block, total_difficulty)?;
// TODO Before Byzantium, receipts contained state root that would mean that expensive
// operation as hashing that is needed for state root got calculated in every
@ -45,7 +43,7 @@ impl<'a> BlockExecutor for EVMProcessor<'a> {
block.timestamp,
) {
debug!(target: "evm", ?error, ?receipts, "receipts verification failed");
return Err(error)
return Err(error);
};
self.stats.receipt_root_duration += time.elapsed();
}
@ -55,19 +53,16 @@ impl<'a> BlockExecutor for EVMProcessor<'a> {
fn execute_transactions(
&mut self,
block: &Block,
block: &BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<(Vec<Receipt>, u64), BlockExecutionError> {
self.init_env(&block.header, total_difficulty);
// perf: do not execute empty blocks
if block.body.is_empty() {
return Ok((Vec::new(), 0))
return Ok((Vec::new(), 0));
}
let senders = self.recover_senders(&block.body, senders)?;
let is_regolith =
self.chain_spec.fork(Hardfork::Regolith).active_at_timestamp(block.timestamp);
@ -84,7 +79,7 @@ impl<'a> BlockExecutor for EVMProcessor<'a> {
let mut cumulative_gas_used = 0;
let mut receipts = Vec::with_capacity(block.body.len());
for (transaction, sender) in block.body.iter().zip(senders) {
for (sender, transaction) in block.transactions_with_sender() {
let time = Instant::now();
// The sum of the transactions gas limit, Tg, and the gas utilized in this block prior,
// must be no greater than the blocks gasLimit.
@ -96,7 +91,7 @@ impl<'a> BlockExecutor for EVMProcessor<'a> {
transaction_gas_limit: transaction.gas_limit(),
block_available_gas,
}
.into())
.into());
}
// Cache the depositor account prior to the state transition for the deposit nonce.
@ -107,14 +102,14 @@ impl<'a> BlockExecutor for EVMProcessor<'a> {
let depositor = (is_regolith && transaction.is_deposit())
.then(|| {
self.db_mut()
.load_cache_account(sender)
.load_cache_account(*sender)
.map(|acc| acc.account_info().unwrap_or_default())
})
.transpose()
.map_err(|_| BlockExecutionError::ProviderError)?;
// Execute transaction.
let ResultAndState { result, state } = self.transact(transaction, sender)?;
let ResultAndState { result, state } = self.transact(transaction, *sender)?;
trace!(
target: "evm",
?transaction, ?result, ?state,

View File

@ -7,9 +7,9 @@ use crate::{
use reth_interfaces::executor::{BlockExecutionError, BlockValidationError};
use reth_primitives::{
revm::env::{fill_cfg_and_block_env, fill_tx_env},
Address, Block, BlockNumber, Bloom, ChainSpec, GotExpected, Hardfork, Header, PruneMode,
PruneModes, PruneSegmentError, Receipt, ReceiptWithBloom, Receipts, TransactionSigned, B256,
MINIMUM_PRUNING_DISTANCE, U256,
Address, Block, BlockNumber, BlockWithSenders, Bloom, ChainSpec, GotExpected, Hardfork, Header,
PruneMode, PruneModes, PruneSegmentError, Receipt, ReceiptWithBloom, Receipts,
TransactionSigned, B256, MINIMUM_PRUNING_DISTANCE, U256,
};
use reth_provider::{
BlockExecutor, BlockExecutorStats, ProviderError, PrunableBlockExecutor, StateProvider,
@ -149,26 +149,6 @@ impl<'a> EVMProcessor<'a> {
self.evm.db().expect("Database inside EVM is always set")
}
pub(crate) fn recover_senders(
&mut self,
body: &[TransactionSigned],
senders: Option<Vec<Address>>,
) -> Result<Vec<Address>, BlockExecutionError> {
if let Some(senders) = senders {
if body.len() == senders.len() {
Ok(senders)
} else {
Err(BlockValidationError::SenderRecoveryError.into())
}
} else {
let time = Instant::now();
let ret = TransactionSigned::recover_signers(body, body.len())
.ok_or(BlockValidationError::SenderRecoveryError.into());
self.stats.sender_recovery_duration += time.elapsed();
ret
}
}
/// Initializes the config and block env.
pub(crate) fn init_env(&mut self, header: &Header, total_difficulty: U256) {
// Set state clear flag.
@ -283,14 +263,12 @@ impl<'a> EVMProcessor<'a> {
/// Execute the block, verify gas usage and apply post-block state changes.
pub(crate) fn execute_inner(
&mut self,
block: &Block,
block: &BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<Vec<Receipt>, BlockExecutionError> {
self.init_env(&block.header, total_difficulty);
self.apply_beacon_root_contract_call(block)?;
let (receipts, cumulative_gas_used) =
self.execute_transactions(block, total_difficulty, senders)?;
let (receipts, cumulative_gas_used) = self.execute_transactions(block, total_difficulty)?;
// Check if gas used matches the value set in header.
if block.gas_used != cumulative_gas_used {
@ -299,7 +277,7 @@ impl<'a> EVMProcessor<'a> {
gas: GotExpected { got: cumulative_gas_used, expected: block.gas_used },
gas_spent_by_tx: receipts.gas_spent_by_tx()?,
}
.into())
.into());
}
let time = Instant::now();
self.apply_post_execution_state_change(block, total_difficulty)?;
@ -358,7 +336,7 @@ impl<'a> EVMProcessor<'a> {
self.prune_modes.receipts.map_or(false, |mode| mode.should_prune(block_number, tip))
{
receipts.clear();
return Ok(())
return Ok(());
}
// All receipts from the last 128 blocks are required for blockchain tree, even with
@ -366,7 +344,7 @@ impl<'a> EVMProcessor<'a> {
let prunable_receipts =
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(block_number, tip);
if !prunable_receipts {
return Ok(())
return Ok(());
}
let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
@ -399,22 +377,20 @@ impl<'a> EVMProcessor<'a> {
impl<'a> BlockExecutor for EVMProcessor<'a> {
fn execute(
&mut self,
block: &Block,
block: &BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<(), BlockExecutionError> {
let receipts = self.execute_inner(block, total_difficulty, senders)?;
let receipts = self.execute_inner(block, total_difficulty)?;
self.save_receipts(receipts)
}
fn execute_and_verify_receipt(
&mut self,
block: &Block,
block: &BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<(), BlockExecutionError> {
// execute block
let receipts = self.execute_inner(block, total_difficulty, senders)?;
let receipts = self.execute_inner(block, total_difficulty)?;
// TODO Before Byzantium, receipts contained state root that would mean that expensive
// operation as hashing that is needed for state root got calculated in every
@ -426,7 +402,7 @@ impl<'a> BlockExecutor for EVMProcessor<'a> {
verify_receipt(block.header.receipts_root, block.header.logs_bloom, receipts.iter())
{
debug!(target: "evm", ?error, ?receipts, "receipts verification failed");
return Err(error)
return Err(error);
};
self.stats.receipt_root_duration += time.elapsed();
}
@ -436,22 +412,19 @@ impl<'a> BlockExecutor for EVMProcessor<'a> {
fn execute_transactions(
&mut self,
block: &Block,
block: &BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<(Vec<Receipt>, u64), BlockExecutionError> {
self.init_env(&block.header, total_difficulty);
// perf: do not execute empty blocks
if block.body.is_empty() {
return Ok((Vec::new(), 0))
return Ok((Vec::new(), 0));
}
let senders = self.recover_senders(&block.body, senders)?;
let mut cumulative_gas_used = 0;
let mut receipts = Vec::with_capacity(block.body.len());
for (transaction, sender) in block.body.iter().zip(senders) {
for (sender, transaction) in block.transactions_with_sender() {
let time = Instant::now();
// The sum of the transactions gas limit, Tg, and the gas utilized in this block prior,
// must be no greater than the blocks gasLimit.
@ -461,10 +434,10 @@ impl<'a> BlockExecutor for EVMProcessor<'a> {
transaction_gas_limit: transaction.gas_limit(),
block_available_gas,
}
.into())
.into());
}
// Execute transaction.
let ResultAndState { result, state } = self.transact(transaction, sender)?;
let ResultAndState { result, state } = self.transact(transaction, *sender)?;
trace!(
target: "evm",
?transaction, ?result, ?state,
@ -546,7 +519,7 @@ pub fn verify_receipt<'a>(
return Err(BlockValidationError::ReceiptRootDiff(
GotExpected { got: receipts_root, expected: expected_receipts_root }.into(),
)
.into())
.into());
}
// Create header log bloom.
@ -555,7 +528,7 @@ pub fn verify_receipt<'a>(
return Err(BlockValidationError::BloomLogDiff(
GotExpected { got: logs_bloom, expected: expected_logs_bloom }.into(),
)
.into())
.into());
}
Ok(())
@ -698,9 +671,16 @@ mod tests {
// attempt to execute a block without parent beacon block root, expect err
let err = executor
.execute_and_verify_receipt(
&Block { header: header.clone(), body: vec![], ommers: vec![], withdrawals: None },
&BlockWithSenders {
block: Block {
header: header.clone(),
body: vec![],
ommers: vec![],
withdrawals: None,
},
senders: vec![],
},
U256::ZERO,
None,
)
.expect_err(
"Executing cancun block without parent beacon block root field should fail",
@ -716,9 +696,16 @@ mod tests {
// Now execute a block with the fixed header, ensure that it does not fail
executor
.execute(
&Block { header: header.clone(), body: vec![], ommers: vec![], withdrawals: None },
&BlockWithSenders {
block: Block {
header: header.clone(),
body: vec![],
ommers: vec![],
withdrawals: None,
},
senders: vec![],
},
U256::ZERO,
None,
)
.unwrap();
@ -776,9 +763,16 @@ mod tests {
// attempt to execute an empty block with parent beacon block root, this should not fail
executor
.execute_and_verify_receipt(
&Block { header: header.clone(), body: vec![], ommers: vec![], withdrawals: None },
&BlockWithSenders {
block: Block {
header: header.clone(),
body: vec![],
ommers: vec![],
withdrawals: None,
},
senders: vec![],
},
U256::ZERO,
None,
)
.expect(
"Executing a block with no transactions while cancun is active should not fail",
@ -833,9 +827,16 @@ mod tests {
// attempt to execute an empty block with parent beacon block root, this should not fail
executor
.execute_and_verify_receipt(
&Block { header: header.clone(), body: vec![], ommers: vec![], withdrawals: None },
&BlockWithSenders {
block: Block {
header: header.clone(),
body: vec![],
ommers: vec![],
withdrawals: None,
},
senders: vec![],
},
U256::ZERO,
None,
)
.expect(
"Executing a block with no transactions while cancun is active should not fail",
@ -880,9 +881,16 @@ mod tests {
header.parent_beacon_block_root = Some(B256::with_last_byte(0x69));
let _err = executor
.execute_and_verify_receipt(
&Block { header: header.clone(), body: vec![], ommers: vec![], withdrawals: None },
&BlockWithSenders {
block: Block {
header: header.clone(),
body: vec![],
ommers: vec![],
withdrawals: None,
},
senders: vec![],
},
U256::ZERO,
None,
)
.expect_err(
"Executing genesis cancun block with non-zero parent beacon block root field should fail",
@ -895,9 +903,16 @@ mod tests {
// call does not occur
executor
.execute(
&Block { header: header.clone(), body: vec![], ommers: vec![], withdrawals: None },
&BlockWithSenders {
block: Block {
header: header.clone(),
body: vec![],
ommers: vec![],
withdrawals: None,
},
senders: vec![],
},
U256::ZERO,
None,
)
.unwrap();
@ -958,9 +973,16 @@ mod tests {
// Now execute a block with the fixed header, ensure that it does not fail
executor
.execute(
&Block { header: header.clone(), body: vec![], ommers: vec![], withdrawals: None },
&BlockWithSenders {
block: Block {
header: header.clone(),
body: vec![],
ommers: vec![],
withdrawals: None,
},
senders: vec![],
},
U256::ZERO,
None,
)
.unwrap();

View File

@ -114,7 +114,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
input: ExecInput,
) -> Result<ExecOutput, StageError> {
if input.target_reached() {
return Ok(ExecOutput::done(input.checkpoint()))
return Ok(ExecOutput::done(input.checkpoint()));
}
let start_block = input.next_block();
@ -159,12 +159,9 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let time = Instant::now();
// Execute the block
let (block, senders) = block.into_components();
executor.execute_and_verify_receipt(&block, td, Some(senders)).map_err(|error| {
StageError::Block {
block: Box::new(block.header.clone().seal_slow()),
error: BlockErrorKind::Execution(error),
}
executor.execute_and_verify_receipt(&block, td).map_err(|error| StageError::Block {
block: Box::new(block.header.clone().seal_slow()),
error: BlockErrorKind::Execution(error),
})?;
execution_duration += time.elapsed();
@ -186,7 +183,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
bundle_size_hint,
cumulative_gas,
) {
break
break;
}
}
let time = Instant::now();
@ -363,7 +360,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
if range.is_empty() {
return Ok(UnwindOutput {
checkpoint: input.checkpoint.with_block_number(input.unwind_to),
})
});
}
// get all batches for account change
@ -406,7 +403,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
let mut rev_storage_changeset_walker = storage_changeset.walk_back(None)?;
while let Some((key, _)) = rev_storage_changeset_walker.next().transpose()? {
if key.block_number() < *range.start() {
break
break;
}
// delete all changesets
rev_storage_changeset_walker.delete_current()?;
@ -426,7 +423,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
while let Some(Ok((tx_number, receipt))) = reverse_walker.next() {
if tx_number < first_tx_num {
break
break;
}
reverse_walker.delete_current()?;

View File

@ -139,7 +139,7 @@ where
while let Some((sharded_key, list)) = item {
// If the shard does not belong to the key, break.
if !shard_belongs_to_key(&sharded_key) {
break
break;
}
cursor.delete_current()?;
@ -148,12 +148,12 @@ where
let first = list.iter(0).next().expect("List can't be empty");
if first >= block_number as usize {
item = cursor.prev()?;
continue
continue;
} else if block_number <= sharded_key.as_ref().highest_block_number {
// Filter out all elements greater than block number.
return Ok(list.iter(0).take_while(|i| *i < block_number as usize).collect::<Vec<_>>())
return Ok(list.iter(0).take_while(|i| *i < block_number as usize).collect::<Vec<_>>());
} else {
return Ok(list.iter(0).collect::<Vec<_>>())
return Ok(list.iter(0).collect::<Vec<_>>());
}
}
@ -236,7 +236,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<BundleStateWithReceipts> {
if range.is_empty() {
return Ok(BundleStateWithReceipts::default())
return Ok(BundleStateWithReceipts::default());
}
let start_block_number = *range.start();
@ -414,7 +414,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
let block_bodies = self.get_or_take::<tables::BlockBodyIndices, false>(range)?;
if block_bodies.is_empty() {
return Ok(Vec::new())
return Ok(Vec::new());
}
// Compute the first and last tx ID in the range
@ -423,7 +423,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
// If this is the case then all of the blocks in the range are empty
if last_transaction < first_transaction {
return Ok(block_bodies.into_iter().map(|(n, _)| (n, Vec::new())).collect())
return Ok(block_bodies.into_iter().map(|(n, _)| (n, Vec::new())).collect());
}
// Get transactions and senders
@ -552,7 +552,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
let block_headers = self.get_or_take::<tables::Headers, TAKE>(range.clone())?;
if block_headers.is_empty() {
return Ok(Vec::new())
return Ok(Vec::new());
}
let block_header_hashes =
@ -658,7 +658,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
while let Some(Ok((entry_key, _))) = reverse_walker.next() {
if selector(entry_key.clone()) <= key {
break
break;
}
reverse_walker.delete_current()?;
deleted += 1;
@ -705,7 +705,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
}
if deleted == limit {
break
break;
}
}
}
@ -736,7 +736,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
}
if deleted == limit {
break
break;
}
}
}
@ -756,7 +756,7 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
// delete old shard so new one can be inserted.
self.tx.delete::<T>(shard_key, None)?;
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
return Ok(list)
return Ok(list);
}
Ok(Vec::new())
}
@ -948,7 +948,7 @@ impl<TX: DbTx> HeaderProvider for DatabaseProvider<TX> {
if let Some(td) = self.chain_spec.final_paris_total_difficulty(number) {
// if this block is higher than the final paris(merge) block, return the final paris
// difficulty
return Ok(Some(td))
return Ok(Some(td));
}
Ok(self.tx.get::<tables::HeaderTD>(number)?.map(|td| td.0))
@ -986,7 +986,7 @@ impl<TX: DbTx> HeaderProvider for DatabaseProvider<TX> {
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
let sealed = header.seal(hash);
if !predicate(&sealed) {
break
break;
}
headers.push(sealed);
}
@ -1064,7 +1064,7 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
None => return Ok(None),
};
return Ok(Some(Block { header, body: transactions, ommers, withdrawals }))
return Ok(Some(Block { header, body: transactions, ommers, withdrawals }));
}
}
@ -1088,11 +1088,11 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
// If the Paris (Merge) hardfork block is known and block is after it, return empty
// ommers.
if self.chain_spec.final_paris_total_difficulty(number).is_some() {
return Ok(Some(Vec::new()))
return Ok(Some(Vec::new()));
}
let ommers = self.tx.get::<tables::BlockOmmers>(number)?.map(|o| o.ommers);
return Ok(ommers)
return Ok(ommers);
}
Ok(None)
@ -1158,7 +1158,7 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
if range.is_empty() {
return Ok(Vec::new())
return Ok(Vec::new());
}
let len = range.end().saturating_sub(*range.start()) as usize;
@ -1336,7 +1336,7 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
excess_blob_gas: header.excess_blob_gas,
};
return Ok(Some((transaction, meta)))
return Ok(Some((transaction, meta)));
}
}
}
@ -1367,7 +1367,7 @@ impl<TX: DbTx> TransactionsProvider for DatabaseProvider<TX> {
.map(|result| result.map(|(_, tx)| tx.into()))
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(transactions))
}
};
}
}
Ok(None)
@ -1452,7 +1452,7 @@ impl<TX: DbTx> ReceiptProvider for DatabaseProvider<TX> {
.map(|result| result.map(|(_, receipt)| receipt))
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(receipts))
}
};
}
}
Ok(None)
@ -1474,7 +1474,7 @@ impl<TX: DbTx> WithdrawalsProvider for DatabaseProvider<TX> {
.get::<tables::BlockWithdrawals>(number)
.map(|w| w.map(|w| w.withdrawals))?
.unwrap_or_default();
return Ok(Some(withdrawals))
return Ok(Some(withdrawals));
}
}
Ok(None)
@ -1906,7 +1906,7 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
root: GotExpected { got: state_root, expected: expected_state_root },
block_number: *range.end(),
block_hash: end_block_hash,
})))
})));
}
trie_updates.flush(&self.tx)?;
}
@ -2121,7 +2121,7 @@ impl<TX: DbTxMut + DbTx> BlockExecutionWriter for DatabaseProvider<TX> {
root: GotExpected { got: new_state_root, expected: parent_state_root },
block_number: parent_number,
block_hash: parent_hash,
})))
})));
}
trie_updates.flush(&self.tx)?;
}
@ -2305,7 +2305,7 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
) -> ProviderResult<()> {
if blocks.is_empty() {
debug!(target: "providers::db", "Attempted to append empty block range");
return Ok(())
return Ok(());
}
let first_number = blocks.first().unwrap().number;

View File

@ -4,7 +4,7 @@ use crate::{
};
use parking_lot::Mutex;
use reth_interfaces::executor::BlockExecutionError;
use reth_primitives::{Address, Block, BlockNumber, ChainSpec, PruneModes, Receipt, U256};
use reth_primitives::{BlockNumber, BlockWithSenders, ChainSpec, PruneModes, Receipt, U256};
use std::sync::Arc;
/// Test executor with mocked result.
#[derive(Debug)]
@ -13,33 +13,30 @@ pub struct TestExecutor(pub Option<BundleStateWithReceipts>);
impl BlockExecutor for TestExecutor {
fn execute(
&mut self,
_block: &Block,
_block: &BlockWithSenders,
_total_difficulty: U256,
_senders: Option<Vec<Address>>,
) -> Result<(), BlockExecutionError> {
if self.0.is_none() {
return Err(BlockExecutionError::UnavailableForTest)
return Err(BlockExecutionError::UnavailableForTest);
}
Ok(())
}
fn execute_and_verify_receipt(
&mut self,
_block: &Block,
_block: &BlockWithSenders,
_total_difficulty: U256,
_senders: Option<Vec<Address>>,
) -> Result<(), BlockExecutionError> {
if self.0.is_none() {
return Err(BlockExecutionError::UnavailableForTest)
return Err(BlockExecutionError::UnavailableForTest);
}
Ok(())
}
fn execute_transactions(
&mut self,
_block: &Block,
_block: &BlockWithSenders,
_total_difficulty: U256,
_senders: Option<Vec<Address>>,
) -> Result<(Vec<Receipt>, u64), BlockExecutionError> {
Err(BlockExecutionError::UnavailableForTest)
}

View File

@ -2,7 +2,7 @@
use crate::{bundle_state::BundleStateWithReceipts, StateProvider};
use reth_interfaces::executor::BlockExecutionError;
use reth_primitives::{Address, Block, BlockNumber, ChainSpec, PruneModes, Receipt, U256};
use reth_primitives::{BlockNumber, BlockWithSenders, ChainSpec, PruneModes, Receipt, U256};
use std::time::Duration;
use tracing::debug;
@ -23,18 +23,10 @@ pub trait ExecutorFactory: Send + Sync + 'static {
/// An executor capable of executing a block.
pub trait BlockExecutor {
/// Execute a block.
///
/// The number of `senders` should be equal to the number of transactions in the block.
///
/// If no senders are specified, the `execute` function MUST recover the senders for the
/// provided block's transactions internally. We use this to allow for calculating senders in
/// parallel in e.g. staged sync, so that execution can happen without paying for sender
/// recovery costs.
fn execute(
&mut self,
block: &Block,
block: &BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<(), BlockExecutionError>;
/// Executes the block and checks receipts.
@ -42,9 +34,8 @@ pub trait BlockExecutor {
/// See [execute](BlockExecutor::execute) for more details.
fn execute_and_verify_receipt(
&mut self,
block: &Block,
block: &BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<(), BlockExecutionError>;
/// Runs the provided transactions and commits their state to the run-time database.
@ -61,9 +52,8 @@ pub trait BlockExecutor {
/// See [execute](BlockExecutor::execute) for more details.
fn execute_transactions(
&mut self,
block: &Block,
block: &BlockWithSenders,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<(Vec<Receipt>, u64), BlockExecutionError>;
/// Return bundle state. This is output of executed blocks.
@ -99,8 +89,6 @@ pub struct BlockExecutorStats {
pub merge_transitions_duration: Duration,
/// Time needed to calculate receipt roots.
pub receipt_root_duration: Duration,
/// Time needed to recover senders.
pub sender_recovery_duration: Duration,
}
impl BlockExecutorStats {
@ -113,7 +101,6 @@ impl BlockExecutorStats {
apply_post_state = ?self.apply_post_execution_state_changes_duration,
merge_transitions = ?self.merge_transitions_duration,
receipt_root = ?self.receipt_root_duration,
sender_recovery = ?self.sender_recovery_duration,
"Execution time"
);
}