From 0d9e1f4997f927c2657224defc12062aa838b960 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Sat, 17 Jun 2023 01:58:16 +0100 Subject: [PATCH] chore: replaces `tx.get::` with provider methods (#3189) --- bin/reth/src/debug_cmd/execution.rs | 8 ++- bin/reth/src/debug_cmd/merkle.rs | 2 +- bin/reth/src/node/mod.rs | 70 +++++++++---------- bin/reth/src/stage/run.rs | 5 +- .../stages/benches/setup/account_hashing.rs | 2 +- crates/stages/benches/setup/mod.rs | 4 +- crates/stages/src/pipeline/mod.rs | 16 +++-- crates/stages/src/stage.rs | 7 +- crates/stages/src/stages/execution.rs | 44 ++++-------- crates/stages/src/stages/hashing_account.rs | 2 +- crates/stages/src/stages/headers.rs | 32 ++++----- crates/stages/src/stages/sender_recovery.rs | 21 +++--- crates/stages/src/stages/total_difficulty.rs | 39 +++++------ crates/stages/src/stages/tx_lookup.rs | 24 +++---- crates/stages/src/test_utils/runner.rs | 2 + crates/stages/src/test_utils/test_db.rs | 17 +++-- crates/storage/provider/src/post_state/mod.rs | 19 ++--- .../provider/src/providers/database/mod.rs | 18 ++--- .../src/providers/database/provider.rs | 20 ++---- crates/storage/provider/src/providers/mod.rs | 4 ++ .../storage/provider/src/test_utils/mock.rs | 4 ++ .../storage/provider/src/test_utils/noop.rs | 4 ++ .../provider/src/traits/transactions.rs | 5 ++ crates/storage/provider/src/transaction.rs | 3 + 24 files changed, 182 insertions(+), 190 deletions(-) diff --git a/bin/reth/src/debug_cmd/execution.rs b/bin/reth/src/debug_cmd/execution.rs index 82e62f925..3aeab6911 100644 --- a/bin/reth/src/debug_cmd/execution.rs +++ b/bin/reth/src/debug_cmd/execution.rs @@ -26,7 +26,7 @@ use reth_interfaces::{ use reth_network::NetworkHandle; use reth_network_api::NetworkInfo; use reth_primitives::{stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, H256}; -use reth_provider::{providers::get_stage_checkpoint, ProviderFactory}; +use reth_provider::{ProviderFactory, StageCheckpointProvider}; use reth_staged_sync::utils::init::{init_db, init_genesis}; use reth_stages::{ sets::DefaultStages, @@ -242,15 +242,17 @@ impl Command { ctx.task_executor .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); + let factory = ProviderFactory::new(&db, self.chain.clone()); + let provider = factory.provider().map_err(PipelineError::Interface)?; + let latest_block_number = - get_stage_checkpoint(&db.tx()?, StageId::Finish)?.unwrap_or_default().block_number; + provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number; if latest_block_number >= self.to { info!(target: "reth::cli", latest = latest_block_number, "Nothing to run"); return Ok(()) } let mut current_max_block = latest_block_number; - let factory = ProviderFactory::new(&db, self.chain.clone()); while current_max_block < self.to { let next_block = current_max_block + 1; diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index 58e4a5df3..1009fe6d8 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -9,7 +9,7 @@ use reth_primitives::{ stage::{StageCheckpoint, StageId}, ChainSpec, }; -use reth_provider::ProviderFactory; +use reth_provider::{ProviderFactory, StageCheckpointProvider}; use reth_staged_sync::utils::init::init_db; use reth_stages::{ stages::{ diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 4a90aa5e9..b7b6b7215 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -23,8 +23,6 @@ use reth_config::Config; use reth_db::{ database::Database, mdbx::{Env, WriteMap}, - tables, - transaction::DbTx, }; use reth_discv4::DEFAULT_DISCOVERY_PORT; use reth_downloaders::{ @@ -41,12 +39,10 @@ use reth_interfaces::{ }; use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; use reth_network_api::NetworkInfo; -use reth_primitives::{ - stage::StageId, BlockHashOrNumber, ChainSpec, Head, Header, SealedHeader, H256, -}; +use reth_primitives::{stage::StageId, BlockHashOrNumber, ChainSpec, Head, SealedHeader, H256}; use reth_provider::{ - providers::get_stage_checkpoint, BlockProvider, CanonStateSubscriptions, HeaderProvider, - ProviderFactory, + BlockHashProvider, BlockProvider, CanonStateSubscriptions, HeaderProvider, ProviderFactory, + StageCheckpointProvider, }; use reth_revm::Factory; use reth_revm_inspectors::stack::Hook; @@ -509,30 +505,31 @@ impl Command { Ok(handle) } - fn lookup_head( - &self, - db: Arc>, - ) -> Result { - db.view(|tx| { - let head = get_stage_checkpoint(tx, StageId::Finish)?.unwrap_or_default().block_number; - let header = tx - .get::(head)? - .expect("the header for the latest block is missing, database is corrupt"); - let total_difficulty = tx.get::(head)?.expect( - "the total difficulty for the latest block is missing, database is corrupt", - ); - let hash = tx - .get::(head)? - .expect("the hash for the latest block is missing, database is corrupt"); - Ok::(Head { - number: head, - hash, - difficulty: header.difficulty, - total_difficulty: total_difficulty.into(), - timestamp: header.timestamp, - }) - })? - .map_err(Into::into) + fn lookup_head(&self, db: Arc>) -> Result { + let factory = ProviderFactory::new(db, self.chain.clone()); + let provider = factory.provider()?; + + let head = provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number; + + let header = provider + .header_by_number(head)? + .expect("the header for the latest block is missing, database is corrupt"); + + let total_difficulty = provider + .header_td_by_number(head)? + .expect("the total difficulty for the latest block is missing, database is corrupt"); + + let hash = provider + .block_hash(head)? + .expect("the hash for the latest block is missing, database is corrupt"); + + Ok(Head { + number: head, + hash, + difficulty: header.difficulty, + total_difficulty, + timestamp: header.timestamp, + }) } /// Attempt to look up the block number for the tip hash in the database. @@ -565,13 +562,10 @@ impl Command { DB: Database, Client: HeadersClient, { - let header = db.view(|tx| -> Result, reth_db::DatabaseError> { - let number = match tip { - BlockHashOrNumber::Hash(hash) => tx.get::(hash)?, - BlockHashOrNumber::Number(number) => Some(number), - }; - Ok(number.map(|number| tx.get::(number)).transpose()?.flatten()) - })??; + let factory = ProviderFactory::new(db, self.chain.clone()); + let provider = factory.provider()?; + + let header = provider.header_by_hash_or_number(tip)?; // try to look up the header in the database if let Some(header) = header { diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 592d0d66c..4af0b0017 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -12,7 +12,7 @@ use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_primitives::ChainSpec; -use reth_provider::{providers::get_stage_checkpoint, ProviderFactory}; +use reth_provider::{ProviderFactory, StageCheckpointProvider}; use reth_staged_sync::utils::init::init_db; use reth_stages::{ stages::{ @@ -215,8 +215,7 @@ impl Command { assert!(exec_stage.type_id() == unwind_stage.type_id()); } - let checkpoint = - get_stage_checkpoint(provider_rw.tx_ref(), exec_stage.id())?.unwrap_or_default(); + let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default(); let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage); diff --git a/crates/stages/benches/setup/account_hashing.rs b/crates/stages/benches/setup/account_hashing.rs index fa3492adf..c8210ec3b 100644 --- a/crates/stages/benches/setup/account_hashing.rs +++ b/crates/stages/benches/setup/account_hashing.rs @@ -63,7 +63,7 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) { std::fs::create_dir_all(&path).unwrap(); println!("Account Hashing testdata not found, generating to {:?}", path.display()); let tx = TestTransaction::new(&path); - let mut provider = tx.inner(); + let mut provider = tx.inner_rw(); let _accounts = AccountHashingStage::seed(&mut provider, opts); provider.commit().expect("failed to commit"); } diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index ee0c394d4..8fca81471 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -123,7 +123,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { tx.insert_accounts_and_storages(start_state.clone()).unwrap(); // make first block after genesis have valid state root - let (root, updates) = StateRoot::new(tx.inner().tx_ref()).root_with_updates().unwrap(); + let (root, updates) = StateRoot::new(tx.inner_rw().tx_ref()).root_with_updates().unwrap(); let second_block = blocks.get_mut(1).unwrap(); let cloned_second = second_block.clone(); let mut updated_header = cloned_second.header.unseal(); @@ -144,7 +144,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { // make last block have valid state root let root = { - let tx_mut = tx.inner(); + let tx_mut = tx.inner_rw(); let root = StateRoot::new(tx_mut.tx_ref()).root().unwrap(); tx_mut.commit().unwrap(); root diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index e8ceaf874..4032b1bed 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -6,7 +6,7 @@ use reth_primitives::{ constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, listener::EventListeners, stage::StageId, BlockNumber, ChainSpec, H256, }; -use reth_provider::{providers::get_stage_checkpoint, ProviderFactory}; +use reth_provider::{ProviderFactory, StageCheckpointProvider}; use std::{pin::Pin, sync::Arc}; use tokio::sync::watch; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -137,12 +137,14 @@ where /// Registers progress metrics for each registered stage pub fn register_metrics(&mut self) -> Result<(), PipelineError> { - let tx = self.db.tx()?; + let factory = ProviderFactory::new(&self.db, self.chain_spec.clone()); + let provider = factory.provider()?; + for stage in &self.stages { let stage_id = stage.id(); self.metrics.stage_checkpoint( stage_id, - get_stage_checkpoint(&tx, stage_id)?.unwrap_or_default(), + provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(), None, ); } @@ -228,8 +230,14 @@ where } } + let factory = ProviderFactory::new(&self.db, self.chain_spec.clone()); + previous_stage = Some( - get_stage_checkpoint(&self.db.tx()?, stage_id)?.unwrap_or_default().block_number, + factory + .provider()? + .get_stage_checkpoint(stage_id)? + .unwrap_or_default() + .block_number, ); } diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index a7de48499..30b8f7fca 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -5,7 +5,7 @@ use reth_primitives::{ stage::{StageCheckpoint, StageId}, BlockNumber, TxNumber, }; -use reth_provider::{DatabaseProviderRW, ProviderError}; +use reth_provider::DatabaseProviderRW; use std::{ cmp::{max, min}, ops::RangeInclusive, @@ -79,10 +79,7 @@ impl ExecInput { tx_threshold: u64, ) -> Result<(RangeInclusive, RangeInclusive, bool), StageError> { let start_block = self.next_block(); - let start_block_body = provider - .tx_ref() - .get::(start_block)? - .ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?; + let start_block_body = provider.block_body_indices(start_block)?; let target_block = self.target(); diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index e57c944ae..b62031496 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -422,7 +422,9 @@ mod tests { hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode, ChainSpecBuilder, SealedBlock, StorageEntry, H160, H256, MAINNET, U256, }; - use reth_provider::{insert_canonical_block, ProviderFactory}; + use reth_provider::{ + insert_canonical_block, AccountProvider, ProviderFactory, ReceiptProvider, + }; use reth_revm::Factory; use reth_rlp::Decodable; use std::sync::Arc; @@ -624,8 +626,9 @@ mod tests { }, done: true } if processed == total && total == block.gas_used); - let mut provider = factory.provider_rw().unwrap(); - let tx = provider.tx_mut(); + + let provider = factory.provider().unwrap(); + // check post state let account1 = H160(hex!("1000000000000000000000000000000000000000")); let account1_info = @@ -645,24 +648,24 @@ mod tests { // assert accounts assert_eq!( - tx.get::(account1), + provider.basic_account(account1), Ok(Some(account1_info)), "Post changed of a account" ); assert_eq!( - tx.get::(account2), + provider.basic_account(account2), Ok(Some(account2_info)), "Post changed of a account" ); assert_eq!( - tx.get::(account3), + provider.basic_account(account3), Ok(Some(account3_info)), "Post changed of a account" ); // assert storage // Get on dupsort would return only first value. This is good enough for this test. assert_eq!( - tx.get::(account1), + provider.tx_ref().get::(account1), Ok(Some(StorageEntry { key: H256::from_low_u64_be(1), value: U256::from(2) })), "Post changed of a account" ); @@ -739,26 +742,13 @@ mod tests { } if total == block.gas_used); // assert unwind stage - let db_tx = provider.tx_ref(); - assert_eq!( - db_tx.get::(acc1), - Ok(Some(acc1_info)), - "Pre changed of a account" - ); - assert_eq!( - db_tx.get::(acc2), - Ok(Some(acc2_info)), - "Post changed of a account" - ); + assert_eq!(provider.basic_account(acc1), Ok(Some(acc1_info)), "Pre changed of a account"); + assert_eq!(provider.basic_account(acc2), Ok(Some(acc2_info)), "Post changed of a account"); let miner_acc = H160(hex!("2adc25665018aa1fe0e6bc666dac8fc2697ff9ba")); - assert_eq!( - db_tx.get::(miner_acc), - Ok(None), - "Third account should be unwound" - ); + assert_eq!(provider.basic_account(miner_acc), Ok(None), "Third account should be unwound"); - assert_eq!(db_tx.get::(0), Ok(None), "First receipt should be unwound"); + assert_eq!(provider.receipt(0), Ok(None), "First receipt should be unwound"); } #[tokio::test] @@ -830,11 +820,7 @@ mod tests { // assert unwind stage let provider = factory.provider_rw().unwrap(); - assert_eq!( - provider.tx_ref().get::(destroyed_address), - Ok(None), - "Account was destroyed" - ); + assert_eq!(provider.basic_account(destroyed_address), Ok(None), "Account was destroyed"); assert_eq!( provider.tx_ref().get::(destroyed_address), diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index ab56a3398..37f6dfc2c 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -532,7 +532,7 @@ mod tests { type Seed = Vec<(Address, Account)>; fn seed_execution(&mut self, input: ExecInput) -> Result { - let mut provider = self.tx.inner(); + let mut provider = self.tx.inner_rw(); let res = Ok(AccountHashingStage::seed( &mut provider, SeedOpts { blocks: 1..=input.target(), accounts: 0..10, txs: 0..3 }, diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 6b57dc400..7c38fecec 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -403,6 +403,7 @@ mod tests { generators::random_header_range, TestConsensus, TestHeaderDownloader, TestHeadersClient, }; use reth_primitives::U256; + use reth_provider::{BlockHashProvider, BlockNumProvider, HeaderProvider}; use std::sync::Arc; pub(crate) struct HeadersTestRunner { @@ -478,26 +479,21 @@ mod tests { let initial_checkpoint = input.checkpoint().block_number; match output { Some(output) if output.checkpoint.block_number > initial_checkpoint => { - self.tx.query(|tx| { - for block_num in - (initial_checkpoint..output.checkpoint.block_number).rev() - { - // look up the header hash - let hash = tx - .get::(block_num)? - .expect("no header hash"); + let provider = self.tx.factory.provider()?; + for block_num in (initial_checkpoint..output.checkpoint.block_number).rev() + { + // look up the header hash + let hash = provider.block_hash(block_num)?.expect("no header hash"); - // validate the header number - assert_eq!(tx.get::(hash)?, Some(block_num)); + // validate the header number + assert_eq!(provider.block_number(hash)?, Some(block_num)); - // validate the header - let header = tx.get::(block_num)?; - assert!(header.is_some()); - let header = header.unwrap().seal_slow(); - assert_eq!(header.hash(), hash); - } - Ok(()) - })?; + // validate the header + let header = provider.header_by_number(block_num)?; + assert!(header.is_some()); + let header = header.unwrap().seal_slow(); + assert_eq!(header.hash(), hash); + } } _ => self.check_no_header_entry_above(initial_checkpoint)?, }; diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 9f74ebcd2..2fe297e4b 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -235,6 +235,7 @@ mod tests { use reth_primitives::{ stage::StageUnitCheckpoint, BlockNumber, SealedBlock, TransactionSigned, H256, }; + use reth_provider::TransactionsProvider; use super::*; use crate::test_utils::{ @@ -373,7 +374,7 @@ mod tests { /// 2. If the is no requested block entry in the bodies table, /// but [tables::TxSenders] is not empty. fn ensure_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { - let body_result = self.tx.inner().block_body_indices(block); + let body_result = self.tx.inner_rw().block_body_indices(block); match body_result { Ok(body) => self .tx @@ -417,7 +418,8 @@ mod tests { output: Option, ) -> Result<(), TestRunnerError> { match output { - Some(output) => self.tx.query(|tx| { + Some(output) => { + let provider = self.tx.inner(); let start_block = input.next_block(); let end_block = output.checkpoint.block_number; @@ -425,23 +427,20 @@ mod tests { return Ok(()) } - let mut body_cursor = tx.cursor_read::()?; + let mut body_cursor = + provider.tx_ref().cursor_read::()?; body_cursor.seek_exact(start_block)?; while let Some((_, body)) = body_cursor.next()? { for tx_id in body.tx_num_range() { - let transaction: TransactionSigned = tx - .get::(tx_id)? - .expect("no transaction entry") - .into(); + let transaction: TransactionSigned = + provider.transaction_by_id(tx_id)?.expect("no transaction entry"); let signer = transaction.recover_signer().expect("failed to recover signer"); - assert_eq!(Some(signer), tx.get::(tx_id)?); + assert_eq!(Some(signer), provider.transaction_sender(tx_id)?) } } - - Ok(()) - })?, + } None => self.ensure_no_senders_by_block(input.checkpoint().block_number)?, }; diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index 41afa8213..7175eb15f 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -130,6 +130,7 @@ mod tests { TestConsensus, }; use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedHeader}; + use reth_provider::HeaderProvider; use super::*; use crate::test_utils::{ @@ -262,27 +263,25 @@ mod tests { let initial_stage_progress = input.checkpoint().block_number; match output { Some(output) if output.checkpoint.block_number > initial_stage_progress => { - self.tx.query(|tx| { - let mut header_cursor = tx.cursor_read::()?; - let (_, mut current_header) = header_cursor - .seek_exact(initial_stage_progress)? - .expect("no initial header"); - let mut td: U256 = tx - .get::(initial_stage_progress)? - .expect("no initial td") - .into(); + let provider = self.tx.inner(); - while let Some((next_key, next_header)) = header_cursor.next()? { - assert_eq!(current_header.number + 1, next_header.number); - td += next_header.difficulty; - assert_eq!( - tx.get::(next_key)?.map(Into::into), - Some(td) - ); - current_header = next_header; - } - Ok(()) - })?; + let mut header_cursor = provider.tx_ref().cursor_read::()?; + let (_, mut current_header) = header_cursor + .seek_exact(initial_stage_progress)? + .expect("no initial header"); + let mut td: U256 = provider + .header_td_by_number(initial_stage_progress)? + .expect("no initial td"); + + while let Some((next_key, next_header)) = header_cursor.next()? { + assert_eq!(current_header.number + 1, next_header.number); + td += next_header.difficulty; + assert_eq!( + provider.header_td_by_number(next_key)?.map(Into::into), + Some(td) + ); + current_header = next_header; + } } _ => self.check_no_td_above(initial_stage_progress)?, }; diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 6b92d51c4..bf4b757cc 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -199,6 +199,7 @@ mod tests { use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::{random_block, random_block_range}; use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256}; + use reth_provider::TransactionsProvider; // Implement stage test suite. stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup); @@ -331,7 +332,7 @@ mod tests { /// 2. If the is no requested block entry in the bodies table, /// but [tables::TxHashNumber] is not empty. fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> { - let body_result = self.tx.inner().block_body_indices(number); + let body_result = self.tx.inner_rw().block_body_indices(number); match body_result { Ok(body) => self.tx.ensure_no_entry_above_by_value::( body.last_tx_num(), @@ -376,7 +377,9 @@ mod tests { output: Option, ) -> Result<(), TestRunnerError> { match output { - Some(output) => self.tx.query(|tx| { + Some(output) => { + let provider = self.tx.inner(); + let start_block = input.next_block(); let end_block = output.checkpoint.block_number; @@ -384,23 +387,18 @@ mod tests { return Ok(()) } - let mut body_cursor = tx.cursor_read::()?; + let mut body_cursor = + provider.tx_ref().cursor_read::()?; body_cursor.seek_exact(start_block)?; while let Some((_, body)) = body_cursor.next()? { for tx_id in body.tx_num_range() { - let transaction = tx - .get::(tx_id)? - .expect("no transaction entry"); - assert_eq!( - Some(tx_id), - tx.get::(transaction.hash())?, - ); + let transaction = + provider.transaction_by_id(tx_id)?.expect("no transaction entry"); + assert_eq!(Some(tx_id), provider.transaction_id(transaction.hash())?); } } - - Ok(()) - })?, + } None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?, }; Ok(()) diff --git a/crates/stages/src/test_utils/runner.rs b/crates/stages/src/test_utils/runner.rs index ff2f0133f..01ae19d49 100644 --- a/crates/stages/src/test_utils/runner.rs +++ b/crates/stages/src/test_utils/runner.rs @@ -12,6 +12,8 @@ pub(crate) enum TestRunnerError { Database(#[from] reth_interfaces::db::DatabaseError), #[error("Internal runner error occurred.")] Internal(#[from] Box), + #[error("Internal interface error occurred.")] + Interface(#[from] reth_interfaces::Error), } /// A generic test runner for stages. diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 43763983a..ea0b6974c 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -4,7 +4,7 @@ use reth_db::{ mdbx::{ test_utils::{create_test_db, create_test_db_with_path}, tx::Tx, - Env, EnvKind, WriteMap, RW, + Env, EnvKind, WriteMap, RO, RW, }, models::{AccountBeforeTx, StoredBlockBodyIndices}, table::Table, @@ -16,7 +16,7 @@ use reth_primitives::{ keccak256, Account, Address, BlockNumber, SealedBlock, SealedHeader, StorageEntry, H256, MAINNET, U256, }; -use reth_provider::{DatabaseProviderRW, ProviderFactory}; +use reth_provider::{DatabaseProviderRO, DatabaseProviderRW, ProviderFactory}; use std::{ borrow::Borrow, collections::BTreeMap, @@ -37,7 +37,7 @@ pub struct TestTransaction { /// WriteMap DB pub tx: Arc>, pub path: Option, - factory: ProviderFactory>>, + pub factory: ProviderFactory>>, } impl Default for TestTransaction { @@ -59,10 +59,15 @@ impl TestTransaction { } /// Return a database wrapped in [DatabaseProviderRW]. - pub fn inner(&self) -> DatabaseProviderRW<'_, Arc>> { + pub fn inner_rw(&self) -> DatabaseProviderRW<'_, Arc>> { self.factory.provider_rw().expect("failed to create db container") } + /// Return a database wrapped in [DatabaseProviderRO]. + pub fn inner(&self) -> DatabaseProviderRO<'_, Arc>> { + self.factory.provider().expect("failed to create db container") + } + /// Get a pointer to an internal database. pub fn inner_raw(&self) -> Arc> { self.tx.clone() @@ -73,7 +78,7 @@ impl TestTransaction { where F: FnOnce(&mut Tx<'_, RW, WriteMap>) -> Result<(), DbError>, { - let mut tx = self.inner(); + let mut tx = self.inner_rw(); f(tx.tx_mut())?; tx.commit().expect("failed to commit"); Ok(()) @@ -82,7 +87,7 @@ impl TestTransaction { /// Invoke a callback with a read transaction pub fn query(&self, f: F) -> Result where - F: FnOnce(&Tx<'_, RW, WriteMap>) -> Result, + F: FnOnce(&Tx<'_, RO, WriteMap>) -> Result, { f(self.inner().tx_ref()) } diff --git a/crates/storage/provider/src/post_state/mod.rs b/crates/storage/provider/src/post_state/mod.rs index 23106625c..f37d16128 100644 --- a/crates/storage/provider/src/post_state/mod.rs +++ b/crates/storage/provider/src/post_state/mod.rs @@ -640,12 +640,13 @@ impl PostState { #[cfg(test)] mod tests { use super::*; + use crate::{AccountProvider, ProviderFactory}; use reth_db::{ database::Database, mdbx::{test_utils, Env, EnvKind, WriteMap}, transaction::DbTx, }; - use reth_primitives::proofs::EMPTY_ROOT; + use reth_primitives::{proofs::EMPTY_ROOT, MAINNET}; use reth_trie::test_utils::state_root; use std::sync::Arc; @@ -1066,7 +1067,8 @@ mod tests { #[test] fn write_to_db_account_info() { let db: Arc> = test_utils::create_test_db(EnvKind::RW); - let tx = db.tx_mut().expect("Could not get database tx"); + let factory = ProviderFactory::new(db, MAINNET.clone()); + let provider = factory.provider_rw().unwrap(); let mut post_state = PostState::new(); @@ -1081,22 +1083,23 @@ mod tests { post_state.create_account(1, address_a, account_a); // 0x11.. is changed (balance + 1, nonce + 1) post_state.change_account(1, address_b, account_b, account_b_changed); - post_state.write_to_db(&tx).expect("Could not write post state to DB"); + post_state.write_to_db(provider.tx_ref()).expect("Could not write post state to DB"); // Check plain state assert_eq!( - tx.get::(address_a).expect("Could not read account state"), + provider.basic_account(address_a).expect("Could not read account state"), Some(account_a), "Account A state is wrong" ); assert_eq!( - tx.get::(address_b).expect("Could not read account state"), + provider.basic_account(address_b).expect("Could not read account state"), Some(account_b_changed), "Account B state is wrong" ); // Check change set - let mut changeset_cursor = tx + let mut changeset_cursor = provider + .tx_ref() .cursor_dup_read::() .expect("Could not open changeset cursor"); assert_eq!( @@ -1113,11 +1116,11 @@ mod tests { let mut post_state = PostState::new(); // 0x11.. is destroyed post_state.destroy_account(2, address_b, account_b_changed); - post_state.write_to_db(&tx).expect("Could not write second post state to DB"); + post_state.write_to_db(provider.tx_ref()).expect("Could not write second post state to DB"); // Check new plain state for account B assert_eq!( - tx.get::(address_b).expect("Could not read account state"), + provider.basic_account(address_b).expect("Could not read account state"), None, "Account B should be deleted" ); diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index c4370e791..4ba37607b 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -5,7 +5,7 @@ use crate::{ ProviderError, StageCheckpointProvider, StateProviderBox, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::{database::Database, models::StoredBlockBodyIndices, tables, transaction::DbTx}; +use reth_db::{database::Database, models::StoredBlockBodyIndices}; use reth_interfaces::Result; use reth_primitives::{ stage::{StageCheckpoint, StageId}, @@ -246,6 +246,10 @@ impl TransactionsProvider for ProviderFactory { fn senders_by_tx_range(&self, range: impl RangeBounds) -> Result> { self.provider()?.senders_by_tx_range(range) } + + fn transaction_sender(&self, id: TxNumber) -> Result> { + self.provider()?.transaction_sender(id) + } } impl ReceiptProvider for ProviderFactory { @@ -318,18 +322,6 @@ impl EvmEnvProvider for ProviderFactory { } } -/// Get checkpoint for the given stage. -#[inline] -pub fn get_stage_checkpoint<'a, TX>( - tx: &TX, - id: StageId, -) -> std::result::Result, reth_interfaces::db::DatabaseError> -where - TX: DbTx<'a> + Send + Sync, -{ - tx.get::(id.to_string()) -} - #[cfg(test)] mod tests { use super::ProviderFactory; diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 489bb647d..3916969d7 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -43,8 +43,6 @@ use std::{ sync::Arc, }; -use super::get_stage_checkpoint; - /// A [`DatabaseProvider`] that holds a read-only database transaction. pub type DatabaseProviderRO<'this, DB> = DatabaseProvider<'this, >::TX>; @@ -740,8 +738,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { let parent_number = range.start().saturating_sub(1); let parent_state_root = self - .tx - .get::(parent_number)? + .header_by_number(parent_number)? .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))? .state_root; @@ -749,8 +746,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { // but for sake of double verification we will check it again. if new_state_root != parent_state_root { let parent_hash = self - .tx - .get::(parent_number)? + .block_hash(parent_number)? .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?; return Err(TransactionError::UnwindStateRootMismatch { got: new_state_root, @@ -1076,14 +1072,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(()) } - /// Get the stage checkpoint. - pub fn get_stage_checkpoint( - &self, - id: StageId, - ) -> std::result::Result, DatabaseError> { - get_stage_checkpoint(&self.tx, id) - } - /// Save stage checkpoint. pub fn save_stage_checkpoint( &self, @@ -1743,6 +1731,10 @@ impl<'this, TX: DbTx<'this>> TransactionsProvider for DatabaseProvider<'this, TX .map(|entry| entry.map(|sender| sender.1)) .collect::, _>>()?) } + + fn transaction_sender(&self, id: TxNumber) -> Result> { + Ok(self.tx.get::(id)?) + } } impl<'this, TX: DbTx<'this>> ReceiptProvider for DatabaseProvider<'this, TX> { diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 9594f8173..6502cacb3 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -302,6 +302,10 @@ where fn senders_by_tx_range(&self, range: impl RangeBounds) -> Result> { self.database.provider()?.senders_by_tx_range(range) } + + fn transaction_sender(&self, id: TxNumber) -> Result> { + self.database.provider()?.transaction_sender(id) + } } impl ReceiptProvider for BlockchainProvider diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 9c029cd11..869f972c2 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -219,6 +219,10 @@ impl TransactionsProvider for MockEthProvider { ) -> Result> { unimplemented!() } + + fn transaction_sender(&self, _id: TxNumber) -> Result> { + unimplemented!() + } } impl ReceiptProvider for MockEthProvider { diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 97d6933b2..bd0ac1fea 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -159,6 +159,10 @@ impl TransactionsProvider for NoopProvider { ) -> Result> { Ok(Vec::default()) } + + fn transaction_sender(&self, _id: TxNumber) -> Result> { + Ok(None) + } } impl ReceiptProvider for NoopProvider { diff --git a/crates/storage/provider/src/traits/transactions.rs b/crates/storage/provider/src/traits/transactions.rs index ca5e15a88..e53a03cbf 100644 --- a/crates/storage/provider/src/traits/transactions.rs +++ b/crates/storage/provider/src/traits/transactions.rs @@ -51,4 +51,9 @@ pub trait TransactionsProvider: BlockNumProvider + Send + Sync { /// Get Senders from a tx range. fn senders_by_tx_range(&self, range: impl RangeBounds) -> Result>; + + /// Get transaction sender. + /// + /// Returns None if the transaction is not found. + fn transaction_sender(&self, id: TxNumber) -> Result>; } diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index b75c87dec..9614f5e1f 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -39,6 +39,9 @@ pub enum TransactionError { /// Block hash block_hash: BlockHash, }, + /// Internal interfaces error + #[error("Internal error")] + InternalError(#[from] reth_interfaces::Error), } #[cfg(test)]