diff --git a/crates/interfaces/src/db/codecs/compact.rs b/crates/interfaces/src/db/codecs/compact.rs index ec77dd185..49b8ed076 100644 --- a/crates/interfaces/src/db/codecs/compact.rs +++ b/crates/interfaces/src/db/codecs/compact.rs @@ -1,5 +1,5 @@ use crate::db::{ - models::{accounts::AccountBeforeTx, StoredBlockBody}, + models::{accounts::AccountBeforeTx, StoredBlockOmmers}, Compress, Decompress, Error, }; use reth_codecs::{main_codec, Compact}; @@ -32,7 +32,15 @@ macro_rules! impl_compression_for_compact { }; } -impl_compression_for_compact!(Header, Account, Log, Receipt, TxType, StorageEntry, StoredBlockBody); +impl_compression_for_compact!( + Header, + Account, + Log, + Receipt, + TxType, + StorageEntry, + StoredBlockOmmers +); impl_compression_for_compact!(AccountBeforeTx, TransactionSigned); impl_compression_for_compact!(CompactU256); diff --git a/crates/interfaces/src/db/models/blocks.rs b/crates/interfaces/src/db/models/blocks.rs index c4da3f4be..a57d4eb32 100644 --- a/crates/interfaces/src/db/models/blocks.rs +++ b/crates/interfaces/src/db/models/blocks.rs @@ -9,37 +9,25 @@ use crate::{ }; use bytes::Bytes; use reth_codecs::{main_codec, Compact}; -use reth_primitives::{BlockHash, BlockNumber, Header, TxNumber, H256}; +use reth_primitives::{BlockHash, BlockNumber, Header, H256}; use serde::{Deserialize, Serialize}; -/// Total chain number of transactions. Key for [`CumulativeTxCount`]. +/// Total chain number of transactions. Value for [`CumulativeTxCount`]. +/// +/// Used for collecting transactions for a block. pub type NumTransactions = u64; -/// The storage representation of a block body. +/// The storage representation of a block ommers. /// -/// A block body is stored as a pointer to the first transaction in the block (`base_tx_id`), a -/// count of how many transactions are in the block, and the headers of the block's uncles. 
-/// -/// The [TxNumber]s for all the transactions in the block are `base_tx_id..(base_tx_id + +/// It is stored as the headers of the block's uncles. /// tx_amount)`. #[derive(Debug, Default, Eq, PartialEq, Clone)] #[main_codec] -pub struct StoredBlockBody { - /// The ID of the first transaction in the block. - pub base_tx_id: TxNumber, - /// The number of transactions in the block. - pub tx_amount: u64, +pub struct StoredBlockOmmers { /// The block headers of this block's uncles. pub ommers: Vec
, } -impl StoredBlockBody { - /// Return next block tx id. - pub fn next_block_tx_id(&self) -> TxNumber { - self.base_tx_id + self.tx_amount - } -} - /// Hash of the block header. Value for [`CanonicalHeaders`] pub type HeaderHash = H256; diff --git a/crates/interfaces/src/db/tables.rs b/crates/interfaces/src/db/tables.rs index d24fcfbd8..f91327623 100644 --- a/crates/interfaces/src/db/tables.rs +++ b/crates/interfaces/src/db/tables.rs @@ -4,7 +4,7 @@ use crate::db::{ codecs::CompactU256, models::{ accounts::{AccountBeforeTx, TxNumberAddress}, - blocks::{BlockNumHash, HeaderHash, NumTransactions, StoredBlockBody}, + blocks::{BlockNumHash, HeaderHash, NumTransactions, StoredBlockOmmers}, ShardedKey, }, DupSort, @@ -29,7 +29,7 @@ pub const TABLES: [(TableType, &str); 20] = [ (TableType::Table, HeaderTD::const_name()), (TableType::Table, HeaderNumbers::const_name()), (TableType::Table, Headers::const_name()), - (TableType::Table, BlockBodies::const_name()), + (TableType::Table, BlockOmmers::const_name()), (TableType::Table, CumulativeTxCount::const_name()), (TableType::Table, NonCanonicalTransactions::const_name()), (TableType::Table, Transactions::const_name()), @@ -122,15 +122,16 @@ table!( ); table!( - /// Stores a pointer to the first transaction in the block, the number of transactions in the - /// block, and the uncles/ommers of the block. - /// - /// The transaction IDs point to the [`Transactions`] table. - ( BlockBodies ) BlockNumHash | StoredBlockBody + /// Stores the uncles/ommers of the block. + ( BlockOmmers ) BlockNumHash | StoredBlockOmmers ); table!( /// Stores the maximum [`TxNumber`] from which this particular block starts. + /// + /// Used to collect transactions for the block. e.g. To collect transactions + /// for block `x` you would need to look at cumulative count at block `x` and + /// at block `x - 1`. ( CumulativeTxCount ) BlockNumHash | NumTransactions ); // TODO U256? 
diff --git a/crates/interfaces/src/provider/block.rs b/crates/interfaces/src/provider/block.rs index 853eb4ecd..83a399c8e 100644 --- a/crates/interfaces/src/provider/block.rs +++ b/crates/interfaces/src/provider/block.rs @@ -1,6 +1,6 @@ use crate::{ db::{ - models::{BlockNumHash, StoredBlockBody}, + models::{BlockNumHash, StoredBlockOmmers}, tables, DbTx, DbTxMut, }, provider::Error as ProviderError, @@ -142,14 +142,10 @@ pub fn insert_canonical_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( let start_tx_number = if block.number == 0 { 0 } else { get_cumulative_tx_count_by_hash(tx, block.parent_hash)? }; - // insert body - tx.put::( + // insert body ommers data + tx.put::( block_num_hash, - StoredBlockBody { - base_tx_id: start_tx_number, - tx_amount: block.body.len() as u64, - ommers: block.ommers.iter().map(|h| h.as_ref().clone()).collect(), - }, + StoredBlockOmmers { ommers: block.ommers.iter().map(|h| h.as_ref().clone()).collect() }, )?; let mut tx_number = start_tx_number; diff --git a/crates/stages/src/db.rs b/crates/stages/src/db.rs index e348ab731..9f2fa4897 100644 --- a/crates/stages/src/db.rs +++ b/crates/stages/src/db.rs @@ -3,12 +3,11 @@ use std::{ ops::{Deref, DerefMut}, }; -use reth_db::kv::cursor::PairResult; use reth_interfaces::db::{ models::{BlockNumHash, NumTransactions}, tables, Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbTx, DbTxMut, Error, Table, }; -use reth_primitives::{BlockHash, BlockNumber}; +use reth_primitives::{BlockHash, BlockNumber, TxNumber}; use crate::{DatabaseIntegrityError, StageError}; @@ -90,12 +89,6 @@ where self.tx.take(); } - /// Get exact or previous value from the database - pub(crate) fn get_exact_or_prev(&self, key: T::Key) -> PairResult { - let mut cursor = self.cursor::()?; - Ok(cursor.seek_exact(key)?.or(cursor.prev()?)) - } - /// Query [tables::CanonicalHeaders] table for block hash by block number pub(crate) fn get_block_hash(&self, number: BlockNumber) -> Result { let hash = self @@ -121,6 +114,70 @@ where Ok(count) 
} + /// Get id for the first **potential** transaction in a block by looking up + /// the cumulative transaction count at the previous block. + /// + /// This function does not care whether the block is empty. + pub(crate) fn get_first_tx_id(&self, block: BlockNumber) -> Result { + // Handle genesis block + if block == 0 { + return Ok(0) + } + + let prev_key = self.get_block_numhash(block - 1)?; + self.get_tx_count(prev_key) + } + + /// Get id of the last transaction in the block. + /// Returns [None] if the block is empty. + /// + /// The blocks must exist in the database. + #[allow(dead_code)] + pub(crate) fn get_last_tx_id( + &self, + block: BlockNumber, + ) -> Result, StageError> { + let key = self.get_block_numhash(block)?; + + let mut cursor = self.cursor::()?; + let (_, tx_count) = + cursor.seek_exact(key)?.ok_or(DatabaseIntegrityError::CumulativeTxCount { + number: key.number(), + hash: key.hash(), + })?; + + let is_empty = { + if block != 0 { + let (_, prev_tx_count) = + cursor.prev()?.ok_or(DatabaseIntegrityError::CumulativeTxCount { + number: key.number() - 1, + hash: self.get_block_hash(key.number() - 1)?, + })?; + tx_count == prev_tx_count + } else { + tx_count == 0 + } + }; + + Ok(if !is_empty { Some(tx_count - 1) } else { None }) + } + + /// Get id of the latest transaction observed before a given block (inclusive). + /// Returns error if there are no transactions in the database. 
+ pub(crate) fn get_latest_tx_id( + &self, + up_to_block: BlockNumber, + ) -> Result { + let key = self.get_block_numhash(up_to_block)?; + let tx_count = self.get_tx_count(key)?; + if tx_count != 0 { + Ok(tx_count - 1) + } else { + // No transactions in the database + Err(DatabaseIntegrityError::Transaction { id: 0 }.into()) + } + } + /// Unwind table by some number key #[inline] pub(crate) fn unwind_table_by_num(&self, num: u64) -> Result<(), Error> diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index 6ae8f3c2b..550ff4845 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -94,6 +94,12 @@ pub enum DatabaseIntegrityError { /// The block number key number: BlockNumber, }, + /// The transaction is missing + #[error("Transaction #{id} not found")] + Transaction { + /// The transaction id + id: TxNumber, + }, } /// A pipeline execution error. diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index e9a4a8541..20336166f 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -6,7 +6,7 @@ use futures_util::TryStreamExt; use reth_interfaces::{ consensus::Consensus, db::{ - models::StoredBlockBody, tables, Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbTx, + models::StoredBlockOmmers, tables, Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbTx, DbTxMut, }, p2p::bodies::downloader::BodyDownloader, @@ -38,16 +38,17 @@ const BODIES: StageId = StageId("Bodies"); /// /// The bodies are processed and data is inserted into these tables: /// -/// - [`BlockBodies`][reth_interfaces::db::tables::BlockBodies] +/// - [`BlockOmmers`][reth_interfaces::db::tables::BlockOmmers] /// - [`Transactions`][reth_interfaces::db::tables::Transactions] /// /// # Genesis /// /// This stage expects that the genesis has been inserted into the appropriate tables: /// -/// - The header tables (see [HeadersStage][crate::stages::headers::HeadersStage]) -/// - The various indexes (e.g. 
[TotalTxIndex][crate::stages::tx_index::TxIndex]) -/// - The [`BlockBodies`][reth_interfaces::db::tables::BlockBodies] table +/// - The header tables (see [`HeaderStage`][crate::stages::headers::HeaderStage]) +/// - The [`BlockOmmers`][reth_interfaces::db::tables::BlockOmmers] table +/// - The [`CumulativeTxCount`][reth_interfaces::db::tables::CumulativeTxCount] table +/// - The [`Transactions`][reth_interfaces::db::tables::Transactions] table #[derive(Debug)] pub struct BodyStage { /// The body downloader. @@ -96,12 +97,12 @@ impl Stage for BodyStage(db, starting_block, target)?; // Cursors used to write bodies and transactions - let mut bodies_cursor = db.cursor_mut::()?; + let mut ommers_cursor = db.cursor_mut::()?; let mut tx_cursor = db.cursor_mut::()?; - let mut base_tx_id = bodies_cursor - .last()? - .map(|(_, body)| body.base_tx_id + body.tx_amount) - .ok_or(DatabaseIntegrityError::BlockBody { number: starting_block })?; + let mut tx_count_cursor = db.cursor_mut::()?; + + // Get id for the first transaction in the block + let mut first_tx_id = db.get_first_tx_id(starting_block)?; // Cursor used to look up headers for block pre-validation let mut header_cursor = db.cursor::()?; @@ -135,19 +136,19 @@ impl Stage for BodyStage Stage for BodyStage, input: UnwindInput, ) -> Result> { - let mut block_body_cursor = db.cursor_mut::()?; + let mut tx_count_cursor = db.cursor_mut::()?; + let mut block_ommers_cursor = db.cursor_mut::()?; let mut transaction_cursor = db.cursor_mut::()?; - let mut entry = block_body_cursor.last()?; - while let Some((key, body)) = entry { + let mut entry = tx_count_cursor.last()?; + while let Some((key, count)) = entry { if key.number() <= input.unwind_to { break } - for num in 0..body.tx_amount { - let tx_id = body.base_tx_id + num; + // First delete the current and find the previous cum tx count value + tx_count_cursor.delete_current()?; + entry = tx_count_cursor.prev()?; + + if block_ommers_cursor.seek_exact(key)?.is_some() { + 
block_ommers_cursor.delete_current()?; + } + + let prev_count = entry.map(|(_, v)| v).unwrap_or_default(); + for tx_id in prev_count..count { if transaction_cursor.seek_exact(tx_id)?.is_some() { transaction_cursor.delete_current()?; } } - - block_body_cursor.delete_current()?; - entry = block_body_cursor.prev()?; } Ok(UnwindOutput { stage_progress: input.unwind_to }) @@ -395,7 +402,9 @@ mod tests { Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress == previous_stage ); let stage_progress = output.unwrap().stage_progress; - runner.validate_db_blocks(stage_progress).expect("Written block data invalid"); + runner + .validate_db_blocks(input.stage_progress.unwrap_or_default(), stage_progress) + .expect("Written block data invalid"); // Delete a transaction runner @@ -459,7 +468,10 @@ mod tests { use assert_matches::assert_matches; use reth_eth_wire::BlockBody; use reth_interfaces::{ - db::{models::StoredBlockBody, tables, DbCursorRO, DbTx, DbTxMut}, + db::{ + models::{BlockNumHash, NumTransactions, StoredBlockOmmers}, + tables, DbCursorRO, DbTx, DbTxMut, + }, p2p::{ bodies::{ client::BodiesClient, @@ -467,7 +479,10 @@ mod tests { }, error::RequestResult, }, - test_utils::{generators::random_block_range, TestConsensus}, + test_utils::{ + generators::{random_block_range, random_signed_tx}, + TestConsensus, + }, }; use reth_primitives::{BlockLocked, BlockNumber, Header, SealedHeader, H256}; use std::{collections::HashMap, sync::Arc}; @@ -539,11 +554,28 @@ mod tests { type Seed = Vec; fn seed_execution(&mut self, input: ExecInput) -> Result { + self.insert_genesis()?; let start = input.stage_progress.unwrap_or_default(); let end = input.previous_stage_progress() + 1; let blocks = random_block_range(start..end, GENESIS_HASH); - self.insert_genesis()?; self.db.insert_headers(blocks.iter().map(|block| &block.header))?; + if let Some(progress) = blocks.first() { + // Insert last progress data + self.db.commit(|tx| { + let key = (progress.number, 
progress.hash()).into(); + let last_count = tx + .cursor::()? + .last()? + .map(|(_, v)| v) + .unwrap_or_default(); + let tx_count = last_count + progress.body.len() as u64; + tx.put::(key, tx_count)?; + tx.put::(key, StoredBlockOmmers { ommers: vec![] })?; + (last_count..tx_count).try_for_each(|idx| { + tx.put::(idx, random_signed_tx()) + }) + })?; + } self.set_responses(blocks.iter().map(body_by_hash).collect()); Ok(blocks) } @@ -557,17 +589,20 @@ mod tests { Some(output) => output.stage_progress, None => input.stage_progress.unwrap_or_default(), }; - self.validate_db_blocks(highest_block) + self.validate_db_blocks(highest_block, highest_block) } } impl UnwindStageTestRunner for BodyTestRunner { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { - self.db.check_no_entry_above::(input.unwind_to, |key| { + self.db.check_no_entry_above::(input.unwind_to, |key| { key.number() })?; - if let Some(last_body) = self.last_body() { - let last_tx_id = last_body.base_tx_id + last_body.tx_amount; + self.db.check_no_entry_above::( + input.unwind_to, + |key| key.number(), + )?; + if let Some(last_tx_id) = self.last_count() { self.db .check_no_entry_above::(last_tx_id, |key| key)?; } @@ -584,19 +619,19 @@ mod tests { let header = SealedHeader::new(Header::default(), GENESIS_HASH); self.db.insert_headers(std::iter::once(&header))?; self.db.commit(|tx| { - tx.put::( - (0, GENESIS_HASH).into(), - StoredBlockBody { base_tx_id: 0, tx_amount: 0, ommers: vec![] }, - ) + let key = (0, GENESIS_HASH).into(); + tx.put::(key, 1)?; + tx.put::(key, StoredBlockOmmers { ommers: vec![] })?; + tx.put::(0, random_signed_tx()) })?; Ok(()) } - /// Retrieve the last body from the database - pub(crate) fn last_body(&self) -> Option { + /// Retrieve the last tx count from the database + pub(crate) fn last_count(&self) -> Option { self.db - .query(|tx| Ok(tx.cursor::()?.last()?.map(|e| e.1))) + .query(|tx| Ok(tx.cursor::()?.last()?.map(|e| e.1))) .ok() .flatten() } @@ -604,34 
+639,55 @@ mod tests { /// Validate that the inserted block data is valid pub(crate) fn validate_db_blocks( &self, + prev_progress: BlockNumber, highest_block: BlockNumber, ) -> Result<(), TestRunnerError> { self.db.query(|tx| { - let mut block_body_cursor = tx.cursor::()?; + // Acquire cursors on body related tables + let mut ommers_cursor = tx.cursor::()?; + let mut tx_count_cursor = tx.cursor::()?; let mut transaction_cursor = tx.cursor::()?; - let mut entry = block_body_cursor.first()?; - let mut prev_max_tx_id = 0; - while let Some((key, body)) = entry { + let first_tx_count_key = match tx_count_cursor.first()? { + Some((key, _)) => key, + None => return Ok(()), + }; + let mut walker = tx_count_cursor.walk(first_tx_count_key)?.peekable(); + + let mut prev_entry: Option<(BlockNumHash, NumTransactions)> = None; + while let Some(entry) = walker.next() { + let (key, count) = entry?; + + // Validate sequentiality only after prev progress, + // since the data before is mocked and can contain gaps + if key.number() > prev_progress { + if let Some((prev_key, _)) = prev_entry { + assert_eq!(prev_key.number() + 1, key.number(), "Tx count entries must be sequential"); + } + } + + // Validate that the current entry is below or equal to the highest allowed block assert!( key.number() <= highest_block, "We wrote a block body outside of our synced range. Found block with number {}, highest block according to stage is {}", key.number(), highest_block ); - assert!(prev_max_tx_id == body.base_tx_id, "Transaction IDs are malformed."); - for num in 0..body.tx_amount { - let tx_id = body.base_tx_id + num; + // Validate that ommers exist + assert_matches!(ommers_cursor.seek_exact(key), Ok(Some(_)), "Block ommers are missing"); + + // Validate that block transactions exist + let first_tx_id = prev_entry.map(|(_, v)| v).unwrap_or_default(); + for tx_id in first_tx_id..count { assert_matches!( transaction_cursor.seek_exact(tx_id), Ok(Some(_)), "A transaction is missing." 
); } - prev_max_tx_id = body.base_tx_id + body.tx_amount; - entry = block_body_cursor.next()?; - } + prev_entry = Some((key, count)); + } Ok(()) })?; Ok(()) diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index a273eb637..dc87fe330 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -6,5 +6,3 @@ pub mod execution; pub mod headers; /// The sender recovery stage. pub mod senders; -/// The cumulative transaction index stage. -pub mod tx_index; diff --git a/crates/stages/src/stages/senders.rs b/crates/stages/src/stages/senders.rs index fe45063ab..6a5003857 100644 --- a/crates/stages/src/stages/senders.rs +++ b/crates/stages/src/stages/senders.rs @@ -48,23 +48,34 @@ impl Stage for SendersStage { db: &mut StageDB<'_, DB>, input: ExecInput, ) -> Result { - // Look up the start index for transaction range - let last_block = db.get_block_numhash(input.stage_progress.unwrap_or_default())?; - let start_tx_index = db.get_tx_count(last_block)?; + let stage_progress = input.stage_progress.unwrap_or_default(); - // Look up the end index for transaction range (exclusive) - let max_block = db.get_block_numhash(input.previous_stage_progress())?; - let end_tx_index = db.get_tx_count(max_block)?; + // Look up the start index for the transaction range + let start_tx_index = db.get_first_tx_id(stage_progress + 1)?; + + // Look up the end index for transaction range (inclusive) + let max_block_num = input.previous_stage_progress(); + let end_tx_index = match db.get_latest_tx_id(max_block_num) { + Ok(id) => id, + // No transactions in the database + Err(_) => { + return Ok(ExecOutput { + stage_progress: max_block_num, + done: true, + reached_tip: true, + }) + } + }; // Acquire the cursor for inserting elements let mut senders_cursor = db.cursor_mut::()?; // Acquire the cursor over the transactions let mut tx_cursor = db.cursor::()?; - // Walk the transactions from start to end index (exclusive) + // Walk the transactions 
from start to end index (inclusive) let entries = tx_cursor .walk(start_tx_index)? - .take_while(|res| res.as_ref().map(|(k, _)| *k < end_tx_index).unwrap_or_default()); + .take_while(|res| res.as_ref().map(|(k, _)| *k <= end_tx_index).unwrap_or_default()); // Iterate over transactions in chunks for chunk in &entries.chunks(self.batch_size) { @@ -84,7 +95,7 @@ impl Stage for SendersStage { recovered.into_iter().try_for_each(|(id, sender)| senders_cursor.append(id, sender))?; } - Ok(ExecOutput { stage_progress: max_block.number(), done: true, reached_tip: true }) + Ok(ExecOutput { stage_progress: max_block_num, done: true, reached_tip: true }) } /// Unwind the stage. @@ -93,29 +104,16 @@ impl Stage for SendersStage { db: &mut StageDB<'_, DB>, input: UnwindInput, ) -> Result> { - // Look up the hash of the unwind block - if let Some((_, unwind_hash)) = - db.get_exact_or_prev::(input.unwind_to)? - { - // Look up the cumulative tx count at unwind block - let key = (input.unwind_to, unwind_hash).into(); - if let Some((_, unwind_tx_count)) = - db.get_exact_or_prev::(key)? 
- { - // The last remaining tx_id should be at `cum_tx_count - 1` - db.unwind_table_by_num::(unwind_tx_count - 1)?; - } - } - + // Lookup latest tx id that we should unwind to + let latest_tx_id = db.get_latest_tx_id(input.unwind_to).unwrap_or_default(); + db.unwind_table_by_num::(latest_tx_id)?; Ok(UnwindOutput { stage_progress: input.unwind_to }) } } #[cfg(test)] mod tests { - use reth_interfaces::{ - db::models::StoredBlockBody, test_utils::generators::random_block_range, - }; + use reth_interfaces::test_utils::generators::random_block_range; use reth_primitives::{BlockLocked, BlockNumber, H256}; use super::*; @@ -155,19 +153,21 @@ mod tests { self.db.commit(|tx| { let mut base_tx_id = 0; blocks.iter().try_for_each(|b| { - let ommers = b.ommers.iter().map(|o| o.clone().unseal()).collect::>(); let txs = b.body.clone(); let tx_amount = txs.len() as u64; let num_hash = (b.number, b.hash()).into(); tx.put::(b.number, b.hash())?; - tx.put::( - num_hash, - StoredBlockBody { base_tx_id, tx_amount, ommers }, - )?; tx.put::(num_hash, base_tx_id + tx_amount)?; for body_tx in txs { + // Insert senders for previous stage progress + if b.number == stage_progress { + tx.put::( + base_tx_id, + body_tx.recover_signer().expect("failed to recover sender"), + )?; + } tx.put::(base_tx_id, body_tx)?; base_tx_id += 1; } @@ -194,13 +194,17 @@ mod tests { return Ok(()) } - let start_hash = tx.get::(start_block)?.unwrap(); - let mut body_cursor = tx.cursor::()?; - let body_walker = body_cursor.walk((start_block, start_hash).into())?; + let mut tx_count_cursor = tx.cursor::()?; - for entry in body_walker { - let (_, body) = entry?; - for tx_id in body.base_tx_id..body.base_tx_id + body.tx_amount { + let last_block = start_block - 1; + let last_hash = tx.get::(last_block)?.unwrap(); + let mut last_tx_count = tx_count_cursor + .seek_exact((last_block, last_hash).into())? + .map(|(_, v)| v) + .unwrap_or_default(); + + while let Some((_, count)) = tx_count_cursor.next()? 
{ + for tx_id in last_tx_count..count { let transaction = tx .get::(tx_id)? .expect("no transaction entry"); @@ -208,12 +212,13 @@ mod tests { transaction.recover_signer().expect("failed to recover signer"); assert_eq!(Some(signer), tx.get::(tx_id)?); } + last_tx_count = count; } Ok(()) })?; } else { - self.check_no_transaction_by_block(input.stage_progress.unwrap_or_default())?; + self.check_no_senders_by_block(input.stage_progress.unwrap_or_default())?; } Ok(()) @@ -222,42 +227,23 @@ mod tests { impl UnwindStageTestRunner for SendersTestRunner { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { - self.check_no_transaction_by_block(input.unwind_to) + self.check_no_senders_by_block(input.unwind_to) } } impl SendersTestRunner { - fn check_no_transaction_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { - match self.get_block_body_entry(block)? { - Some(body) => { - let last_index = body.base_tx_id + body.tx_amount; - self.db.check_no_entry_above::(last_index, |key| key)?; + fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { + let latest_tx_id = self.db.inner().get_latest_tx_id(block); + match latest_tx_id { + Ok(last_index) => { + self.db.check_no_entry_above::(last_index, |key| key)? } - None => { + Err(_) => { assert!(self.db.table_is_empty::()?); } }; - Ok(()) - } - /// Get the block body entry at block number. If it doesn't exist, - /// fallback to the previous entry. - fn get_block_body_entry( - &self, - block: BlockNumber, - ) -> Result, TestRunnerError> { - let entry = self.db.query(|tx| match tx.get::(block)? { - Some(hash) => { - let mut body_cursor = tx.cursor::()?; - let entry = match body_cursor.seek_exact((block, hash).into())? 
{ - Some((_, block)) => Some(block), - _ => body_cursor.prev()?.map(|(_, block)| block), - }; - Ok(entry) - } - None => Ok(None), - })?; - Ok(entry) + Ok(()) } } } diff --git a/crates/stages/src/stages/tx_index.rs b/crates/stages/src/stages/tx_index.rs deleted file mode 100644 index 00dd7b232..000000000 --- a/crates/stages/src/stages/tx_index.rs +++ /dev/null @@ -1,192 +0,0 @@ -use crate::{ - db::StageDB, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, - UnwindInput, UnwindOutput, -}; -use reth_interfaces::db::{tables, Database, DbCursorRO, DbCursorRW, DbTxMut}; -use std::fmt::Debug; - -const TX_INDEX: StageId = StageId("TxIndex"); - -/// The cumulative transaction index stage -/// implementation for staged sync. This stage -/// updates the cumulative transaction count per block. -/// -/// e.g. [key, value] entries in [tables::CumulativeTxCount] -/// block #1 with 24 transactions - [1, 24] -/// block #2 with 42 transactions - [2, 66] -/// block #3 with 33 transaction - [3, 99] -#[derive(Debug)] -pub struct TxIndex; - -#[async_trait::async_trait] -impl Stage for TxIndex { - /// Return the id of the stage - fn id(&self) -> StageId { - TX_INDEX - } - - /// Execute the stage - async fn execute( - &mut self, - db: &mut StageDB<'_, DB>, - input: ExecInput, - ) -> Result { - // The progress of this stage during last iteration - let last_block = db.get_block_numhash(input.stage_progress.unwrap_or_default())?; - - // The start block for this iteration - let start_block = db.get_block_numhash(last_block.number() + 1)?; - - // The maximum block that this stage should insert to - let max_block = input.previous_stage_progress(); - - // Get the cursor over the table - let mut cursor = db.cursor_mut::()?; - // Find the last count that was inserted during previous iteration - let (_, mut count) = - cursor.seek_exact(last_block)?.ok_or(DatabaseIntegrityError::CumulativeTxCount { - number: last_block.number(), - hash: last_block.hash(), - })?; - - // Get 
the cursor over block bodies - let mut body_cursor = db.cursor_mut::()?; - let walker = body_cursor.walk(start_block)?; - - // Walk the block body entries up to maximum block (including) - let entries = walker - .take_while(|b| b.as_ref().map(|(k, _)| k.number() <= max_block).unwrap_or_default()); - - // Aggregate and insert cumulative transaction count for each block number - for entry in entries { - let (key, body) = entry?; - count += body.tx_amount; - cursor.append(key, count)?; - } - - Ok(ExecOutput { done: true, reached_tip: true, stage_progress: max_block }) - } - - /// Unwind the stage. - async fn unwind( - &mut self, - db: &mut StageDB<'_, DB>, - input: UnwindInput, - ) -> Result> { - db.unwind_table_by_num_hash::(input.unwind_to)?; - Ok(UnwindOutput { stage_progress: input.unwind_to }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::test_utils::{ - stage_test_suite, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB, - UnwindStageTestRunner, - }; - use reth_interfaces::{ - db::{ - models::{BlockNumHash, StoredBlockBody}, - DbTx, - }, - test_utils::generators::random_header_range, - }; - use reth_primitives::H256; - - stage_test_suite!(TxIndexTestRunner); - - #[derive(Default)] - pub(crate) struct TxIndexTestRunner { - db: TestStageDB, - } - - impl StageTestRunner for TxIndexTestRunner { - type S = TxIndex; - - fn db(&self) -> &TestStageDB { - &self.db - } - - fn stage(&self) -> Self::S { - TxIndex {} - } - } - - impl ExecuteStageTestRunner for TxIndexTestRunner { - type Seed = (); - - fn seed_execution(&mut self, input: ExecInput) -> Result { - let pivot = input.stage_progress.unwrap_or_default(); - let start = pivot.saturating_sub(100); - let mut end = input.previous_stage_progress(); - end += 2; // generate 2 additional headers to account for start header lookup - let headers = random_header_range(start..end, H256::zero()); - - let headers = - headers.into_iter().map(|h| (h, rand::random::())).collect::>(); - - 
self.db.map_put::(&headers, |(h, _)| { - (h.number, h.hash()) - })?; - self.db.map_put::(&headers, |(h, count)| { - ( - BlockNumHash((h.number, h.hash())), - StoredBlockBody { base_tx_id: 0, tx_amount: *count as u64, ommers: vec![] }, - ) - })?; - - let slice_up_to = - std::cmp::min(pivot.saturating_sub(start) as usize, headers.len() - 1); - self.db.transform_append::( - &headers[..=slice_up_to], - |prev, (h, count)| { - (BlockNumHash((h.number, h.hash())), prev.unwrap_or_default() + (*count as u64)) - }, - )?; - - Ok(()) - } - - fn validate_execution( - &self, - input: ExecInput, - _output: Option, - ) -> Result<(), TestRunnerError> { - // TODO: validate that base_tx_index of next block equals the cum count at current - self.db.query(|tx| { - let (start, end) = - (input.stage_progress.unwrap_or_default(), input.previous_stage_progress()); - if start >= end { - return Ok(()) - } - - let start_hash = - tx.get::(start)?.expect("no canonical found"); - let mut tx_count_cursor = tx.cursor::()?; - let mut tx_count_walker = tx_count_cursor.walk((start, start_hash).into())?; - let mut count = tx_count_walker.next().unwrap()?.1; - let mut last_num = start; - for entry in tx_count_walker { - let (key, db_count) = entry?; - count += tx.get::(key)?.unwrap().tx_amount; - assert_eq!(db_count, count); - last_num = key.number(); - } - assert_eq!(last_num, end); - - Ok(()) - })?; - Ok(()) - } - } - - impl UnwindStageTestRunner for TxIndexTestRunner { - fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { - self.db.check_no_entry_above::(input.unwind_to, |h| { - h.number() - })?; - Ok(()) - } - } -} diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index a59272e93..3392cdb87 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -30,7 +30,7 @@ impl Default for TestStageDB { impl TestStageDB { /// Return a database wrapped in [StageDB]. 
- fn inner(&self) -> StageDB<'_, Env> { + pub(crate) fn inner(&self) -> StageDB<'_, Env> { StageDB::new(self.db.borrow()).expect("failed to create db container") } @@ -73,6 +73,7 @@ impl TestStageDB { /// let db = StageTestDB::default(); /// db.map_put::(&items, |item| item)?; /// ``` + #[allow(dead_code)] pub(crate) fn map_put(&self, values: &[S], mut map: F) -> Result<(), db::Error> where T: Table, @@ -96,6 +97,7 @@ impl TestStageDB { /// let db = StageTestDB::default(); /// db.transform_append::(&items, |prev, item| prev.unwrap_or_default() + item)?; /// ``` + #[allow(dead_code)] pub(crate) fn transform_append( &self, values: &[S],