diff --git a/bin/reth/src/db/mod.rs b/bin/reth/src/db/mod.rs index 28dca132e..00c232931 100644 --- a/bin/reth/src/db/mod.rs +++ b/bin/reth/src/db/mod.rs @@ -164,7 +164,7 @@ impl Command { HeaderTD, HeaderNumbers, Headers, - BlockBodies, + BlockBodyIndices, BlockOmmers, BlockWithdrawals, TransactionBlock, @@ -174,8 +174,6 @@ impl Command { PlainStorageState, PlainAccountState, Bytecodes, - BlockTransitionIndex, - TxTransitionIndex, AccountHistory, StorageHistory, AccountChangeSet, diff --git a/bin/reth/src/dump_stage/execution.rs b/bin/reth/src/dump_stage/execution.rs index 3afe47862..75dd8fad6 100644 --- a/bin/reth/src/dump_stage/execution.rs +++ b/bin/reth/src/dump_stage/execution.rs @@ -52,7 +52,7 @@ fn import_tables_with_range( tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) })??; output_db.update(|tx| { - tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) + tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) })??; output_db.update(|tx| { tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) @@ -60,15 +60,15 @@ fn import_tables_with_range( // Find range of transactions that need to be copied over let (from_tx, to_tx) = db_tool.db.view(|read_tx| { - let mut read_cursor = read_tx.cursor_read::()?; + let mut read_cursor = read_tx.cursor_read::()?; let (_, from_block) = read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?; let (_, to_block) = read_cursor.seek(to)?.ok_or(eyre::eyre!("BlockBody {to} does not exist."))?; Ok::<(u64, u64), eyre::ErrReport>(( - from_block.start_tx_id, - to_block.start_tx_id + to_block.tx_count, + from_block.first_tx_num, + to_block.first_tx_num + to_block.tx_count, )) })??; diff --git a/bin/reth/src/dump_stage/hashing_account.rs b/bin/reth/src/dump_stage/hashing_account.rs index 6a281674b..4ae028b47 100644 --- a/bin/reth/src/dump_stage/hashing_account.rs +++ b/bin/reth/src/dump_stage/hashing_account.rs @@ -21,10 +21,14 @@ pub(crate) async fn 
dump_hashing_account_stage( // Import relevant AccountChangeSets let tx = db_tool.db.tx()?; - let from_transition_rev = - tx.get::(from)?.expect("there should be at least one."); - let to_transition_rev = - tx.get::(to)?.expect("there should be at least one."); + let from_transition_rev = tx + .get::(from)? + .expect("there should be at least one.") + .transition_at_block(); + let to_transition_rev = tx + .get::(to)? + .expect("there should be at least one.") + .transition_after_block(); output_db.update(|tx| { tx.import_table_with_range::( &db_tool.db.tx()?, diff --git a/bin/reth/src/dump_stage/merkle.rs b/bin/reth/src/dump_stage/merkle.rs index 3f76276a0..3d0357300 100644 --- a/bin/reth/src/dump_stage/merkle.rs +++ b/bin/reth/src/dump_stage/merkle.rs @@ -28,10 +28,14 @@ pub(crate) async fn dump_merkle_stage( })??; let tx = db_tool.db.tx()?; - let from_transition_rev = - tx.get::(from)?.expect("there should be at least one."); - let to_transition_rev = - tx.get::(to)?.expect("there should be at least one."); + let from_transition_rev = tx + .get::(from)? + .expect("there should be at least one.") + .transition_at_block(); + let to_transition_rev = tx + .get::(to)? + .expect("there should be at least one.") + .transition_after_block(); output_db.update(|tx| { tx.import_table_with_range::( diff --git a/bin/reth/src/dump_stage/mod.rs b/bin/reth/src/dump_stage/mod.rs index 5603c64a2..9c9829610 100644 --- a/bin/reth/src/dump_stage/mod.rs +++ b/bin/reth/src/dump_stage/mod.rs @@ -108,8 +108,8 @@ impl Command { } } -/// Sets up the database and initial state on `BlockTransitionIndex`. Also returns the tip block -/// number. +/// Sets up the database and initial state on [`tables::BlockBodyIndices`]. Also returns the tip +/// block number. 
pub(crate) fn setup( from: u64, to: u64, @@ -123,17 +123,15 @@ pub(crate) fn setup( let output_db = init_db(output_db)?; output_db.update(|tx| { - tx.import_table_with_range::( + tx.import_table_with_range::( &db_tool.db.tx()?, Some(from - 1), to + 1, ) })??; - let (tip_block_number, _) = db_tool - .db - .view(|tx| tx.cursor_read::()?.last())?? - .expect("some"); + let (tip_block_number, _) = + db_tool.db.view(|tx| tx.cursor_read::()?.last())??.expect("some"); Ok((output_db, tip_block_number)) } diff --git a/bin/reth/src/test_vectors/tables.rs b/bin/reth/src/test_vectors/tables.rs index f2eae45ba..bf447456c 100644 --- a/bin/reth/src/test_vectors/tables.rs +++ b/bin/reth/src/test_vectors/tables.rs @@ -60,11 +60,9 @@ pub(crate) fn generate_vectors(mut tables: Vec) -> Result<()> { (HeaderTD, PER_TABLE, TABLE), (HeaderNumbers, PER_TABLE, TABLE), (Headers, PER_TABLE, TABLE), - (BlockBodies, PER_TABLE, TABLE), + (BlockBodyIndices, PER_TABLE, TABLE), (BlockOmmers, 100, TABLE), (TxHashNumber, PER_TABLE, TABLE), - (BlockTransitionIndex, PER_TABLE, TABLE), - (TxTransitionIndex, PER_TABLE, TABLE), (Transactions, 100, TABLE), (PlainStorageState, PER_TABLE, DUPSORT), (PlainAccountState, PER_TABLE, TABLE) diff --git a/crates/interfaces/src/provider.rs b/crates/interfaces/src/provider.rs index b20e7b92a..c62f401f6 100644 --- a/crates/interfaces/src/provider.rs +++ b/crates/interfaces/src/provider.rs @@ -17,8 +17,8 @@ pub enum ProviderError { #[error("Block hash {block_hash:?} does not exist in Headers table")] BlockHash { block_hash: BlockHash }, /// A block body is missing. - #[error("Block body not found for block #{number}")] - BlockBody { number: BlockNumber }, + #[error("Block meta not found for block #{number}")] + BlockBodyIndices { number: BlockNumber }, /// The block transition id for a certain block number is missing. 
#[error("Block transition id does not exist for block #{block_number}")] BlockTransition { block_number: BlockNumber }, diff --git a/crates/staged-sync/src/utils/init.rs b/crates/staged-sync/src/utils/init.rs index 29a93dc88..dea69370c 100644 --- a/crates/staged-sync/src/utils/init.rs +++ b/crates/staged-sync/src/utils/init.rs @@ -64,8 +64,7 @@ pub fn init_genesis( // Insert header tx.put::(0, hash)?; tx.put::(hash, 0)?; - tx.put::(0, Default::default())?; - tx.put::(0, 0)?; + tx.put::(0, Default::default())?; tx.put::(0, header.difficulty.into())?; tx.put::(0, header)?; diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 0385099fa..d338b1204 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -98,7 +98,7 @@ fn merkle(c: &mut Criterion) { &mut group, setup::unwind_hashes, stage, - 1..DEFAULT_NUM_BLOCKS + 1, + 1..DEFAULT_NUM_BLOCKS, "Merkle-incremental".to_string(), ); @@ -107,7 +107,7 @@ fn merkle(c: &mut Criterion) { &mut group, setup::unwind_hashes, stage, - 1..DEFAULT_NUM_BLOCKS + 1, + 1..DEFAULT_NUM_BLOCKS, "Merkle-fullhash".to_string(), ); } diff --git a/crates/stages/benches/setup/account_hashing.rs b/crates/stages/benches/setup/account_hashing.rs index 033b71da9..9d2a434d6 100644 --- a/crates/stages/benches/setup/account_hashing.rs +++ b/crates/stages/benches/setup/account_hashing.rs @@ -11,7 +11,7 @@ use std::path::{Path, PathBuf}; /// Prepares a database for [`AccountHashingStage`] /// If the environment variable [`constants::ACCOUNT_HASHING_DB`] is set, it will use that one and -/// will get the stage execution range from [`tables::BlockTransitionIndex`]. Otherwise, it will +/// will get the stage execution range from [`tables::BlockBodyIndices`]. Otherwise, it will /// generate its own random data. /// /// Returns the path to the database file, stage and range of stage execution if it exists. 
@@ -33,7 +33,7 @@ fn find_stage_range(db: &Path) -> StageRange { TestTransaction::new(db) .tx .view(|tx| { - let mut cursor = tx.cursor_read::()?; + let mut cursor = tx.cursor_read::()?; let from = cursor.first()?.unwrap().0; let to = cursor.last()?.unwrap().0; @@ -55,7 +55,7 @@ fn find_stage_range(db: &Path) -> StageRange { fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) { let opts = SeedOpts { blocks: 0..num_blocks + 1, - accounts: 0..10_000, + accounts: 0..100_000, txs: 100..150, transitions: 10_000 + 1, }; diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 501534cff..b93a75fd4 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -730,15 +730,15 @@ mod tests { let db = test_utils::create_test_db::(EnvKind::RW); let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder() .add_stage(TestStage::new(StageId("Fatal")).add_exec(Err( - StageError::DatabaseIntegrity(ProviderError::BlockBody { number: 5 }), + StageError::DatabaseIntegrity(ProviderError::BlockBodyIndices { number: 5 }), ))) .build(); let result = pipeline.run(db).await; assert_matches!( result, - Err(PipelineError::Stage(StageError::DatabaseIntegrity(ProviderError::BlockBody { - number: 5 - }))) + Err(PipelineError::Stage(StageError::DatabaseIntegrity( + ProviderError::BlockBodyIndices { number: 5 } + ))) ); } } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index f90891b16..518abfbea 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -6,7 +6,7 @@ use futures_util::TryStreamExt; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, - models::{StoredBlockBody, StoredBlockOmmers, StoredBlockWithdrawals}, + models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals}, tables, transaction::{DbTx, DbTxMut}, }; @@ -42,11 +42,9 @@ pub const BODIES: StageId = StageId("Bodies"); /// The bodies are 
processed and data is inserted into these tables: /// /// - [`BlockOmmers`][reth_db::tables::BlockOmmers] -/// - [`BlockBodies`][reth_db::tables::BlockBodies] +/// - [`BlockBodyIndices`][reth_db::tables::BlockBodyIndices] /// - [`Transactions`][reth_db::tables::Transactions] /// - [`TransactionBlock`][reth_db::tables::TransactionBlock] -/// - [`BlockTransitionIndex`][reth_db::tables::BlockTransitionIndex] -/// - [`TxTransitionIndex`][reth_db::tables::TxTransitionIndex] /// /// # Genesis /// @@ -54,10 +52,8 @@ pub const BODIES: StageId = StageId("Bodies"); /// /// - The header tables (see [`HeaderStage`][crate::stages::HeaderStage]) /// - The [`BlockOmmers`][reth_db::tables::BlockOmmers] table -/// - The [`BlockBodies`][reth_db::tables::BlockBodies] table +/// - The [`BlockBodyIndices`][reth_db::tables::BlockBodyIndices] table /// - The [`Transactions`][reth_db::tables::Transactions] table -/// - The [`BlockTransitionIndex`][reth_db::tables::BlockTransitionIndex] table -/// - The [`TxTransitionIndex`][reth_db::tables::TxTransitionIndex] table #[derive(Debug)] pub struct BodyStage { /// The body downloader. 
@@ -89,21 +85,17 @@ impl Stage for BodyStage { let mut td_cursor = tx.cursor_read::()?; // Cursors used to write bodies, ommers and transactions - let mut body_cursor = tx.cursor_write::()?; + let mut block_meta_cursor = tx.cursor_write::()?; let mut tx_cursor = tx.cursor_write::()?; let mut tx_block_cursor = tx.cursor_write::()?; let mut ommers_cursor = tx.cursor_write::()?; let mut withdrawals_cursor = tx.cursor_write::()?; - // Cursors used to write state transition mapping - let mut block_transition_cursor = tx.cursor_write::()?; - let mut tx_transition_cursor = tx.cursor_write::()?; - // Get id for the first transaction and first transition in the block - let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(start_block)?; + let (mut next_tx_num, mut next_transition_id) = tx.get_next_block_ids(start_block)?; let mut highest_block = input.stage_progress.unwrap_or_default(); - debug!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, start_tx_id = current_tx_id, transition_id, "Commencing sync"); + debug!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, start_tx_id = next_tx_num, next_transition_id, "Commencing sync"); // Task downloader can return `None` only if the response relaying channel was closed. This // is a fatal error to prevent the pipeline from running forever. 
@@ -116,29 +108,24 @@ impl Stage for BodyStage { let block_number = response.block_number(); let difficulty = response.difficulty(); + let first_tx_num = next_tx_num; + let first_transition_id = next_transition_id; + let mut tx_count = 0; let mut has_withdrawals = false; match response { BlockResponse::Full(block) => { - let body = StoredBlockBody { - start_tx_id: current_tx_id, - tx_count: block.body.len() as u64, - }; - body_cursor.append(block_number, body.clone())?; - + tx_count = block.body.len() as u64; // write transaction block index - if !body.is_empty() { - tx_block_cursor.append(body.last_tx_index(), block.number)?; + if !block.body.is_empty() { + tx_block_cursor.append(first_tx_num, block.number)?; } // Write transactions for transaction in block.body { // Append the transaction - tx_cursor.append(current_tx_id, transaction)?; - tx_transition_cursor.append(current_tx_id, transition_id)?; + tx_cursor.append(next_tx_num, transaction)?; // Increment transaction id for each transaction. - current_tx_id += 1; - // Increment transition id for each transaction. - transition_id += 1; + next_tx_num += 1; } // Write ommers if any @@ -164,12 +151,7 @@ impl Stage for BodyStage { } } } - BlockResponse::Empty(_) => { - body_cursor.append( - block_number, - StoredBlockBody { start_tx_id: current_tx_id, tx_count: 0 }, - )?; - } + BlockResponse::Empty(_) => {} }; // The block transition marks the final state at the end of the block. @@ -181,11 +163,22 @@ impl Stage for BodyStage { .ok_or(ProviderError::TotalDifficulty { number: block_number })? 
.1; let has_reward = self.consensus.has_block_reward(td.into(), difficulty); - let has_post_block_transition = has_reward || has_withdrawals; - if has_post_block_transition { - transition_id += 1; - } - block_transition_cursor.append(block_number, transition_id)?; + let has_block_change = has_reward || has_withdrawals; + + // Increment transition id for each transaction, + // and by +1 if the block has its own state change (a block reward or withdrawals). + next_transition_id += tx_count + has_block_change as u64; + + // insert block meta + block_meta_cursor.append( + block_number, + StoredBlockBodyIndices { + first_tx_num, + first_transition_id, + has_block_change, + tx_count, + }, + )?; highest_block = block_number; } @@ -206,17 +199,15 @@ impl Stage for BodyStage { ) -> Result { info!(target: "sync::stages::bodies", to_block = input.unwind_to, "Unwinding"); // Cursors to unwind bodies, ommers - let mut body_cursor = tx.cursor_write::()?; + let mut body_cursor = tx.cursor_write::()?; let mut transaction_cursor = tx.cursor_write::()?; let mut ommers_cursor = tx.cursor_write::()?; let mut withdrawals_cursor = tx.cursor_write::()?; // Cursors to unwind transitions - let mut block_transition_cursor = tx.cursor_write::()?; - let mut tx_transition_cursor = tx.cursor_write::()?; let mut tx_block_cursor = tx.cursor_write::()?; let mut rev_walker = body_cursor.walk_back(None)?; - while let Some((number, body)) = rev_walker.next().transpose()? { + while let Some((number, block_meta)) = rev_walker.next().transpose()? { if number <= input.unwind_to { break } @@ -231,30 +222,23 @@ impl Stage for BodyStage { withdrawals_cursor.delete_current()?; } - // Delete the block transition if any - if block_transition_cursor.seek_exact(number)?.is_some() { - block_transition_cursor.delete_current()?; - } - // Delete all transaction to block values. 
- if !body.is_empty() && tx_block_cursor.seek_exact(body.last_tx_index())?.is_some() { + if !block_meta.is_empty() && + tx_block_cursor.seek_exact(block_meta.last_tx_num())?.is_some() + { tx_block_cursor.delete_current()?; } // Delete all transactions that belong to this block - for tx_id in body.tx_id_range() { + for tx_id in block_meta.tx_num_range() { // First delete the transaction if transaction_cursor.seek_exact(tx_id)?.is_some() { transaction_cursor.delete_current()?; } - // Delete the transaction transition if any - if tx_transition_cursor.seek_exact(tx_id)?.is_some() { - tx_transition_cursor.delete_current()?; - } } // Delete the current body value - tx.delete::(number, None)?; + tx.delete::(number, None)?; } Ok(UnwindOutput { stage_progress: input.unwind_to }) @@ -443,7 +427,7 @@ mod tests { cursor::DbCursorRO, database::Database, mdbx::{Env, WriteMap}, - models::{StoredBlockBody, StoredBlockOmmers}, + models::{StoredBlockBodyIndices, StoredBlockOmmers}, tables, transaction::{DbTx, DbTxMut}, }; @@ -548,30 +532,26 @@ mod tests { if let Some(progress) = blocks.first() { // Insert last progress data self.tx.commit(|tx| { - let body = StoredBlockBody { - start_tx_id: 0, + let body = StoredBlockBodyIndices { + first_tx_num: 0, + first_transition_id: 0, tx_count: progress.body.len() as u64, + has_block_change: true, }; - body.tx_id_range().try_for_each(|tx_id| { + body.tx_num_range().try_for_each(|tx_num| { let transaction = random_signed_tx(); - tx.put::(tx_id, transaction)?; - tx.put::(tx_id, tx_id) + tx.put::(tx_num, transaction) })?; - let last_transition_id = progress.body.len() as u64; - let block_transition_id = last_transition_id + 1; // for block reward - - tx.put::( - progress.number, - block_transition_id, - )?; if body.tx_count != 0 { tx.put::( - body.first_tx_index(), + body.first_tx_num(), progress.number, )?; } - tx.put::(progress.number, body)?; + + tx.put::(progress.number, body)?; + if !progress.ommers_hash_is_empty() { tx.put::( 
progress.number, @@ -606,21 +586,15 @@ mod tests { impl UnwindStageTestRunner for BodyTestRunner { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { - self.tx - .ensure_no_entry_above::(input.unwind_to, |key| key)?; - self.tx - .ensure_no_entry_above::(input.unwind_to, |key| key)?; - self.tx.ensure_no_entry_above::( + self.tx.ensure_no_entry_above::( input.unwind_to, |key| key, )?; + self.tx + .ensure_no_entry_above::(input.unwind_to, |key| key)?; if let Some(last_tx_id) = self.get_last_tx_id()? { self.tx .ensure_no_entry_above::(last_tx_id, |key| key)?; - self.tx.ensure_no_entry_above::( - last_tx_id, - |key| key, - )?; self.tx.ensure_no_entry_above::( last_tx_id, |key| key, @@ -634,12 +608,12 @@ mod tests { /// Get the last available tx id if any pub(crate) fn get_last_tx_id(&self) -> Result, TestRunnerError> { let last_body = self.tx.query(|tx| { - let v = tx.cursor_read::()?.last()?; + let v = tx.cursor_read::()?.last()?; Ok(v) })?; Ok(match last_body { Some((_, body)) if body.tx_count != 0 => { - Some(body.start_tx_id + body.tx_count - 1) + Some(body.first_tx_num + body.tx_count - 1) } _ => None, }) @@ -655,11 +629,9 @@ mod tests { // Acquire cursors on body related tables let mut headers_cursor = tx.cursor_read::()?; let mut td_cursor = tx.cursor_read::()?; - let mut bodies_cursor = tx.cursor_read::()?; + let mut bodies_cursor = tx.cursor_read::()?; let mut ommers_cursor = tx.cursor_read::()?; - let mut block_transition_cursor = tx.cursor_read::()?; let mut transaction_cursor = tx.cursor_read::()?; - let mut tx_transition_cursor = tx.cursor_read::()?; let mut tx_block_cursor = tx.cursor_read::()?; let first_body_key = match bodies_cursor.first()? 
{ @@ -696,19 +668,18 @@ mod tests { assert!(stored_ommers.is_some(), "Missing ommers entry"); } - let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_index())?.map(|(_,b)| b); + let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b); if body.tx_count == 0 { assert_ne!(tx_block_id,Some(number)); } else { assert_eq!(tx_block_id, Some(number)); } - for tx_id in body.tx_id_range() { + assert_eq!(body.first_transition_id, expected_transition_id); + + for tx_id in body.tx_num_range() { let tx_entry = transaction_cursor.seek_exact(tx_id)?; assert!(tx_entry.is_some(), "Transaction is missing."); - assert_eq!( - tx_transition_cursor.seek_exact(tx_id).expect("to be okay").expect("to be present").1, expected_transition_id - ); // Increment expected id for each transaction transition. expected_transition_id += 1; } @@ -722,10 +693,6 @@ mod tests { expected_transition_id += 1; } - // Validate that block transition exists - assert_eq!(block_transition_cursor.seek_exact(number).expect("To be okay").expect("Block transition to be present").1,expected_transition_id); - - prev_number = Some(number); } Ok(()) diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index b386e3b82..8fc7c1511 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -36,7 +36,7 @@ pub struct ExecutionStageMetrics { /// - [tables::CanonicalHeaders] get next block to execute. /// - [tables::Headers] get for revm environment variables. 
/// - [tables::HeaderTD] -/// - [tables::BlockBodies] to get tx number +/// - [tables::BlockBodyIndices] to get tx number /// - [tables::Transactions] to execute /// /// For state access [LatestStateProviderRef] provides us latest state and history state @@ -53,7 +53,7 @@ pub struct ExecutionStageMetrics { /// - [tables::StorageChangeSet] /// /// For unwinds we are accessing: -/// - [tables::BlockBodies] get tx index to know what needs to be unwinded +/// - [tables::BlockBodyIndices] get tx index to know what needs to be unwinded /// - [tables::AccountHistory] to remove change set and apply old values to /// - [tables::PlainAccountState] [tables::StorageHistory] to remove change set and apply old values /// to [tables::PlainStorageState] diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index a129a5bcb..0546ecac3 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -1,4 +1,6 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use itertools::Itertools; +use rayon::slice::ParallelSliceMut; use reth_codecs::Compact; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -8,7 +10,8 @@ use reth_db::{ }; use reth_primitives::{keccak256, AccountHashingCheckpoint}; use reth_provider::Transaction; -use std::{collections::BTreeMap, fmt::Debug, ops::Range}; +use std::{fmt::Debug, ops::Range}; +use tokio::sync::mpsc; use tracing::*; /// The [`StageId`] of the account hashing stage. @@ -21,7 +24,7 @@ pub struct AccountHashingStage { /// The threshold (in number of state transitions) for switching between incremental /// hashing and full storage hashing. pub clean_threshold: u64, - /// The maximum number of blocks to process before committing. + /// The maximum number of accounts to process before committing. 
pub commit_threshold: u64, } @@ -126,8 +129,12 @@ impl AccountHashingStage { } // seed account changeset - let (_, last_transition) = - tx.cursor_read::()?.last()?.unwrap(); + let last_transition = tx + .cursor_read::()? + .last()? + .unwrap() + .1 + .transition_after_block(); let first_transition = last_transition.checked_sub(transitions).unwrap_or_default(); @@ -192,13 +199,38 @@ impl Stage for AccountHashingStage { let start_address = checkpoint.address.take(); let next_address = { - let mut accounts = tx.cursor_read::()?; + let mut accounts_cursor = tx.cursor_read::()?; - let hashed_batch = accounts + // channels used to return result of account hashing + let mut channels = Vec::new(); + for chunk in &accounts_cursor .walk(start_address)? .take(self.commit_threshold as usize) - .map(|res| res.map(|(address, account)| (keccak256(address), account))) - .collect::, _>>()?; + .chunks(self.commit_threshold as usize / rayon::current_num_threads()) + { + // An _unordered_ channel to receive results from a rayon job + let (tx, rx) = mpsc::unbounded_channel(); + channels.push(rx); + + let chunk = chunk.collect::, _>>()?; + // Spawn the hashing task onto the global rayon pool + rayon::spawn(move || { + for (address, account) in chunk.into_iter() { + let _ = tx.send((keccak256(address), account)); + } + }); + } + let mut hashed_batch = Vec::with_capacity(self.commit_threshold as usize); + + // Iterate over channels and append the hashed accounts. + for mut channel in channels { + while let Some(hashed) = channel.recv().await { + hashed_batch.push(hashed); + } + } + + // sort it all in parallel + hashed_batch.par_sort_unstable_by(|a, b| a.0.cmp(&b.0)); let mut hashed_account_cursor = tx.cursor_write::()?; @@ -214,7 +246,7 @@ impl Stage for AccountHashingStage { } // next key of iterator - accounts.next()? + accounts_cursor.next()? 
}; if let Some((next_address, _)) = &next_address { diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 0480e6931..abcccf535 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -226,7 +226,7 @@ mod tests { use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, mdbx::{tx::Tx, WriteMap, RW}, - models::{StoredBlockBody, TransitionIdAddress}, + models::{StoredBlockBodyIndices, TransitionIdAddress}, }; use reth_interfaces::test_utils::generators::{ random_block_range, random_contract_account_range, @@ -321,20 +321,15 @@ mod tests { self.tx.insert_headers(blocks.iter().map(|block| &block.header))?; let iter = blocks.iter(); - let (mut transition_id, mut tx_id) = (0, 0); - + let (mut next_transition_id, mut next_tx_num) = (0, 0); + let mut first_transition_id = next_transition_id; + let mut first_tx_num = next_tx_num; for progress in iter { // Insert last progress data self.tx.commit(|tx| { - let body = StoredBlockBody { - start_tx_id: tx_id, - tx_count: progress.body.len() as u64, - }; - progress.body.iter().try_for_each(|transaction| { - tx.put::(transaction.hash(), tx_id)?; - tx.put::(tx_id, transaction.clone())?; - tx.put::(tx_id, transition_id)?; + tx.put::(transaction.hash(), next_tx_num)?; + tx.put::(next_tx_num, transaction.clone())?; let (addr, _) = accounts .get_mut(rand::random::() % n_accounts as usize) @@ -347,14 +342,14 @@ mod tests { }; self.insert_storage_entry( tx, - (transition_id, *addr).into(), + (next_transition_id, *addr).into(), new_entry, progress.header.number == stage_progress, )?; } - tx_id += 1; - transition_id += 1; + next_tx_num += 1; + next_transition_id += 1; Ok(()) })?; @@ -363,18 +358,27 @@ mod tests { if has_reward { self.insert_storage_entry( tx, - (transition_id, Address::random()).into(), + (next_transition_id, Address::random()).into(), StorageEntry { key: keccak256("mining"), value: U256::from(rand::random::()), }, 
progress.header.number == stage_progress, )?; - transition_id += 1; + next_transition_id += 1; } - tx.put::(progress.number, transition_id)?; - tx.put::(progress.number, body) + let body = StoredBlockBodyIndices { + first_tx_num, + first_transition_id, + tx_count: progress.body.len() as u64, + has_block_change: has_reward, + }; + + first_transition_id = next_transition_id; + first_tx_num = next_tx_num; + + tx.put::(progress.number, body) })?; } diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 0d30e9e2f..0c28aa79e 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -80,7 +80,10 @@ mod tests { use super::*; use crate::test_utils::{TestTransaction, PREV_STAGE_ID}; use reth_db::{ - models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey}, + models::{ + sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey, + StoredBlockBodyIndices, + }, tables, transaction::DbTxMut, TransitionList, @@ -118,8 +121,25 @@ mod tests { // setup tx.commit(|tx| { // we just need first and last - tx.put::(0, 3).unwrap(); - tx.put::(5, 7).unwrap(); + tx.put::( + 0, + StoredBlockBodyIndices { + first_transition_id: 0, + tx_count: 3, + ..Default::default() + }, + ) + .unwrap(); + + tx.put::( + 5, + StoredBlockBodyIndices { + first_transition_id: 3, + tx_count: 5, + ..Default::default() + }, + ) + .unwrap(); // setup changeset that are going to be applied to history index tx.put::(4, acc()).unwrap(); diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 390039d53..f4f4643f5 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -86,7 +86,7 @@ mod tests { use reth_db::{ models::{ storage_sharded_key::{StorageShardedKey, NUM_OF_INDICES_IN_SHARD}, - ShardedKey, TransitionIdAddress, + ShardedKey, 
StoredBlockBodyIndices, TransitionIdAddress, }, tables, transaction::DbTxMut, @@ -135,8 +135,25 @@ mod tests { // setup tx.commit(|tx| { // we just need first and last - tx.put::(0, 3).unwrap(); - tx.put::(5, 7).unwrap(); + tx.put::( + 0, + StoredBlockBodyIndices { + first_transition_id: 0, + tx_count: 3, + ..Default::default() + }, + ) + .unwrap(); + + tx.put::( + 5, + StoredBlockBodyIndices { + first_transition_id: 3, + tx_count: 5, + ..Default::default() + }, + ) + .unwrap(); // setup changeset that are going to be applied to history index tx.put::(trns(4), storage(STORAGE_KEY)).unwrap(); diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 8695980c8..4896bc08c 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -43,7 +43,7 @@ impl Stage for SenderRecoveryStage { } /// Retrieve the range of transactions to iterate over by querying - /// [`BlockBodies`][reth_db::tables::BlockBodies], + /// [`BlockBodyIndices`][reth_db::tables::BlockBodyIndices], /// collect transactions within that range, /// recover signer for each transaction and store entries in /// the [`TxSenders`][reth_db::tables::TxSenders] table. 
@@ -57,14 +57,14 @@ impl Stage for SenderRecoveryStage { let done = !capped; // Look up the start index for the transaction range - let start_tx_index = tx.get_block_body(start_block)?.start_tx_id; + let first_tx_num = tx.get_block_meta(start_block)?.first_tx_num(); // Look up the end index for transaction range (inclusive) - let end_tx_index = tx.get_block_body(end_block)?.last_tx_index(); + let last_tx_num = tx.get_block_meta(end_block)?.last_tx_num(); // No transactions to walk over - if start_tx_index > end_tx_index { - info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Target transaction already reached"); + if first_tx_num > last_tx_num { + info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Target transaction already reached"); return Ok(ExecOutput { stage_progress: end_block, done }) } @@ -74,10 +74,10 @@ impl Stage for SenderRecoveryStage { // Acquire the cursor over the transactions let mut tx_cursor = tx.cursor_read::()?; // Walk the transactions from start to end index (inclusive) - let tx_walker = tx_cursor.walk_range(start_tx_index..=end_tx_index)?; + let tx_walker = tx_cursor.walk_range(first_tx_num..=last_tx_num)?; // Iterate over transactions in chunks - info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders"); + info!(target: "sync::stages::sender_recovery", first_tx_num, last_tx_num, "Recovering senders"); // channels used to return result of sender recovery. 
let mut channels = Vec::new(); @@ -136,7 +136,7 @@ impl Stage for SenderRecoveryStage { ) -> Result { info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, "Unwinding"); // Lookup latest tx id that we should unwind to - let latest_tx_id = tx.get_block_body(input.unwind_to)?.last_tx_index(); + let latest_tx_id = tx.get_block_meta(input.unwind_to)?.last_tx_num(); tx.unwind_table_by_num::(latest_tx_id)?; Ok(UnwindOutput { stage_progress: input.unwind_to }) } @@ -267,13 +267,11 @@ mod tests { /// 2. If the is no requested block entry in the bodies table, /// but [tables::TxSenders] is not empty. fn ensure_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { - let body_result = self.tx.inner().get_block_body(block); + let body_result = self.tx.inner().get_block_meta(block); match body_result { Ok(body) => self .tx - .ensure_no_entry_above::(body.last_tx_index(), |key| { - key - })?, + .ensure_no_entry_above::(body.last_tx_num(), |key| key)?, Err(_) => { assert!(self.tx.table_is_empty::()?); } @@ -321,11 +319,11 @@ mod tests { return Ok(()) } - let mut body_cursor = tx.cursor_read::()?; + let mut body_cursor = tx.cursor_read::()?; body_cursor.seek_exact(start_block)?; while let Some((_, body)) = body_cursor.next()? { - for tx_id in body.tx_id_range() { + for tx_id in body.tx_num_range() { let transaction = tx .get::(tx_id)? 
.expect("no transaction entry"); diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 54599ff67..7594d4a4a 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -56,19 +56,17 @@ impl Stage for TransactionLookupStage { debug!(target: "sync::stages::transaction_lookup", start_block, end_block, "Commencing sync"); - let mut cursor_bodies = tx.cursor_read::()?; - let mut tx_cursor = tx.cursor_write::()?; + let mut block_meta_cursor = tx.cursor_read::()?; + let mut tx_cursor = tx.cursor_read::()?; // Walk over block bodies within a specified range. - let bodies = cursor_bodies.walk(Some(start_block))?.take_while(|entry| { - entry.as_ref().map(|(num, _)| *num <= end_block).unwrap_or_default() - }); + let bodies = block_meta_cursor.walk_range(start_block..=end_block)?; // Collect transactions for each body let mut tx_list = vec![]; - for body_entry in bodies { - let (_, body) = body_entry?; - let transactions = tx_cursor.walk(Some(body.start_tx_id))?.take(body.tx_count as usize); + for block_meta_entry in bodies { + let (_, block_meta) = block_meta_entry?; + let transactions = tx_cursor.walk_range(block_meta.tx_num_range())?; for tx_entry in transactions { let (id, transaction) = tx_entry?; @@ -81,20 +79,23 @@ impl Stage for TransactionLookupStage { let mut txhash_cursor = tx.cursor_write::()?; - // If the last inserted element in the database is smaller than the first in our set, then - // we can just append into the DB. This probably only ever happens during sync, on - // the first table insertion. - let append = tx_list + // If the last inserted element in the database is equal or bigger than the first + // in our set, then we need to insert inside the DB. If it is smaller then last + // element in the DB, we can append to the DB. + // Append probably only ever happens during sync, on the first table insertion. + let insert = tx_list .first() .zip(txhash_cursor.last()?) 
- .map(|((first, _), (last, _))| &last < first) + .map(|((first, _), (last, _))| first <= &last) .unwrap_or_default(); + // if txhash_cursor.last() is None we will do append. `zip` would return none if any item is + // none. if it is some and if first is bigger than last, we will do append, else insert. for (tx_hash, id) in tx_list { - if append { - txhash_cursor.append(tx_hash, id)?; - } else { + if insert { txhash_cursor.insert(tx_hash, id)?; + } else { + txhash_cursor.append(tx_hash, id)?; } } @@ -111,7 +112,7 @@ impl Stage for TransactionLookupStage { ) -> Result { info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, "Unwinding"); // Cursors to unwind tx hash to number - let mut body_cursor = tx.cursor_read::()?; + let mut body_cursor = tx.cursor_read::()?; let mut tx_hash_number_cursor = tx.cursor_write::()?; let mut transaction_cursor = tx.cursor_read::()?; let mut rev_walker = body_cursor.walk_back(None)?; @@ -121,7 +122,7 @@ impl Stage for TransactionLookupStage { } // Delete all transactions that belong to this block - for tx_id in body.tx_id_range() { + for tx_id in body.tx_num_range() { // First delete the transaction and hash to id mapping if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? { if tx_hash_number_cursor.seek_exact(transaction.hash)?.is_some() { @@ -162,7 +163,7 @@ mod tests { // Insert blocks with a single transaction at block `stage_progress + 10` let non_empty_block_number = stage_progress + 10; - let blocks = (stage_progress..input.previous_stage_progress() + 1) + let blocks = (stage_progress..=input.previous_stage_progress()) .map(|number| { random_block(number, None, Some((number == non_empty_block_number) as u8), None) }) @@ -246,10 +247,10 @@ mod tests { /// 2. If the is no requested block entry in the bodies table, /// but [tables::TxHashNumber] is not empty.
fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> { - let body_result = self.tx.inner().get_block_body(number); + let body_result = self.tx.inner().get_block_meta(number); match body_result { Ok(body) => self.tx.ensure_no_entry_above_by_value::( - body.last_tx_index(), + body.last_tx_num(), |key| key, )?, Err(_) => { @@ -299,11 +300,11 @@ mod tests { return Ok(()) } - let mut body_cursor = tx.cursor_read::()?; + let mut body_cursor = tx.cursor_read::()?; body_cursor.seek_exact(start_block)?; while let Some((_, body)) = body_cursor.next()? { - for tx_id in body.tx_id_range() { + for tx_id in body.tx_num_range() { let transaction = tx .get::(tx_id)? .expect("no transaction entry"); diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 8b36d6525..310190c28 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -6,7 +6,7 @@ use reth_db::{ tx::Tx, Env, EnvKind, WriteMap, RW, }, - models::{AccountBeforeTx, StoredBlockBody}, + models::{AccountBeforeTx, StoredBlockBodyIndices}, table::Table, tables, transaction::{DbTx, DbTxMut}, @@ -230,25 +230,25 @@ impl TestTransaction { I: Iterator, { self.commit(|tx| { - let mut current_tx_id = tx_offset.unwrap_or_default(); + let mut next_tx_num = tx_offset.unwrap_or_default(); blocks.into_iter().try_for_each(|block| { Self::insert_header(tx, &block.header)?; // Insert into body tables. 
- tx.put::( + tx.put::( block.number, - StoredBlockBody { - start_tx_id: current_tx_id, + StoredBlockBodyIndices { + first_tx_num: next_tx_num, + first_transition_id: next_tx_num, tx_count: block.body.len() as u64, + has_block_change: false, }, )?; block.body.iter().try_for_each(|body_tx| { - tx.put::(current_tx_id, current_tx_id)?; - tx.put::(current_tx_id, body_tx.clone())?; - current_tx_id += 1; + tx.put::(next_tx_num, body_tx.clone())?; + next_tx_num += 1; Ok(()) - })?; - tx.put::(block.number, current_tx_id) + }) }) }) } diff --git a/crates/storage/codecs/derive/src/compact/mod.rs b/crates/storage/codecs/derive/src/compact/mod.rs index 3af29b4b8..bad67a7d6 100644 --- a/crates/storage/codecs/derive/src/compact/mod.rs +++ b/crates/storage/codecs/derive/src/compact/mod.rs @@ -162,7 +162,7 @@ pub fn get_bit_size(ftype: &str) -> u8 { match ftype { "bool" | "Option" => 1, "TxType" => 2, - "u64" | "BlockNumber" | "TxNumber" | "ChainId" => 4, + "u64" | "BlockNumber" | "TxNumber" | "ChainId" | "TransitionId" | "NumTransactions" => 4, "u128" => 5, "U256" | "TxHash" => 6, _ => 0, diff --git a/crates/storage/db/benches/criterion.rs b/crates/storage/db/benches/criterion.rs index 2034f7df4..378091708 100644 --- a/crates/storage/db/benches/criterion.rs +++ b/crates/storage/db/benches/criterion.rs @@ -26,11 +26,9 @@ pub fn db(c: &mut Criterion) { measure_table_db::(&mut group); measure_table_db::(&mut group); measure_table_db::(&mut group); - measure_table_db::(&mut group); + measure_table_db::(&mut group); measure_table_db::(&mut group); measure_table_db::(&mut group); - measure_table_db::(&mut group); - measure_table_db::(&mut group); measure_table_db::(&mut group); measure_dupsort_db::(&mut group); measure_table_db::(&mut group); @@ -45,11 +43,9 @@ pub fn serialization(c: &mut Criterion) { measure_table_serialization::(&mut group); measure_table_serialization::(&mut group); measure_table_serialization::(&mut group); - measure_table_serialization::(&mut group); + 
measure_table_serialization::(&mut group); measure_table_serialization::(&mut group); measure_table_serialization::(&mut group); - measure_table_serialization::(&mut group); - measure_table_serialization::(&mut group); measure_table_serialization::(&mut group); measure_table_serialization::(&mut group); measure_table_serialization::(&mut group); diff --git a/crates/storage/db/benches/iai.rs b/crates/storage/db/benches/iai.rs index 77bc67e73..a5023b166 100644 --- a/crates/storage/db/benches/iai.rs +++ b/crates/storage/db/benches/iai.rs @@ -68,11 +68,9 @@ impl_iai!( HeaderTD, HeaderNumbers, Headers, - BlockBodies, + BlockBodyIndices, BlockOmmers, TxHashNumber, - BlockTransitionIndex, - TxTransitionIndex, Transactions, PlainStorageState, PlainAccountState diff --git a/crates/storage/db/src/tables/codecs/compact.rs b/crates/storage/db/src/tables/codecs/compact.rs index e826971f7..25e5d5939 100644 --- a/crates/storage/db/src/tables/codecs/compact.rs +++ b/crates/storage/db/src/tables/codecs/compact.rs @@ -41,7 +41,7 @@ impl_compression_for_compact!( TxType, StorageEntry, StorageTrieEntry, - StoredBlockBody, + StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals, Bytecode, diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index 0733050c8..3da4d6b76 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -25,7 +25,7 @@ use crate::{ accounts::{AccountBeforeTx, TransitionIdAddress}, blocks::{HeaderHash, StoredBlockOmmers}, storage_sharded_key::StorageShardedKey, - ShardedKey, StoredBlockBody, StoredBlockWithdrawals, + ShardedKey, StoredBlockBodyIndices, StoredBlockWithdrawals, }, }, }; @@ -44,12 +44,12 @@ pub enum TableType { } /// Default tables that should be present inside database. 
-pub const TABLES: [(TableType, &str); 27] = [ +pub const TABLES: [(TableType, &str); 25] = [ (TableType::Table, CanonicalHeaders::const_name()), (TableType::Table, HeaderTD::const_name()), (TableType::Table, HeaderNumbers::const_name()), (TableType::Table, Headers::const_name()), - (TableType::Table, BlockBodies::const_name()), + (TableType::Table, BlockBodyIndices::const_name()), (TableType::Table, BlockOmmers::const_name()), (TableType::Table, BlockWithdrawals::const_name()), (TableType::Table, TransactionBlock::const_name()), @@ -59,8 +59,6 @@ pub const TABLES: [(TableType, &str); 27] = [ (TableType::Table, PlainAccountState::const_name()), (TableType::DupSort, PlainStorageState::const_name()), (TableType::Table, Bytecodes::const_name()), - (TableType::Table, BlockTransitionIndex::const_name()), - (TableType::Table, TxTransitionIndex::const_name()), (TableType::Table, AccountHistory::const_name()), (TableType::Table, StorageHistory::const_name()), (TableType::DupSort, AccountChangeSet::const_name()), @@ -146,8 +144,11 @@ table!( ); table!( - /// Stores block bodies. - ( BlockBodies ) BlockNumber | StoredBlockBody + /// Stores block indices that contains indexes of transaction and transitions, + /// number of transactions and if block has a block change (block reward or withdrawals). + /// + /// More information about stored indices can be found in the [`StoredBlockBodyIndices`] struct. + ( BlockBodyIndices ) BlockNumber | StoredBlockBodyIndices ); table!( @@ -190,20 +191,6 @@ table!( ( Bytecodes ) H256 | Bytecode ); -table!( - /// Stores the mapping of block number to state transition id. - /// The block transition marks the final state at the end of the block. - /// Increment the transition if the block contains an addition block reward. - /// If the block does not have a reward and transaction, the transition will be the same as the - /// transition at the last transaction of this block. 
- ( BlockTransitionIndex ) BlockNumber | TransitionId -); - -table!( - /// Stores the mapping of transaction number to state transition id. - ( TxTransitionIndex ) TxNumber | TransitionId -); - table!( /// Stores the current state of an [`Account`]. ( PlainAccountState ) Address | Account diff --git a/crates/storage/db/src/tables/models/blocks.rs b/crates/storage/db/src/tables/models/blocks.rs index 988f6244c..0ed27bc9a 100644 --- a/crates/storage/db/src/tables/models/blocks.rs +++ b/crates/storage/db/src/tables/models/blocks.rs @@ -1,40 +1,89 @@ //! Block related models and types. use reth_codecs::{main_codec, Compact}; -use reth_primitives::{Header, TxNumber, Withdrawal, H256}; +use reth_primitives::{Header, TransitionId, TxNumber, Withdrawal, H256}; use std::ops::Range; + /// Total number of transactions. pub type NumTransactions = u64; -/// The storage representation of a block. +/// The storage of the block body indices /// /// It has the pointer to the transaction Number of the first /// transaction in the block and the total number of transactions #[derive(Debug, Default, Eq, PartialEq, Clone)] #[main_codec] -pub struct StoredBlockBody { - /// The id of the first transaction in this block - pub start_tx_id: TxNumber, +pub struct StoredBlockBodyIndices { + /// The number of the first transaction in this block + /// + /// Note: If the block is empty, this is the number of the first transaction + /// in the next non-empty block. + pub first_tx_num: TxNumber, + /// The id of the first transition in this block. + /// + /// NOTE: If the block is empty, this is the id of the first transition + /// in the next non-empty block. + pub first_transition_id: TransitionId, /// The total number of transactions in the block + /// + /// NOTE: Number of transitions is equal to number of transactions with + /// additional transition for block change if block has block reward or withdrawal. 
pub tx_count: NumTransactions, + /// Flags if there is additional transition changeset of the withdrawal or block reward. + pub has_block_change: bool, } -impl StoredBlockBody { - /// Return the range of transaction ids for this body - pub fn tx_id_range(&self) -> Range { - self.start_tx_id..self.start_tx_id + self.tx_count +impl StoredBlockBodyIndices { + /// Return the range of transaction ids for this block. + pub fn tx_num_range(&self) -> Range { + self.first_tx_num..self.first_tx_num + self.tx_count + } + + /// Return the range of transition ids for this block. + pub fn transition_range(&self) -> Range { + self.transition_at_block()..self.transition_after_block() + } + + /// Return transition id of the state after block executed. + /// This transitions is used with the history index to represent the state after this + /// block execution. + /// + /// Because we are storing old values of the changeset in the history index, we need + /// transition of one after, to fetch correct values of the past state + /// + /// NOTE: This is the same as the first transition id of the next block. + pub fn transition_after_block(&self) -> TransitionId { + self.first_transition_id + self.tx_count + (self.has_block_change as u64) + } + + /// Return transition id of the state at the block execution. + /// This transitions is used with the history index to represent the state + /// before the block execution. + /// + /// Because we are storing old values of the changeset in the history index, we need + /// transition of one after, to fetch correct values of the past state + /// + /// NOTE: If block does not have transitions (empty block) then this is the same + /// as the first transition id of the next block. 
+ pub fn transition_at_block(&self) -> TransitionId { + self.first_transition_id } /// Return the index of last transaction in this block unless the block /// is empty in which case it refers to the last transaction in a previous /// non-empty block - pub fn last_tx_index(&self) -> TxNumber { - self.start_tx_id.saturating_add(self.tx_count).saturating_sub(1) + pub fn last_tx_num(&self) -> TxNumber { + self.first_tx_num.saturating_add(self.tx_count).saturating_sub(1) } /// First transaction index. - pub fn first_tx_index(&self) -> TxNumber { - self.start_tx_id + pub fn first_tx_num(&self) -> TxNumber { + self.first_tx_num + } + + /// Return the index of the next transaction after this block. + pub fn next_tx_num(&self) -> TxNumber { + self.first_tx_num + self.tx_count } /// Return a flag whether the block is empty @@ -43,9 +92,17 @@ impl StoredBlockBody { } /// Return number of transaction inside block + /// + /// NOTE: This is not the same as the number of transitions. pub fn tx_count(&self) -> NumTransactions { self.tx_count } + + /// Return flag signifying whether the block has additional + /// transition changeset (withdrawal or uncle/block rewards). + pub fn has_block_change(&self) -> bool { + self.has_block_change + } } /// The storage representation of a block ommers. 
@@ -72,9 +129,8 @@ pub type HeaderHash = H256; #[cfg(test)] mod test { - use crate::table::{Compress, Decompress}; - use super::*; + use crate::table::{Compress, Decompress}; #[test] fn test_ommer() { @@ -85,4 +141,33 @@ mod test { ommer.clone() == StoredBlockOmmers::decompress::>(ommer.compress()).unwrap() ); } + + #[test] + fn block_meta_indices() { + let first_tx_num = 10; + let first_transition_id = 14; + let tx_count = 6; + let has_block_change = true; + let mut block_meta = StoredBlockBodyIndices { + first_tx_num, + first_transition_id, + tx_count, + has_block_change, + }; + + assert_eq!(block_meta.first_tx_num(), first_tx_num); + assert_eq!(block_meta.last_tx_num(), first_tx_num + tx_count - 1); + assert_eq!(block_meta.next_tx_num(), first_tx_num + tx_count); + assert_eq!(block_meta.tx_count(), tx_count); + assert!(block_meta.has_block_change()); + assert_eq!(block_meta.transition_at_block(), first_transition_id); + assert_eq!(block_meta.transition_after_block(), first_transition_id + tx_count + 1); + assert_eq!(block_meta.tx_num_range(), first_tx_num..first_tx_num + tx_count); + assert_eq!( + block_meta.transition_range(), + first_transition_id..first_transition_id + tx_count + 1 + ); + block_meta.has_block_change = false; + assert_eq!(block_meta.transition_after_block(), first_transition_id + tx_count); + } } diff --git a/crates/storage/libmdbx-rs/tests/transaction.rs b/crates/storage/libmdbx-rs/tests/transaction.rs index 53f1c54ef..43c0b156d 100644 --- a/crates/storage/libmdbx-rs/tests/transaction.rs +++ b/crates/storage/libmdbx-rs/tests/transaction.rs @@ -233,12 +233,11 @@ fn test_concurrent_readers_single_writer() { let txn = env.begin_rw_txn().unwrap(); let db = txn.open_db(None).unwrap(); - println!("wait2"); + barrier.wait(); txn.put(&db, key, val, WriteFlags::empty()).unwrap(); txn.commit().unwrap(); - println!("wait1"); barrier.wait(); assert!(threads.into_iter().all(|b| b.join().unwrap())) diff --git a/crates/storage/provider/src/providers/mod.rs 
b/crates/storage/provider/src/providers/mod.rs index efc4bc0f2..ec115a26a 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -132,8 +132,9 @@ impl BlockProvider for ShareableDatabase { if let Some(header) = self.header_by_number(number)? { let id = BlockId::Number(number.into()); let tx = self.db.tx()?; - let transactions = - self.transactions_by_block(id)?.ok_or(ProviderError::BlockBody { number })?; + let transactions = self + .transactions_by_block(id)? + .ok_or(ProviderError::BlockBodyIndices { number })?; let ommers = tx.get::(header.number)?.map(|o| o.ommers); let withdrawals = self.withdrawals_by_block(id, header.timestamp)?; @@ -200,13 +201,13 @@ impl TransactionsProvider for ShareableDatabase { tx.get::(block_number)? { if let Some(block_body) = - tx.get::(block_number)? + tx.get::(block_number)? { // the index of the tx in the block is the offset: // len([start..tx_id]) // SAFETY: `transaction_id` is always `>=` the block's first // index - let index = transaction_id - block_body.first_tx_index(); + let index = transaction_id - block_body.first_tx_num(); let meta = TransactionMeta { tx_hash, @@ -239,8 +240,8 @@ impl TransactionsProvider for ShareableDatabase { fn transactions_by_block(&self, id: BlockId) -> Result>> { if let Some(number) = self.block_number_for_id(id)? { let tx = self.db.tx()?; - if let Some(body) = tx.get::(number)? { - let tx_range = body.tx_id_range(); + if let Some(body) = tx.get::(number)? { + let tx_range = body.tx_num_range(); return if tx_range.is_empty() { Ok(Some(Vec::new())) } else { @@ -262,17 +263,17 @@ impl TransactionsProvider for ShareableDatabase { ) -> Result>> { let tx = self.db.tx()?; let mut results = Vec::default(); - let mut body_cursor = tx.cursor_read::()?; + let mut body_cursor = tx.cursor_read::()?; let mut tx_cursor = tx.cursor_read::()?; for entry in body_cursor.walk_range(range)? 
{ let (_, body) = entry?; - let tx_range = body.tx_id_range(); - if body.tx_id_range().is_empty() { + let tx_num_range = body.tx_num_range(); + if tx_num_range.is_empty() { results.push(Vec::default()); } else { results.push( tx_cursor - .walk_range(tx_range)? + .walk_range(tx_num_range)? .map(|result| result.map(|(_, tx)| tx)) .collect::, _>>()?, ); @@ -302,8 +303,8 @@ impl ReceiptProvider for ShareableDatabase { fn receipts_by_block(&self, block: BlockId) -> Result>> { if let Some(number) = self.block_number_for_id(block)? { let tx = self.db.tx()?; - if let Some(body) = tx.get::(number)? { - let tx_range = body.tx_id_range(); + if let Some(body) = tx.get::(number)? { + let tx_range = body.tx_num_range(); return if tx_range.is_empty() { Ok(Some(Vec::new())) } else { @@ -419,8 +420,9 @@ impl StateProviderFactory for ShareableDatabase { // get transition id let transition = tx - .get::(block_number)? - .ok_or(ProviderError::BlockTransition { block_number })?; + .get::(block_number)? + .ok_or(ProviderError::BlockTransition { block_number })? + .transition_after_block(); Ok(Box::new(HistoricalStateProvider::new(tx, transition))) } @@ -434,8 +436,9 @@ impl StateProviderFactory for ShareableDatabase { // get transition id let transition = tx - .get::(block_number)? - .ok_or(ProviderError::BlockTransition { block_number })?; + .get::(block_number)? + .ok_or(ProviderError::BlockTransition { block_number })? + .transition_after_block(); Ok(Box::new(HistoricalStateProvider::new(tx, transition))) } diff --git a/crates/storage/provider/src/test_utils/blocks.rs b/crates/storage/provider/src/test_utils/blocks.rs index 75b60b9ab..de1af0f66 100644 --- a/crates/storage/provider/src/test_utils/blocks.rs +++ b/crates/storage/provider/src/test_utils/blocks.rs @@ -1,7 +1,7 @@ //! 
Dummy blocks and data for tests use crate::{post_state::PostState, Transaction}; -use reth_db::{database::Database, models::StoredBlockBody, tables}; +use reth_db::{database::Database, models::StoredBlockBodyIndices, tables}; use reth_primitives::{ hex_literal::hex, proofs::EMPTY_ROOT, Account, Header, SealedBlock, SealedBlockWithSenders, Withdrawal, H160, H256, U256, @@ -19,7 +19,10 @@ pub fn assert_genesis_block(tx: &Transaction<'_, DB>, g: SealedBlo assert_eq!(tx.table::().unwrap(), vec![(h, n)]); assert_eq!(tx.table::().unwrap(), vec![(n, h)]); assert_eq!(tx.table::().unwrap(), vec![(n, g.difficulty.into())]); - assert_eq!(tx.table::().unwrap(), vec![(0, StoredBlockBody::default())]); + assert_eq!( + tx.table::().unwrap(), + vec![(0, StoredBlockBodyIndices::default())] + ); assert_eq!(tx.table::().unwrap(), vec![]); assert_eq!(tx.table::().unwrap(), vec![]); assert_eq!(tx.table::().unwrap(), vec![]); @@ -32,8 +35,6 @@ pub fn assert_genesis_block(tx: &Transaction<'_, DB>, g: SealedBlo assert_eq!(tx.table::().unwrap(), vec![]); // TODO check after this gets done: https://github.com/paradigmxyz/reth/issues/1588 // Bytecodes are not reverted assert_eq!(tx.table::().unwrap(), vec![]); - assert_eq!(tx.table::().unwrap(), vec![(n, 0)]); - assert_eq!(tx.table::().unwrap(), vec![]); assert_eq!(tx.table::().unwrap(), vec![]); assert_eq!(tx.table::().unwrap(), vec![]); assert_eq!(tx.table::().unwrap(), vec![]); diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index 91a39bb01..75c22cb17 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -11,7 +11,7 @@ use reth_db::{ models::{ sharded_key, storage_sharded_key::{self, StorageShardedKey}, - AccountBeforeTx, ShardedKey, StoredBlockBody, TransitionIdAddress, + AccountBeforeTx, ShardedKey, StoredBlockBodyIndices, TransitionIdAddress, }, table::Table, tables, @@ -140,17 +140,22 @@ where } /// Query the block body by 
number. - pub fn get_block_body(&self, number: BlockNumber) -> Result { - let body = - self.get::(number)?.ok_or(ProviderError::BlockBody { number })?; + pub fn get_block_meta( + &self, + number: BlockNumber, + ) -> Result { + let body = self + .get::(number)? + .ok_or(ProviderError::BlockBodyIndices { number })?; Ok(body) } /// Query the last transition of the block by [BlockNumber] key pub fn get_block_transition(&self, key: BlockNumber) -> Result { let last_transition_id = self - .get::(key)? - .ok_or(ProviderError::BlockTransition { block_number: key })?; + .get::(key)? + .ok_or(ProviderError::BlockTransition { block_number: key })? + .transition_after_block(); Ok(last_transition_id) } @@ -165,11 +170,8 @@ where } let prev_number = block - 1; - let prev_body = self.get_block_body(prev_number)?; - let last_transition = self - .get::(prev_number)? - .ok_or(ProviderError::BlockTransition { block_number: prev_number })?; - Ok((prev_body.start_tx_id + prev_body.tx_count, last_transition)) + let prev_body = self.get_block_meta(prev_number)?; + Ok((prev_body.first_tx_num + prev_body.tx_count, prev_body.transition_after_block())) } /// Query the block header by number @@ -567,9 +569,11 @@ where // Header, Body, SenderRecovery, TD, TxLookup stages let (block, senders) = block.into_components(); - let (from, to) = + let block_meta = insert_canonical_block(self.deref_mut(), block, Some(senders), false).unwrap(); + let from = block_meta.transition_at_block(); + let to = block_meta.transition_after_block(); // account history stage { let indices = self.get_account_transition_ids_from_changeset(from, to)?; @@ -664,16 +668,16 @@ where &self, range: impl RangeBounds + Clone, ) -> Result)>, TransactionError> { - // Get the transaction ID ranges for the blocks - let block_bodies = self.get_or_take::(range)?; + // Read range of block bodies to get all transaction ids of this range.
+ let block_bodies = self.get_or_take::(range)?; + if block_bodies.is_empty() { return Ok(Vec::new()) } // Compute the first and last tx ID in the range - let first_transaction = - block_bodies.first().expect("If we have headers").1.first_tx_index(); - let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_index(); + let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num(); + let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num(); // If this is the case then all of the blocks in the range are empty if last_transaction < first_transaction { @@ -694,10 +698,6 @@ where tx_hash_cursor.delete_current()?; } } - // Remove TxTransitionId - self.get_or_take::( - first_transaction..=last_transaction, - )?; // Remove TransactionBlock index if there are transaction present if !transactions.is_empty() { @@ -712,7 +712,7 @@ where let mut transactions = transactions.into_iter(); for (block_number, block_body) in block_bodies { let mut one_block_tx = Vec::with_capacity(block_body.tx_count as usize); - for _ in block_body.tx_id_range() { + for _ in block_body.tx_num_range() { let tx = transactions.next(); let sender = senders.next(); @@ -835,8 +835,8 @@ where /// Traverse over changesets and plain state and recreate the [`PostState`]s for the given range /// of blocks. /// - /// 1. Iterate over the [BlockTransitionIndex][tables::BlockTransitionIndex] table to get all - /// the transitions + /// 1. Iterate over the [BlockBodyIndices][tables::BlockBodyIndices] table to get all + /// the transition indices. /// 2. Iterate over the [StorageChangeSet][tables::StorageChangeSet] table /// and the [AccountChangeSet][tables::AccountChangeSet] tables in reverse order to reconstruct /// the changesets. @@ -859,29 +859,35 @@ where &self, range: impl RangeBounds + Clone, ) -> Result, TransactionError> { + // We are not removing block meta as it is used to get block transitions. 
let block_transition = - self.get_or_take::(range.clone())?; + self.get_or_take::(range.clone())?; if block_transition.is_empty() { return Ok(Vec::new()) } - // get block transitions - let first_block_number = - block_transition.first().expect("Check for empty is already done").0; // get block transition of parent block. - let from = self.get_block_transition(first_block_number.saturating_sub(1))?; - let to = block_transition.last().expect("Check for empty is already done").1; + let from = block_transition + .first() + .expect("Check for empty is already done") + .1 + .transition_at_block(); + let to = block_transition + .last() + .expect("Check for empty is already done") + .1 + .transition_after_block(); // NOTE: Just get block bodies dont remove them // it is connection point for bodies getter and execution result getter. - let block_bodies = self.get_or_take::(range)?; + let block_bodies = self.get_or_take::(range)?; // get transaction receipts let from_transaction_num = - block_bodies.first().expect("already checked if there are blocks").1.first_tx_index(); + block_bodies.first().expect("already checked if there are blocks").1.first_tx_num(); let to_transaction_num = - block_bodies.last().expect("already checked if there are blocks").1.last_tx_index(); + block_bodies.last().expect("already checked if there are blocks").1.last_tx_num(); let receipts = self.get_or_take::(from_transaction_num..=to_transaction_num)?; @@ -1031,7 +1037,7 @@ where // loop break if we are at the end of the blocks. 
for (_, block_body) in block_bodies.into_iter() { let mut block_post_state = PostState::new(); - for tx_num in block_body.tx_id_range() { + for tx_num in block_body.tx_num_range() { if let Some(changes) = all_changesets.remove(&next_transition_id) { for mut change in changes.into_iter() { change @@ -1048,10 +1054,10 @@ where next_transition_id += 1; } - let Some((_,block_transition)) = block_transition_iter.next() else { break}; + let Some((_,block_meta)) = block_transition_iter.next() else { break}; // if block transition points to 1+next transition id it means that there is block // changeset. - if block_transition == next_transition_id + 1 { + if block_meta.has_block_change() { if let Some(changes) = all_changesets.remove(&next_transition_id) { for mut change in changes.into_iter() { change @@ -1139,7 +1145,7 @@ where // that is why it is deleted afterwards. if TAKE { // rm block bodies - self.get_or_take::(range)?; + self.get_or_take::(range)?; // Update pipeline progress if let Some(fork_number) = unwind_to { diff --git a/crates/storage/provider/src/utils.rs b/crates/storage/provider/src/utils.rs index 169fa401e..3e154c60e 100644 --- a/crates/storage/provider/src/utils.rs +++ b/crates/storage/provider/src/utils.rs @@ -1,28 +1,29 @@ use reth_db::{ - models::{StoredBlockBody, StoredBlockOmmers, StoredBlockWithdrawals}, + models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals}, tables, transaction::{DbTx, DbTxMut}, }; use reth_interfaces::{provider::ProviderError, Result}; -use reth_primitives::{Address, SealedBlock, TransitionId}; +use reth_primitives::{Address, SealedBlock}; /// Insert block data into corresponding tables. Used mainly for testing & internal tooling. /// /// -/// Check parent dependency in [tables::HeaderNumbers] and in [tables::BlockBodies] tables. +/// Check parent dependency in [tables::HeaderNumbers] and in [tables::BlockBodyIndices] tables. 
/// Inserts header data to [tables::CanonicalHeaders], [tables::Headers], [tables::HeaderNumbers]. /// and transactions data to [tables::TxSenders], [tables::Transactions], [tables::TxHashNumber]. -/// and transition indexes to [tables::BlockTransitionIndex] and [tables::TxTransitionIndex]. -/// And block data [tables::BlockBodies], [tables::BlockBodies] and [tables::BlockWithdrawals]. +/// and transition/transaction meta data to [tables::BlockBodyIndices] +/// and block data to [tables::BlockOmmers] and [tables::BlockWithdrawals]. /// -/// Return [TransitionId] `(from,to)` +/// Return [StoredBlockBodyIndices] that contains indices of the first and last transactions and +/// transition in the block. pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( tx: &TX, block: SealedBlock, senders: Option>, has_block_reward: bool, parent_tx_num_transition_id: Option<(u64, u64)>, -) -> Result<(TransitionId, TransitionId)> { +) -> Result { let block_number = block.number; tx.put::(block.number, block.hash())?; // Put header with canonical hashes. @@ -48,35 +49,22 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( )?; } - let (mut current_tx_id, mut transition_id) = - if let Some(parent_tx_num_transition_id) = parent_tx_num_transition_id { - parent_tx_num_transition_id - } else if block.number == 0 { - (0, 0) - } else { - let prev_block_num = block.number - 1; - let prev_body = tx - .get::(prev_block_num)? - .ok_or(ProviderError::BlockBody { number: prev_block_num })?; - let last_transition_id = tx - .get::(prev_block_num)? - .ok_or(ProviderError::BlockTransition { block_number: prev_block_num })?; - (prev_body.start_tx_id + prev_body.tx_count, last_transition_id) - }; - let from_transition = transition_id; - // insert body data - tx.put::( - block.number, - StoredBlockBody { start_tx_id: current_tx_id, tx_count: block.body.len() as u64 }, - )?; - - if !block.body.is_empty() { - // -1 is here as current_tx_id points to the next transaction. 
- tx.put::( - current_tx_id + block.body.len() as u64 - 1, - block.number, - )?; - } + let parent_block_meta = if let Some(parent_tx_num_transition_id) = parent_tx_num_transition_id { + StoredBlockBodyIndices { + first_transition_id: parent_tx_num_transition_id.1, + first_tx_num: parent_tx_num_transition_id.0, + tx_count: 0, + has_block_change: false, + } + } else if block.number == 0 { + StoredBlockBodyIndices::default() + } else { + let prev_block_num = block.number - 1; + tx.get::(prev_block_num)? + .ok_or(ProviderError::BlockBodyIndices { number: prev_block_num })? + }; + let tx_count = block.body.len() as u64; + let mut next_tx_num = parent_block_meta.next_tx_num(); let senders_len = senders.as_ref().map(|s| s.len()); let tx_iter = if Some(block.body.len()) == senders_len { @@ -94,12 +82,10 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( for (transaction, sender) in tx_iter { let hash = transaction.hash(); - tx.put::(current_tx_id, sender)?; - tx.put::(current_tx_id, transaction)?; - tx.put::(current_tx_id, transition_id)?; - tx.put::(hash, current_tx_id)?; - transition_id += 1; - current_tx_id += 1; + tx.put::(next_tx_num, sender)?; + tx.put::(next_tx_num, transaction)?; + tx.put::(hash, next_tx_num)?; + next_tx_num += 1; } let mut has_withdrawals = false; @@ -113,13 +99,21 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( } } - if has_block_reward || has_withdrawals { - transition_id += 1; - } - tx.put::(block_number, transition_id)?; + let has_block_change = has_block_reward || has_withdrawals; - let to_transition = transition_id; - Ok((from_transition, to_transition)) + let block_meta = StoredBlockBodyIndices { + first_transition_id: parent_block_meta.transition_after_block(), + first_tx_num: parent_block_meta.next_tx_num(), + tx_count, + has_block_change, + }; + tx.put::(block_number, block_meta.clone())?; + + if !block_meta.is_empty() { + tx.put::(block_meta.last_tx_num(), block_number)?; + } + + Ok(block_meta) } /// Inserts canonical block 
in blockchain. Parent tx num and transition id is taken from @@ -129,6 +123,6 @@ pub fn insert_canonical_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( block: SealedBlock, senders: Option>, has_block_reward: bool, -) -> Result<(TransitionId, TransitionId)> { +) -> Result { insert_block(tx, block, senders, has_block_reward, None) }