From a8c883c6b61329c6fe822d79d25cf4be4a740223 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Fri, 17 Jan 2025 11:21:05 +0000 Subject: [PATCH] feat: extend `BlockBodyIndicesProvider` with `block_body_indices_range` (#13829) --- crates/stages/stages/src/stages/bodies.rs | 3 +- crates/stages/stages/src/stages/execution.rs | 13 +- crates/stages/stages/src/stages/tx_lookup.rs | 15 +- .../src/providers/blockchain_provider.rs | 14 +- .../provider/src/providers/consistent.rs | 7 + .../provider/src/providers/database/mod.rs | 7 + .../src/providers/database/provider.rs | 130 ++++++++++-------- .../provider/src/providers/static_file/jar.rs | 17 ++- .../src/providers/static_file/manager.rs | 8 ++ .../storage/provider/src/test_utils/mock.rs | 6 + .../storage/storage-api/src/block_indices.rs | 8 ++ crates/storage/storage-api/src/noop.rs | 7 + 12 files changed, 158 insertions(+), 77 deletions(-) diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index 7c796ec6a..fd4e373a5 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -112,8 +112,7 @@ impl BodyStage { // fix the inconsistency right away. if let Some(unwind_to) = unwind_block { let next_tx_num_after_unwind = provider - .tx_ref() - .get::(unwind_to)? + .block_body_indices(unwind_to)? .map(|b| b.next_tx_num()) .ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?; diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 5557beda5..afa493b4d 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -5,7 +5,6 @@ use alloy_primitives::BlockNumber; use num_traits::Zero; use reth_config::config::ExecutionConfig; use reth_db::{static_file::HeaderMask, tables}; -use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; use reth_evm::{ execute::{BatchExecutor, BlockExecutorProvider}, metrics::ExecutorMetrics, @@ -203,12 +202,8 @@ where } // Get next expected receipt number - let tx = provider.tx_ref(); - let next_receipt_num = tx - .cursor_read::()? - .seek_exact(checkpoint)? - .map(|(_, value)| value.next_tx_num()) - .unwrap_or(0); + let next_receipt_num = + provider.block_body_indices(checkpoint)?.map(|b| b.next_tx_num()).unwrap_or(0); let static_file_provider = provider.static_file_provider(); @@ -237,8 +232,7 @@ where // fix the inconsistency right away. if let Some(unwind_to) = unwind_to { let next_receipt_num_after_unwind = provider - .tx_ref() - .get::(unwind_to)? + .block_body_indices(unwind_to)? .map(|b| b.next_tx_num()) .ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?; @@ -645,6 +639,7 @@ mod tests { use alloy_rlp::Decodable; use assert_matches::assert_matches; use reth_chainspec::ChainSpecBuilder; + use reth_db::transaction::DbTx; use reth_db_api::{models::AccountBeforeTx, transaction::DbTxMut}; use reth_evm::execute::BasicBlockExecutorProvider; use reth_evm_ethereum::execute::EthExecutionStrategyFactory; diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs index 872af3baf..42ffd0542 100644 --- a/crates/stages/stages/src/stages/tx_lookup.rs +++ b/crates/stages/stages/src/stages/tx_lookup.rs @@ -5,7 +5,7 @@ use reth_config::config::{EtlConfig, TransactionLookupConfig}; use reth_db::{table::Value, tables, RawKey, RawValue}; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW}, - transaction::{DbTx, DbTxMut}, + transaction::DbTxMut, }; use reth_etl::Collector; use reth_primitives::NodePrimitives; @@ -195,12 +195,16 @@ where let tx = provider.tx_ref(); let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size); - // Cursors to unwind tx hash to number - let mut body_cursor = tx.cursor_read::()?; + // Cursor to unwind tx hash to number let mut tx_hash_number_cursor = tx.cursor_write::()?; let static_file_provider = provider.static_file_provider(); - let mut rev_walker = body_cursor.walk_back(Some(*range.end()))?; - while let Some((number, body)) = rev_walker.next().transpose()? { + let rev_walker = provider + .block_body_indices_range(range.clone())? + .into_iter() + .zip(range.collect::>()) + .rev(); + + for (body, number) in rev_walker { if number <= unwind_to { break; } @@ -255,6 +259,7 @@ mod tests { }; use alloy_primitives::{BlockNumber, B256}; use assert_matches::assert_matches; + use reth_db::transaction::DbTx; use reth_primitives::SealedBlock; use reth_provider::{ providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory, diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 75ad402b0..92451fab1 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -472,6 +472,13 @@ impl BlockBodyIndicesProvider for BlockchainProvider { ) -> ProviderResult> { self.consistent_provider()?.block_body_indices(number) } + + fn block_body_indices_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + self.consistent_provider()?.block_body_indices_range(range) + } } impl StageCheckpointReader for BlockchainProvider { @@ -882,10 +889,9 @@ mod tests { let static_file_provider = factory.static_file_provider(); // Write transactions to static files with the right `tx_num`` - let mut bodies_cursor = provider_rw.tx_ref().cursor_read::()?; - let mut tx_num = bodies_cursor - .seek_exact(database_blocks.first().as_ref().unwrap().number.saturating_sub(1))? - .map(|(_, indices)| indices.next_tx_num()) + let mut tx_num = provider_rw + .block_body_indices(database_blocks.first().as_ref().unwrap().number.saturating_sub(1))? + .map(|indices| indices.next_tx_num()) .unwrap_or_default(); // Insert blocks into the database diff --git a/crates/storage/provider/src/providers/consistent.rs b/crates/storage/provider/src/providers/consistent.rs index d84152f88..3082faf45 100644 --- a/crates/storage/provider/src/providers/consistent.rs +++ b/crates/storage/provider/src/providers/consistent.rs @@ -1204,6 +1204,13 @@ impl BlockBodyIndicesProvider for ConsistentProvider { }, ) } + + fn block_body_indices_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + range.map_while(|b| self.block_body_indices(b).transpose()).collect() + } } impl StageCheckpointReader for ConsistentProvider { diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index ab44da3ae..12e33146c 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -564,6 +564,13 @@ impl BlockBodyIndicesProvider for ProviderFactory { ) -> ProviderResult> { self.provider()?.block_body_indices(number) } + + fn block_body_indices_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + self.provider()?.block_body_indices_range(range) + } } impl StageCheckpointReader for ProviderFactory { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 4a9e5affa..6ac93454c 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -618,27 +618,23 @@ impl DatabaseProvider { let len = range.end().saturating_sub(*range.start()) as usize; let mut blocks = Vec::with_capacity(len); - let headers = headers_range(range)?; + let headers = headers_range(range.clone())?; let mut tx_cursor = self.tx.cursor_read::>>()?; - let mut block_body_cursor = self.tx.cursor_read::()?; - let mut present_headers = Vec::new(); - for header in headers { - // If the body indices are not found, this means that the transactions either do - // not exist in the database yet, or they do exit but are - // not indexed. If they exist but are not indexed, we don't - // have enough information to return the block anyways, so - // we skip the block. - if let Some((_, block_body_indices)) = - block_body_cursor.seek_exact(header.as_ref().number())? - { - let tx_range = block_body_indices.tx_num_range(); - present_headers.push((header, tx_range)); - } - } + // If the body indices are not found, this means that the transactions either do + // not exist in the database yet, or they do exit but are + // not indexed. If they exist but are not indexed, we don't + // have enough information to return the block anyways, so + // we skip the block. + let present_headers = self + .block_body_indices_range(range)? + .into_iter() + .map(|b| b.tx_num_range()) + .zip(headers) + .collect::>(); let mut inputs = Vec::new(); - for (header, tx_range) in &present_headers { + for (tx_range, header) in &present_headers { let transactions = if tx_range.is_empty() { Vec::new() } else { @@ -650,7 +646,7 @@ impl DatabaseProvider { let bodies = self.storage.reader().read_block_bodies(self, inputs)?; - for ((header, tx_range), body) in present_headers.into_iter().zip(bodies) { + for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) { blocks.push(assemble_block(header, body, tx_range)?); } @@ -1476,23 +1472,23 @@ impl TransactionsProvider for Datab &self, range: impl RangeBounds, ) -> ProviderResult>> { + let range = to_range(range); let mut tx_cursor = self.tx.cursor_read::>()?; - let mut results = Vec::new(); - let mut body_cursor = self.tx.cursor_read::()?; - for entry in body_cursor.walk_range(range)? { - let (_, body) = entry?; - let tx_num_range = body.tx_num_range(); - if tx_num_range.is_empty() { - results.push(Vec::new()); - } else { - results.push( - self.transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)? + + self.block_body_indices_range(range.start..=range.end.saturating_sub(1))? + .into_iter() + .map(|body| { + let tx_num_range = body.tx_num_range(); + if tx_num_range.is_empty() { + Ok(Vec::new()) + } else { + Ok(self + .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)? .into_iter() - .collect(), - ); - } - } - Ok(results) + .collect()) + } + }) + .collect() } fn transactions_by_tx_range( @@ -1620,6 +1616,18 @@ impl BlockBodyIndicesProvider fn block_body_indices(&self, num: u64) -> ProviderResult> { Ok(self.tx.get::(num)?) } + + fn block_body_indices_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + Ok(self + .tx_ref() + .cursor_read::()? + .walk_range(range)? + .map(|r| r.map(|(_, b)| b)) + .collect::>()?) + } } impl StageCheckpointReader for DatabaseProvider { @@ -1756,13 +1764,31 @@ impl StateWriter is_value_known: OriginalValuesKnown, write_receipts_to: StorageLocation, ) -> ProviderResult<()> { + let first_block = execution_outcome.first_block(); + let block_count = execution_outcome.receipts.len() as u64; + let block_range = first_block..=first_block.saturating_add(block_count).saturating_sub(1); + let last_block = *block_range.end(); + let (plain_state, reverts) = execution_outcome.bundle.to_plain_state_and_reverts(is_value_known); - self.write_state_reverts(reverts, execution_outcome.first_block)?; + self.write_state_reverts(reverts, first_block)?; self.write_state_changes(plain_state)?; - let mut bodies_cursor = self.tx.cursor_read::()?; + // Fetch the first transaction number for each block in the range + let block_indices: Vec<_> = self + .block_body_indices_range(block_range)? + .into_iter() + .map(|b| b.first_tx_num) + .collect(); + + // Ensure all expected blocks are present. + if block_indices.len() < block_count as usize { + let missing_blocks = block_count - block_indices.len() as u64; + return Err(ProviderError::BlockBodyIndicesNotFound( + last_block.saturating_sub(missing_blocks - 1), + )); + } let has_receipts_pruning = self.prune_modes.has_receipts_pruning() || execution_outcome.receipts.iter().flatten().any(|receipt| receipt.is_none()); @@ -1780,25 +1806,19 @@ impl StateWriter // We are writing to static files if requested and if there's no receipt pruning configured let mut receipts_static_writer = (write_receipts_to.static_files() && !has_receipts_pruning) - .then(|| { - self.static_file_provider - .get_writer(execution_outcome.first_block, StaticFileSegment::Receipts) - }) + .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)) .transpose()?; - for (idx, receipts) in execution_outcome.receipts.iter().enumerate() { - let block_number = execution_outcome.first_block + idx as u64; + for (idx, (receipts, first_tx_index)) in + execution_outcome.receipts.iter().zip(block_indices).enumerate() + { + let block_number = first_block + idx as u64; // Increment block number for receipts static file writer if let Some(writer) = receipts_static_writer.as_mut() { writer.increment_block(block_number)?; } - let first_tx_index = bodies_cursor - .seek_exact(block_number)? - .map(|(_, indices)| indices.first_tx_num()) - .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?; - for (idx, receipt) in receipts.iter().enumerate() { let receipt_idx = first_tx_index + idx as u64; if let Some(receipt) = receipt { @@ -2017,11 +2037,11 @@ impl StateWriter } // We are not removing block meta as it is used to get block changesets. - let block_bodies = self.get::(range.clone())?; + let block_bodies = self.block_body_indices_range(range.clone())?; // get transaction receipts let from_transaction_num = - block_bodies.first().expect("already checked if there are blocks").1.first_tx_num(); + block_bodies.first().expect("already checked if there are blocks").first_tx_num(); let storage_range = BlockNumberAddress::range(range.clone()); @@ -2113,13 +2133,13 @@ impl StateWriter let start_block_number = *range.start(); // We are not removing block meta as it is used to get block changesets. - let block_bodies = self.get::(range.clone())?; + let block_bodies = self.block_body_indices_range(range.clone())?; // get transaction receipts let from_transaction_num = - block_bodies.first().expect("already checked if there are blocks").1.first_tx_num(); + block_bodies.first().expect("already checked if there are blocks").first_tx_num(); let to_transaction_num = - block_bodies.last().expect("already checked if there are blocks").1.last_tx_num(); + block_bodies.last().expect("already checked if there are blocks").last_tx_num(); let storage_range = BlockNumberAddress::range(range.clone()); @@ -2199,7 +2219,7 @@ impl StateWriter let mut receipts = Vec::with_capacity(block_bodies.len()); // loop break if we are at the end of the blocks. - for (_, block_body) in block_bodies { + for block_body in block_bodies { let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize); for num in block_body.tx_num_range() { if receipts_iter.peek().is_some_and(|(n, _)| *n == num) { @@ -2920,8 +2940,7 @@ impl BlockWrite // First transaction to be removed let unwind_tx_from = self - .tx - .get::(block)? + .block_body_indices(block)? .map(|b| b.next_tx_num()) .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?; @@ -2957,8 +2976,7 @@ impl BlockWrite // First transaction to be removed let unwind_tx_from = self - .tx - .get::(block)? + .block_body_indices(block)? .map(|b| b.next_tx_num()) .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?; diff --git a/crates/storage/provider/src/providers/static_file/jar.rs b/crates/storage/provider/src/providers/static_file/jar.rs index 0ff9ed20a..4b6525c1d 100644 --- a/crates/storage/provider/src/providers/static_file/jar.rs +++ b/crates/storage/provider/src/providers/static_file/jar.rs @@ -25,7 +25,7 @@ use reth_storage_api::{BlockBodyIndicesProvider, OmmersProvider, WithdrawalsProv use reth_storage_errors::provider::{ProviderError, ProviderResult}; use std::{ fmt::Debug, - ops::{Deref, RangeBounds}, + ops::{Deref, RangeBounds, RangeInclusive}, sync::Arc, }; @@ -386,4 +386,19 @@ impl BlockBodyIndicesProvider for StaticFileJarProvider<'_, N fn block_body_indices(&self, num: u64) -> ProviderResult> { self.cursor()?.get_one::(num.into()) } + + fn block_body_indices_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + let mut cursor = self.cursor()?; + let mut indices = Vec::with_capacity((range.end() - range.start() + 1) as usize); + + for num in range { + if let Some(block) = cursor.get_one::(num.into())? { + indices.push(block) + } + } + Ok(indices) + } } diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index d60109849..11f768a07 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -1729,6 +1729,14 @@ impl BlockBodyIndicesProvider for StaticFileProvider { } }) } + + fn block_body_indices_range( + &self, + _range: RangeInclusive, + ) -> ProviderResult> { + // Required data not present in static_files + Err(ProviderError::UnsupportedProvider) + } } impl StatsReader for StaticFileProvider { diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 113c2c509..e6efe8012 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -777,6 +777,12 @@ impl BlockBodyIndicesProvider for MockEthProvider { fn block_body_indices(&self, _num: u64) -> ProviderResult> { Ok(None) } + fn block_body_indices_range( + &self, + _range: RangeInclusive, + ) -> ProviderResult> { + Ok(vec![]) + } } impl ChangeSetReader for MockEthProvider { diff --git a/crates/storage/storage-api/src/block_indices.rs b/crates/storage/storage-api/src/block_indices.rs index 25c37bca8..3c6860fb2 100644 --- a/crates/storage/storage-api/src/block_indices.rs +++ b/crates/storage/storage-api/src/block_indices.rs @@ -1,5 +1,7 @@ +use alloy_primitives::BlockNumber; use reth_db_models::StoredBlockBodyIndices; use reth_storage_errors::provider::ProviderResult; +use std::ops::RangeInclusive; /// Client trait for fetching block body indices related data. #[auto_impl::auto_impl(&, Arc)] @@ -8,4 +10,10 @@ pub trait BlockBodyIndicesProvider: Send + Sync { /// /// Returns `None` if block is not found. fn block_body_indices(&self, num: u64) -> ProviderResult>; + + /// Returns the block body indices within the requested range matching number from storage. + fn block_body_indices_range( + &self, + range: RangeInclusive, + ) -> ProviderResult>; } diff --git a/crates/storage/storage-api/src/noop.rs b/crates/storage/storage-api/src/noop.rs index a88fbbdd9..20d975852 100644 --- a/crates/storage/storage-api/src/noop.rs +++ b/crates/storage/storage-api/src/noop.rs @@ -567,4 +567,11 @@ impl BlockBodyIndicesProvider for NoopProvider ProviderResult> { Ok(None) } + + fn block_body_indices_range( + &self, + _range: RangeInclusive, + ) -> ProviderResult> { + Ok(vec![]) + } }