diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 8e1924925..c778e0508 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -25,7 +25,7 @@ use reth_provider::{ BlockExecutionWriter, BlockNumReader, BlockWriter, CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, ChainSpecProvider, ChainSplit, ChainSplitTarget, DBProvider, DisplayBlocksChain, HeaderProvider, ProviderError, - StaticFileProviderFactory, + StaticFileProviderFactory, StorageLocation, }; use reth_stages_api::{MetricEvent, MetricEventsSender}; use reth_storage_errors::provider::{ProviderResult, RootMismatch}; @@ -1333,7 +1333,7 @@ where info!(target: "blockchain_tree", "REORG: revert canonical from database by unwinding chain blocks {:?}", revert_range); // read block and execution result from database. and remove traces of block from tables. let blocks_and_execution = provider_rw - .take_block_and_execution_range(revert_range) + .take_block_and_execution_above(revert_until, StorageLocation::Database) .map_err(|e| CanonicalError::CanonicalRevert(e.to_string()))?; provider_rw.commit()?; diff --git a/crates/cli/commands/src/stage/unwind.rs b/crates/cli/commands/src/stage/unwind.rs index 4f47a70b0..2d29121d0 100644 --- a/crates/cli/commands/src/stage/unwind.rs +++ b/crates/cli/commands/src/stage/unwind.rs @@ -2,7 +2,7 @@ use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs}; use alloy_eips::BlockHashOrNumber; -use alloy_primitives::{BlockNumber, B256}; +use alloy_primitives::B256; use clap::{Parser, Subcommand}; use reth_beacon_consensus::EthBeaconConsensus; use reth_chainspec::{EthChainSpec, EthereumHardforks}; @@ -17,6 +17,7 @@ use reth_node_core::args::NetworkArgs; use reth_provider::{ providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainSpecProvider, ChainStateBlockReader, ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory, + StorageLocation, }; use reth_prune::PruneModes; use reth_stages::{ @@ -25,7 +26,7 @@ use reth_stages::{ ExecutionStageThresholds, Pipeline, StageSet, }; use reth_static_file::StaticFileProducer; -use std::{ops::RangeInclusive, sync::Arc}; +use std::sync::Arc; use tokio::sync::watch; use tracing::info; @@ -52,16 +53,13 @@ impl> Command pub async fn execute>(self) -> eyre::Result<()> { let Environment { provider_factory, config, .. } = self.env.init::(AccessRights::RW)?; - let range = self.command.unwind_range(provider_factory.clone())?; - if *range.start() == 0 { - eyre::bail!("Cannot unwind genesis block") - } + let target = self.command.unwind_target(provider_factory.clone())?; let highest_static_file_block = provider_factory .static_file_provider() .get_highest_static_files() .max() - .filter(|highest_static_file_block| highest_static_file_block >= range.start()); + .filter(|highest_static_file_block| *highest_static_file_block > target); // Execute a pipeline unwind if the start of the range overlaps the existing static // files. If that's the case, then copy all available data from MDBX to static files, and @@ -75,9 +73,9 @@ impl> Command } if let Some(highest_static_file_block) = highest_static_file_block { - info!(target: "reth::cli", ?range, ?highest_static_file_block, "Executing a pipeline unwind."); + info!(target: "reth::cli", ?target, ?highest_static_file_block, "Executing a pipeline unwind."); } else { - info!(target: "reth::cli", ?range, "Executing a pipeline unwind."); + info!(target: "reth::cli", ?target, "Executing a pipeline unwind."); } // This will build an offline-only pipeline if the `offline` flag is enabled @@ -86,29 +84,25 @@ impl> Command // Move all applicable data from database to static files. pipeline.move_to_static_files()?; - pipeline.unwind((*range.start()).saturating_sub(1), None)?; + pipeline.unwind(target, None)?; } else { - info!(target: "reth::cli", ?range, "Executing a database unwind."); + info!(target: "reth::cli", ?target, "Executing a database unwind."); let provider = provider_factory.provider_rw()?; - let _ = provider - .take_block_and_execution_range(range.clone()) + provider + .remove_block_and_execution_above(target, StorageLocation::Both) .map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?; // update finalized block if needed let last_saved_finalized_block_number = provider.last_finalized_block_number()?; - let range_min = - range.clone().min().ok_or(eyre::eyre!("Could not fetch lower range end"))?; - if last_saved_finalized_block_number.is_none() || - Some(range_min) < last_saved_finalized_block_number - { - provider.save_finalized_block_number(BlockNumber::from(range_min))?; + if last_saved_finalized_block_number.is_none_or(|f| f > target) { + provider.save_finalized_block_number(target)?; } provider.commit()?; } - info!(target: "reth::cli", range=?range.clone(), count=range.count(), "Unwound blocks"); + info!(target: "reth::cli", ?target, "Unwound blocks"); Ok(()) } @@ -183,13 +177,11 @@ enum Subcommands { } impl Subcommands { - /// Returns the block range to unwind. - /// - /// This returns an inclusive range: [target..=latest] - fn unwind_range>>( + /// Returns the block to unwind to. The returned block will stay in database. + fn unwind_target>>( &self, factory: ProviderFactory, - ) -> eyre::Result> { + ) -> eyre::Result { let provider = factory.provider()?; let last = provider.last_block_number()?; let target = match self { @@ -200,11 +192,11 @@ impl Subcommands { BlockHashOrNumber::Number(num) => *num, }, Self::NumBlocks { amount } => last.saturating_sub(*amount), - } + 1; + }; if target > last { eyre::bail!("Target block number is higher than the latest block number") } - Ok(target..=last) + Ok(target) } } diff --git a/crates/optimism/node/src/node.rs b/crates/optimism/node/src/node.rs index 46841a2a5..82b2ce2eb 100644 --- a/crates/optimism/node/src/node.rs +++ b/crates/optimism/node/src/node.rs @@ -62,6 +62,14 @@ impl> BlockBodyWriter for ) -> ProviderResult<()> { self.0.write_block_bodies(provider, bodies) } + + fn remove_block_bodies_above( + &self, + provider: &Provider, + block: alloy_primitives::BlockNumber, + ) -> ProviderResult<()> { + self.0.remove_block_bodies_above(provider, block) + } } impl ChainStorage for OpStorage { diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index b6eab349e..e541b9081 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -10,10 +10,7 @@ use tracing::*; use alloy_primitives::TxNumber; use reth_db::{tables, transaction::DbTx}; -use reth_db_api::{ - cursor::{DbCursorRO, DbCursorRW}, - transaction::DbTxMut, -}; +use reth_db_api::{cursor::DbCursorRO, transaction::DbTxMut}; use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}; use reth_primitives::StaticFileSegment; use reth_provider::{ @@ -70,6 +67,82 @@ impl BodyStage { pub const fn new(downloader: D) -> Self { Self { downloader, buffer: None } } + + /// Ensures that static files and database are in sync. + fn ensure_consistency( + &self, + provider: &Provider, + unwind_block: Option, + ) -> Result<(), StageError> + where + Provider: DBProvider + BlockReader + StaticFileProviderFactory, + { + // Get id for the next tx_num of zero if there are no transactions. + let next_tx_num = provider + .tx_ref() + .cursor_read::()? + .last()? + .map(|(id, _)| id + 1) + .unwrap_or_default(); + + let static_file_provider = provider.static_file_provider(); + + // Make sure Transactions static file is at the same height. If it's further, this + // input execution was interrupted previously and we need to unwind the static file. + let next_static_file_tx_num = static_file_provider + .get_highest_static_file_tx(StaticFileSegment::Transactions) + .map(|id| id + 1) + .unwrap_or_default(); + + match next_static_file_tx_num.cmp(&next_tx_num) { + // If static files are ahead, we are currently unwinding the stage or we didn't reach + // the database commit in a previous stage run. So, our only solution is to unwind the + // static files and proceed from the database expected height. + Ordering::Greater => { + let highest_db_block = + provider.tx_ref().entries::()? as u64; + let mut static_file_producer = + static_file_provider.latest_writer(StaticFileSegment::Transactions)?; + static_file_producer + .prune_transactions(next_static_file_tx_num - next_tx_num, highest_db_block)?; + // Since this is a database <-> static file inconsistency, we commit the change + // straight away. + static_file_producer.commit()?; + } + // If static files are behind, then there was some corruption or loss of files. This + // error will trigger an unwind, that will bring the database to the same height as the + // static files. + Ordering::Less => { + // If we are already in the process of unwind, this might be fine because we will + // fix the inconsistency right away. + if let Some(unwind_to) = unwind_block { + let next_tx_num_after_unwind = provider + .tx_ref() + .get::(unwind_to)? + .map(|b| b.next_tx_num()) + .ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?; + + // This means we need a deeper unwind. + if next_tx_num_after_unwind > next_static_file_tx_num { + return Err(missing_static_data_error( + next_static_file_tx_num.saturating_sub(1), + &static_file_provider, + provider, + )?) + } + } else { + return Err(missing_static_data_error( + next_static_file_tx_num.saturating_sub(1), + &static_file_provider, + provider, + )?) + } + } + Ordering::Equal => {} + } + + Ok(()) + } } impl Stage for BodyStage @@ -122,50 +195,9 @@ where } let (from_block, to_block) = input.next_block_range().into_inner(); - // Get id for the next tx_num of zero if there are no transactions. - let next_tx_num = provider - .tx_ref() - .cursor_read::()? - .last()? - .map(|(id, _)| id + 1) - .unwrap_or_default(); + self.ensure_consistency(provider, None)?; - let static_file_provider = provider.static_file_provider(); - - // Make sure Transactions static file is at the same height. If it's further, this - // input execution was interrupted previously and we need to unwind the static file. - let next_static_file_tx_num = static_file_provider - .get_highest_static_file_tx(StaticFileSegment::Transactions) - .map(|id| id + 1) - .unwrap_or_default(); - - match next_static_file_tx_num.cmp(&next_tx_num) { - // If static files are ahead, then we didn't reach the database commit in a previous - // stage run. So, our only solution is to unwind the static files and proceed from the - // database expected height. - Ordering::Greater => { - let mut static_file_producer = - static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?; - static_file_producer - .prune_transactions(next_static_file_tx_num - next_tx_num, from_block - 1)?; - // Since this is a database <-> static file inconsistency, we commit the change - // straight away. - static_file_producer.commit()?; - } - // If static files are behind, then there was some corruption or loss of files. This - // error will trigger an unwind, that will bring the database to the same height as the - // static files. - Ordering::Less => { - return Err(missing_static_data_error( - next_static_file_tx_num.saturating_sub(1), - &static_file_provider, - provider, - )?) - } - Ordering::Equal => {} - } - - debug!(target: "sync::stages::bodies", stage_progress = from_block, target = to_block, start_tx_id = next_tx_num, "Commencing sync"); + debug!(target: "sync::stages::bodies", stage_progress = from_block, target = to_block, "Commencing sync"); let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?; trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks"); @@ -200,66 +232,8 @@ where ) -> Result { self.buffer.take(); - let static_file_provider = provider.static_file_provider(); - let tx = provider.tx_ref(); - // Cursors to unwind bodies, ommers - let mut body_cursor = tx.cursor_write::()?; - let mut ommers_cursor = tx.cursor_write::()?; - let mut withdrawals_cursor = tx.cursor_write::()?; - // Cursors to unwind transitions - let mut tx_block_cursor = tx.cursor_write::()?; - - let mut rev_walker = body_cursor.walk_back(None)?; - while let Some((number, block_meta)) = rev_walker.next().transpose()? { - if number <= input.unwind_to { - break - } - - // Delete the ommers entry if any - if ommers_cursor.seek_exact(number)?.is_some() { - ommers_cursor.delete_current()?; - } - - // Delete the withdrawals entry if any - if withdrawals_cursor.seek_exact(number)?.is_some() { - withdrawals_cursor.delete_current()?; - } - - // Delete all transaction to block values. - if !block_meta.is_empty() && - tx_block_cursor.seek_exact(block_meta.last_tx_num())?.is_some() - { - tx_block_cursor.delete_current()?; - } - - // Delete the current body value - rev_walker.delete_current()?; - } - - let mut static_file_producer = - static_file_provider.latest_writer(StaticFileSegment::Transactions)?; - - // Unwind from static files. Get the current last expected transaction from DB, and match it - // on static file - let db_tx_num = - body_cursor.last()?.map(|(_, block_meta)| block_meta.last_tx_num()).unwrap_or_default(); - let static_file_tx_num: u64 = static_file_provider - .get_highest_static_file_tx(StaticFileSegment::Transactions) - .unwrap_or_default(); - - // If there are more transactions on database, then we are missing static file data and we - // need to unwind further. - if db_tx_num > static_file_tx_num { - return Err(missing_static_data_error( - static_file_tx_num, - &static_file_provider, - provider, - )?) - } - - // Unwinds static file - static_file_producer - .prune_transactions(static_file_tx_num.saturating_sub(db_tx_num), input.unwind_to)?; + self.ensure_consistency(provider, Some(input.unwind_to))?; + provider.remove_bodies_above(input.unwind_to, StorageLocation::Both)?; Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) @@ -268,6 +242,8 @@ where } } +/// Called when database is ahead of static files. Attempts to find the first block we are missing +/// transactions for. fn missing_static_data_error( last_tx_num: TxNumber, static_file_provider: &StaticFileProvider, diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index cc50aa351..354eb10c1 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -781,21 +781,6 @@ mod tests { let db_senders = provider.senders_by_tx_range(range); assert_eq!(db_senders, Ok(vec![])); - - let result = provider.take_block_transaction_range(0..=0); - assert_eq!( - result, - Ok(vec![( - 0, - block - .body - .transactions - .iter() - .cloned() - .map(|tx| tx.into_ecrecovered().unwrap()) - .collect() - )]) - ) } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 92cc8df2f..8c390b06c 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -26,7 +26,7 @@ use alloy_eips::{ BlockHashOrNumber, }; use alloy_primitives::{keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256}; -use itertools::{izip, Itertools}; +use itertools::Itertools; use rayon::slice::ParallelSliceMut; use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks}; use reth_db::{ @@ -41,7 +41,7 @@ use reth_db_api::{ }, table::Table, transaction::{DbTx, DbTxMut}, - DatabaseError, DbTxUnwindExt, + DatabaseError, }; use reth_evm::ConfigureEvmEnv; use reth_execution_types::{Chain, ExecutionOutcome}; @@ -50,7 +50,7 @@ use reth_node_types::NodeTypes; use reth_primitives::{ Account, Block, BlockBody, BlockWithSenders, Bytecode, GotExpected, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, StorageEntry, TransactionMeta, - TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash, + TransactionSigned, TransactionSignedNoHash, }; use reth_primitives_traits::{BlockBody as _, FullNodePrimitives, SignedTransaction}; use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment}; @@ -75,7 +75,7 @@ use std::{ sync::{mpsc, Arc}, }; use tokio::sync::watch; -use tracing::{debug, error, trace}; +use tracing::{debug, trace}; /// A [`DatabaseProvider`] that holds a read-only database transaction. pub type DatabaseProviderRO = DatabaseProvider<::TX, N>; @@ -881,276 +881,6 @@ impl DatabaseProvider { Ok(self.tx.commit()?) } - /// Remove requested block transactions, without returning them. - /// - /// This will remove block data for the given range from the following tables: - /// * [`BlockBodyIndices`](tables::BlockBodyIndices) - /// * [`Transactions`](tables::Transactions) - /// * [`TransactionSenders`](tables::TransactionSenders) - /// * [`TransactionHashNumbers`](tables::TransactionHashNumbers) - /// * [`TransactionBlocks`](tables::TransactionBlocks) - pub fn remove_block_transaction_range( - &self, - range: impl RangeBounds + Clone, - ) -> ProviderResult<()> { - // Raad range of block bodies to get all transactions id's of this range. - let block_bodies = self.take::(range)?; - - if block_bodies.is_empty() { - return Ok(()) - } - - // Compute the first and last tx ID in the range - let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num(); - let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num(); - - // If this is the case then all of the blocks in the range are empty - if last_transaction < first_transaction { - return Ok(()) - } - - // Get transactions so we can then remove - let transactions = self - .take::(first_transaction..=last_transaction)? - .into_iter() - .map(|(id, tx)| (id, tx.into())) - .collect::>(); - - // remove senders - self.remove::(first_transaction..=last_transaction)?; - - // Remove TransactionHashNumbers - let mut tx_hash_cursor = self.tx.cursor_write::()?; - for (_, tx) in &transactions { - if tx_hash_cursor.seek_exact(tx.hash())?.is_some() { - tx_hash_cursor.delete_current()?; - } - } - - // Remove TransactionBlocks index if there are transaction present - if !transactions.is_empty() { - let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0; - self.remove::(tx_id_range)?; - } - - Ok(()) - } - - /// Get requested blocks transaction with senders, also removing them from the database - /// - /// This will remove block data for the given range from the following tables: - /// * [`BlockBodyIndices`](tables::BlockBodyIndices) - /// * [`Transactions`](tables::Transactions) - /// * [`TransactionSenders`](tables::TransactionSenders) - /// * [`TransactionHashNumbers`](tables::TransactionHashNumbers) - /// * [`TransactionBlocks`](tables::TransactionBlocks) - pub fn take_block_transaction_range( - &self, - range: impl RangeBounds + Clone, - ) -> ProviderResult)>> { - // Raad range of block bodies to get all transactions id's of this range. - let block_bodies = self.get::(range)?; - - if block_bodies.is_empty() { - return Ok(Vec::new()) - } - - // Compute the first and last tx ID in the range - let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num(); - let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num(); - - // If this is the case then all of the blocks in the range are empty - if last_transaction < first_transaction { - return Ok(block_bodies.into_iter().map(|(n, _)| (n, Vec::new())).collect()) - } - - // Get transactions and senders - let transactions = self - .take::(first_transaction..=last_transaction)? - .into_iter() - .map(|(id, tx)| (id, tx.into())) - .collect::>(); - - let mut senders = - self.take::(first_transaction..=last_transaction)?; - - recover_block_senders(&mut senders, &transactions, first_transaction, last_transaction)?; - - // Remove TransactionHashNumbers - let mut tx_hash_cursor = self.tx.cursor_write::()?; - for (_, tx) in &transactions { - if tx_hash_cursor.seek_exact(tx.hash())?.is_some() { - tx_hash_cursor.delete_current()?; - } - } - - // Remove TransactionBlocks index if there are transaction present - if !transactions.is_empty() { - let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0; - self.remove::(tx_id_range)?; - } - - // Merge transaction into blocks - let mut block_tx = Vec::with_capacity(block_bodies.len()); - let mut senders = senders.into_iter(); - let mut transactions = transactions.into_iter(); - for (block_number, block_body) in block_bodies { - let mut one_block_tx = Vec::with_capacity(block_body.tx_count as usize); - for _ in block_body.tx_num_range() { - let tx = transactions.next(); - let sender = senders.next(); - - let recovered = match (tx, sender) { - (Some((tx_id, tx)), Some((sender_tx_id, sender))) => { - if tx_id == sender_tx_id { - Ok(TransactionSignedEcRecovered::from_signed_transaction(tx, sender)) - } else { - Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id }) - } - } - (Some((tx_id, _)), _) | (_, Some((tx_id, _))) => { - Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id }) - } - (None, None) => Err(ProviderError::BlockBodyTransactionCount), - }?; - one_block_tx.push(recovered) - } - block_tx.push((block_number, one_block_tx)); - } - - Ok(block_tx) - } - - /// Remove the given range of blocks, without returning any of the blocks. - /// - /// This will remove block data for the given range from the following tables: - /// * [`HeaderNumbers`](tables::HeaderNumbers) - /// * [`CanonicalHeaders`](tables::CanonicalHeaders) - /// * [`BlockOmmers`](tables::BlockOmmers) - /// * [`BlockWithdrawals`](tables::BlockWithdrawals) - /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties) - /// - /// This will also remove transaction data according to - /// [`remove_block_transaction_range`](Self::remove_block_transaction_range). - pub fn remove_block_range( - &self, - range: impl RangeBounds + Clone, - ) -> ProviderResult<()> { - let block_headers = self.remove::(range.clone())?; - if block_headers == 0 { - return Ok(()) - } - - self.tx.unwind_table_by_walker::( - range.clone(), - )?; - self.remove::(range.clone())?; - self.remove::(range.clone())?; - self.remove::(range.clone())?; - self.remove_block_transaction_range(range.clone())?; - self.remove::(range)?; - - Ok(()) - } - - /// Remove the given range of blocks, and return them. - /// - /// This will remove block data for the given range from the following tables: - /// * [`HeaderNumbers`](tables::HeaderNumbers) - /// * [`CanonicalHeaders`](tables::CanonicalHeaders) - /// * [`BlockOmmers`](tables::BlockOmmers) - /// * [`BlockWithdrawals`](tables::BlockWithdrawals) - /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties) - /// - /// This will also remove transaction data according to - /// [`take_block_transaction_range`](Self::take_block_transaction_range). - pub fn take_block_range( - &self, - range: impl RangeBounds + Clone, - ) -> ProviderResult> - where - N::ChainSpec: EthereumHardforks, - { - // For blocks we need: - // - // - Headers - // - Bodies (transactions) - // - Uncles/ommers - // - Withdrawals - // - Signers - - let block_headers = self.take::(range.clone())?; - if block_headers.is_empty() { - return Ok(Vec::new()) - } - - self.tx.unwind_table_by_walker::( - range.clone(), - )?; - let block_header_hashes = self.take::(range.clone())?; - let block_ommers = self.take::(range.clone())?; - let block_withdrawals = self.take::(range.clone())?; - let block_tx = self.take_block_transaction_range(range.clone())?; - - let mut blocks = Vec::with_capacity(block_headers.len()); - - // rm HeaderTerminalDifficulties - self.remove::(range)?; - - // merge all into block - let block_header_iter = block_headers.into_iter(); - let block_header_hashes_iter = block_header_hashes.into_iter(); - let block_tx_iter = block_tx.into_iter(); - - // Ommers can be empty for some blocks - let mut block_ommers_iter = block_ommers.into_iter(); - let mut block_withdrawals_iter = block_withdrawals.into_iter(); - let mut block_ommers = block_ommers_iter.next(); - let mut block_withdrawals = block_withdrawals_iter.next(); - - for ((main_block_number, header), (_, header_hash), (_, tx)) in - izip!(block_header_iter, block_header_hashes_iter, block_tx_iter) - { - let header = SealedHeader::new(header, header_hash); - - let (transactions, senders) = tx.into_iter().map(|tx| tx.to_components()).unzip(); - - // Ommers can be missing - let mut ommers = Vec::new(); - if let Some((block_number, _)) = block_ommers.as_ref() { - if *block_number == main_block_number { - ommers = block_ommers.take().unwrap().1.ommers; - block_ommers = block_ommers_iter.next(); - } - }; - - // withdrawal can be missing - let shanghai_is_active = - self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp); - let mut withdrawals = Some(Withdrawals::default()); - if shanghai_is_active { - if let Some((block_number, _)) = block_withdrawals.as_ref() { - if *block_number == main_block_number { - withdrawals = Some(block_withdrawals.take().unwrap().1.withdrawals); - block_withdrawals = block_withdrawals_iter.next(); - } - } - } else { - withdrawals = None - } - - blocks.push(SealedBlockWithSenders { - block: SealedBlock { - header, - body: BlockBody { transactions, ommers, withdrawals }, - }, - senders, - }) - } - - Ok(blocks) - } - /// Load shard and remove it. If list is empty, last shard was full or /// there are no shards at all. fn take_shard(&self, key: T::Key) -> ProviderResult> @@ -2998,52 +2728,48 @@ impl StateReader for DatabaseProvider { impl BlockExecutionWriter for DatabaseProvider { - fn take_block_and_execution_range( + fn take_block_and_execution_above( &self, - range: RangeInclusive, + block: BlockNumber, + remove_transactions_from: StorageLocation, ) -> ProviderResult { - self.unwind_trie_state_range(range.clone())?; + let range = block + 1..=self.last_block_number()?; - // get blocks - let blocks = self.take_block_range(range.clone())?; - let unwind_to = blocks.first().map(|b| b.number.saturating_sub(1)); + self.unwind_trie_state_range(range.clone())?; // get execution res let execution_state = self.take_state(range.clone())?; + let blocks = self.sealed_block_with_senders_range(range)?; + // remove block bodies it is needed for both get block range and get block execution results // that is why it is deleted afterwards. - self.remove::(range)?; + self.remove_blocks_above(block, remove_transactions_from)?; // Update pipeline progress - if let Some(fork_number) = unwind_to { - self.update_pipeline_stages(fork_number, true)?; - } + self.update_pipeline_stages(block, true)?; Ok(Chain::new(blocks, execution_state, None)) } - fn remove_block_and_execution_range( + fn remove_block_and_execution_above( &self, - range: RangeInclusive, + block: BlockNumber, + remove_transactions_from: StorageLocation, ) -> ProviderResult<()> { + let range = block + 1..=self.last_block_number()?; + self.unwind_trie_state_range(range.clone())?; - // get blocks - let blocks = self.take_block_range(range.clone())?; - let unwind_to = blocks.first().map(|b| b.number.saturating_sub(1)); - // remove execution res - self.remove_state(range.clone())?; + self.remove_state(range)?; // remove block bodies it is needed for both get block range and get block execution results // that is why it is deleted afterwards. - self.remove::(range)?; + self.remove_blocks_above(block, remove_transactions_from)?; // Update pipeline progress - if let Some(block_number) = unwind_to { - self.update_pipeline_stages(block_number, true)?; - } + self.update_pipeline_stages(block, true)?; Ok(()) } @@ -3230,6 +2956,92 @@ impl BlockWriter Ok(()) } + fn remove_blocks_above( + &self, + block: BlockNumber, + remove_transactions_from: StorageLocation, + ) -> ProviderResult<()> { + let mut canonical_headers_cursor = self.tx.cursor_write::()?; + let mut rev_headers = canonical_headers_cursor.walk_back(None)?; + + while let Some(Ok((number, hash))) = rev_headers.next() { + if number <= block { + break + } + self.tx.delete::(hash, None)?; + rev_headers.delete_current()?; + } + self.remove::(block + 1..)?; + self.remove::(block + 1..)?; + + // First transaction to be removed + let unwind_tx_from = self + .tx + .get::(block)? + .map(|b| b.next_tx_num()) + .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?; + + // Last transaction to be removed + let unwind_tx_to = self + .tx + .cursor_read::()? + .last()? + // shouldn't happen because this was OK above + .ok_or(ProviderError::BlockBodyIndicesNotFound(block))? + .1 + .last_tx_num(); + + if unwind_tx_from < unwind_tx_to { + for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? { + self.tx.delete::(hash, None)?; + } + } + + self.remove::(unwind_tx_from..)?; + + self.remove_bodies_above(block, remove_transactions_from)?; + + Ok(()) + } + + fn remove_bodies_above( + &self, + block: BlockNumber, + remove_transactions_from: StorageLocation, + ) -> ProviderResult<()> { + self.storage.writer().remove_block_bodies_above(self, block)?; + + // First transaction to be removed + let unwind_tx_from = self + .tx + .get::(block)? + .map(|b| b.next_tx_num()) + .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?; + + self.remove::(block + 1..)?; + self.remove::(unwind_tx_from..)?; + + if remove_transactions_from.database() { + self.remove::(unwind_tx_from..)?; + } + + if remove_transactions_from.static_files() { + let static_file_tx_num = self + .static_file_provider + .get_highest_static_file_tx(StaticFileSegment::Transactions); + + if let Some(static_tx) = static_file_tx_num { + if static_tx >= unwind_tx_from { + self.static_file_provider + .latest_writer(StaticFileSegment::Transactions)? + .prune_transactions(static_tx - unwind_tx_from + 1, block)?; + } + } + } + + Ok(()) + } + /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually fn append_blocks_with_state( &self, @@ -3381,79 +3193,3 @@ impl DBProvider for DatabaseProvider self.prune_modes_ref() } } - -/// Helper method to recover senders for any blocks in the db which do not have senders. This -/// compares the length of the input senders [`Vec`], with the length of given transactions [`Vec`], -/// and will add to the input senders vec if there are more transactions. -/// -/// NOTE: This will modify the input senders list, which is why a mutable reference is required. -fn recover_block_senders( - senders: &mut Vec<(u64, Address)>, - transactions: &[(u64, TransactionSigned)], - first_transaction: u64, - last_transaction: u64, -) -> ProviderResult<()> { - // Recover senders manually if not found in db - // NOTE: Transactions are always guaranteed to be in the database whereas - // senders might be pruned. - if senders.len() != transactions.len() { - if senders.len() > transactions.len() { - error!(target: "providers::db", senders=%senders.len(), transactions=%transactions.len(), - first_tx=%first_transaction, last_tx=%last_transaction, - "unexpected senders and transactions mismatch"); - } - let missing = transactions.len().saturating_sub(senders.len()); - senders.reserve(missing); - // Find all missing senders, their corresponding tx numbers and indexes to the original - // `senders` vector at which the recovered senders will be inserted. - let mut missing_senders = Vec::with_capacity(missing); - { - let mut senders = senders.iter().peekable(); - - // `transactions` contain all entries. `senders` contain _some_ of the senders for - // these transactions. Both are sorted and indexed by `TxNumber`. - // - // The general idea is to iterate on both `transactions` and `senders`, and advance - // the `senders` iteration only if it matches the current `transactions` entry's - // `TxNumber`. Otherwise, add the transaction to the list of missing senders. - for (i, (tx_number, transaction)) in transactions.iter().enumerate() { - if let Some((sender_tx_number, _)) = senders.peek() { - if sender_tx_number == tx_number { - // If current sender's `TxNumber` matches current transaction's - // `TxNumber`, advance the senders iterator. - senders.next(); - } else { - // If current sender's `TxNumber` doesn't match current transaction's - // `TxNumber`, add it to missing senders. - missing_senders.push((i, tx_number, transaction)); - } - } else { - // If there's no more senders left, but we're still iterating over - // transactions, add them to missing senders - missing_senders.push((i, tx_number, transaction)); - } - } - } - - // Recover senders - let recovered_senders = TransactionSigned::recover_signers( - missing_senders.iter().map(|(_, _, tx)| *tx).collect::>(), - missing_senders.len(), - ) - .ok_or(ProviderError::SenderRecoveryError)?; - - // Insert recovered senders along with tx numbers at the corresponding indexes to the - // original `senders` vector - for ((i, tx_number, _), sender) in missing_senders.into_iter().zip(recovered_senders) { - // Insert will put recovered senders at necessary positions and shift the rest - senders.insert(i, (*tx_number, sender)); - } - - // Debug assertions which are triggered during the test to ensure that all senders are - // present and sorted - debug_assert_eq!(senders.len(), transactions.len(), "missing one or more senders"); - debug_assert!(senders.iter().tuple_windows().all(|(a, b)| a.0 < b.0), "senders not sorted"); - } - - Ok(()) -} diff --git a/crates/storage/provider/src/traits/block.rs b/crates/storage/provider/src/traits/block.rs index c84534e7a..c2ce47705 100644 --- a/crates/storage/provider/src/traits/block.rs +++ b/crates/storage/provider/src/traits/block.rs @@ -5,7 +5,6 @@ use reth_execution_types::{Chain, ExecutionOutcome}; use reth_primitives::SealedBlockWithSenders; use reth_storage_errors::provider::ProviderResult; use reth_trie::{updates::TrieUpdates, HashedPostStateSorted}; -use std::ops::RangeInclusive; /// An enum that represents the storage location for a piece of data. #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -33,16 +32,22 @@ impl StorageLocation { /// BlockExecution Writer #[auto_impl::auto_impl(&, Arc, Box)] pub trait BlockExecutionWriter: BlockWriter + Send + Sync { - /// Take range of blocks and its execution result - fn take_block_and_execution_range( + /// Take all of the blocks above the provided number and their execution result + /// + /// The passed block number will stay in the database. + fn take_block_and_execution_above( &self, - range: RangeInclusive, + block: BlockNumber, + remove_transactions_from: StorageLocation, ) -> ProviderResult; - /// Remove range of blocks and its execution result - fn remove_block_and_execution_range( + /// Remove all of the blocks above the provided number and their execution result + /// + /// The passed block number will stay in the database. + fn remove_block_and_execution_above( &self, - range: RangeInclusive, + block: BlockNumber, + remove_transactions_from: StorageLocation, ) -> ProviderResult<()>; } @@ -81,6 +86,22 @@ pub trait BlockWriter: Send + Sync { write_transactions_to: StorageLocation, ) -> ProviderResult<()>; + /// Removes all blocks above the given block number from the database. + /// + /// Note: This does not remove state or execution data. + fn remove_blocks_above( + &self, + block: BlockNumber, + remove_transactions_from: StorageLocation, + ) -> ProviderResult<()>; + + /// Removes all block bodies above the given block number from the database. + fn remove_bodies_above( + &self, + block: BlockNumber, + remove_transactions_from: StorageLocation, + ) -> ProviderResult<()>; + /// Appends a batch of sealed blocks to the blockchain, including sender information, and /// updates the post-state. /// diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index 3878cf2a9..30c5f0d52 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -273,9 +273,7 @@ where // IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number"); - self.database().remove_block_and_execution_range( - block_number + 1..=self.database().last_block_number()?, - )?; + self.database().remove_block_and_execution_above(block_number, StorageLocation::Both)?; // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure // we remove only what is ABOVE the block. @@ -287,10 +285,6 @@ where .get_writer(block_number, StaticFileSegment::Headers)? .prune_headers(highest_static_file_block.saturating_sub(block_number))?; - self.static_file() - .get_writer(block_number, StaticFileSegment::Transactions)? - .prune_transactions(total_txs, block_number)?; - if !self.database().prune_modes_ref().has_receipts_pruning() { self.static_file() .get_writer(block_number, StaticFileSegment::Receipts)? diff --git a/crates/storage/storage-api/src/chain.rs b/crates/storage/storage-api/src/chain.rs index 099f61f1b..d5228bddd 100644 --- a/crates/storage/storage-api/src/chain.rs +++ b/crates/storage/storage-api/src/chain.rs @@ -5,6 +5,7 @@ use reth_db::{ models::{StoredBlockOmmers, StoredBlockWithdrawals}, tables, transaction::DbTxMut, + DbTxUnwindExt, }; use reth_primitives_traits::{Block, BlockBody, FullNodePrimitives}; use reth_storage_errors::provider::ProviderResult; @@ -21,6 +22,13 @@ pub trait BlockBodyWriter { provider: &Provider, bodies: Vec<(BlockNumber, Option)>, ) -> ProviderResult<()>; + + /// Removes all block bodies above the given block number from the database. + fn remove_block_bodies_above( + &self, + provider: &Provider, + block: BlockNumber, + ) -> ProviderResult<()>; } /// Trait that implements how chain-specific types are written to the storage. @@ -69,4 +77,15 @@ where Ok(()) } + + fn remove_block_bodies_above( + &self, + provider: &Provider, + block: BlockNumber, + ) -> ProviderResult<()> { + provider.tx_ref().unwind_table_by_num::(block)?; + provider.tx_ref().unwind_table_by_num::(block)?; + + Ok(()) + } }