chore: replaces tx.get::<Table> with provider methods (#3189)

This commit is contained in:
joshieDo
2023-06-17 01:58:16 +01:00
committed by GitHub
parent bb1ffd059e
commit 0d9e1f4997
24 changed files with 182 additions and 190 deletions

View File

@ -26,7 +26,7 @@ use reth_interfaces::{
use reth_network::NetworkHandle; use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo; use reth_network_api::NetworkInfo;
use reth_primitives::{stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, H256}; use reth_primitives::{stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, H256};
use reth_provider::{providers::get_stage_checkpoint, ProviderFactory}; use reth_provider::{ProviderFactory, StageCheckpointProvider};
use reth_staged_sync::utils::init::{init_db, init_genesis}; use reth_staged_sync::utils::init::{init_db, init_genesis};
use reth_stages::{ use reth_stages::{
sets::DefaultStages, sets::DefaultStages,
@ -242,15 +242,17 @@ impl Command {
ctx.task_executor ctx.task_executor
.spawn_critical("events task", events::handle_events(Some(network.clone()), events)); .spawn_critical("events task", events::handle_events(Some(network.clone()), events));
let factory = ProviderFactory::new(&db, self.chain.clone());
let provider = factory.provider().map_err(PipelineError::Interface)?;
let latest_block_number = let latest_block_number =
get_stage_checkpoint(&db.tx()?, StageId::Finish)?.unwrap_or_default().block_number; provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number;
if latest_block_number >= self.to { if latest_block_number >= self.to {
info!(target: "reth::cli", latest = latest_block_number, "Nothing to run"); info!(target: "reth::cli", latest = latest_block_number, "Nothing to run");
return Ok(()) return Ok(())
} }
let mut current_max_block = latest_block_number; let mut current_max_block = latest_block_number;
let factory = ProviderFactory::new(&db, self.chain.clone());
while current_max_block < self.to { while current_max_block < self.to {
let next_block = current_max_block + 1; let next_block = current_max_block + 1;

View File

@ -9,7 +9,7 @@ use reth_primitives::{
stage::{StageCheckpoint, StageId}, stage::{StageCheckpoint, StageId},
ChainSpec, ChainSpec,
}; };
use reth_provider::ProviderFactory; use reth_provider::{ProviderFactory, StageCheckpointProvider};
use reth_staged_sync::utils::init::init_db; use reth_staged_sync::utils::init::init_db;
use reth_stages::{ use reth_stages::{
stages::{ stages::{

View File

@ -23,8 +23,6 @@ use reth_config::Config;
use reth_db::{ use reth_db::{
database::Database, database::Database,
mdbx::{Env, WriteMap}, mdbx::{Env, WriteMap},
tables,
transaction::DbTx,
}; };
use reth_discv4::DEFAULT_DISCOVERY_PORT; use reth_discv4::DEFAULT_DISCOVERY_PORT;
use reth_downloaders::{ use reth_downloaders::{
@ -41,12 +39,10 @@ use reth_interfaces::{
}; };
use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager};
use reth_network_api::NetworkInfo; use reth_network_api::NetworkInfo;
use reth_primitives::{ use reth_primitives::{stage::StageId, BlockHashOrNumber, ChainSpec, Head, SealedHeader, H256};
stage::StageId, BlockHashOrNumber, ChainSpec, Head, Header, SealedHeader, H256,
};
use reth_provider::{ use reth_provider::{
providers::get_stage_checkpoint, BlockProvider, CanonStateSubscriptions, HeaderProvider, BlockHashProvider, BlockProvider, CanonStateSubscriptions, HeaderProvider, ProviderFactory,
ProviderFactory, StageCheckpointProvider,
}; };
use reth_revm::Factory; use reth_revm::Factory;
use reth_revm_inspectors::stack::Hook; use reth_revm_inspectors::stack::Hook;
@ -509,30 +505,31 @@ impl Command {
Ok(handle) Ok(handle)
} }
fn lookup_head( fn lookup_head(&self, db: Arc<Env<WriteMap>>) -> Result<Head, reth_interfaces::Error> {
&self, let factory = ProviderFactory::new(db, self.chain.clone());
db: Arc<Env<WriteMap>>, let provider = factory.provider()?;
) -> Result<Head, reth_interfaces::db::DatabaseError> {
db.view(|tx| { let head = provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number;
let head = get_stage_checkpoint(tx, StageId::Finish)?.unwrap_or_default().block_number;
let header = tx let header = provider
.get::<tables::Headers>(head)? .header_by_number(head)?
.expect("the header for the latest block is missing, database is corrupt"); .expect("the header for the latest block is missing, database is corrupt");
let total_difficulty = tx.get::<tables::HeaderTD>(head)?.expect(
"the total difficulty for the latest block is missing, database is corrupt", let total_difficulty = provider
); .header_td_by_number(head)?
let hash = tx .expect("the total difficulty for the latest block is missing, database is corrupt");
.get::<tables::CanonicalHeaders>(head)?
.expect("the hash for the latest block is missing, database is corrupt"); let hash = provider
Ok::<Head, reth_interfaces::db::DatabaseError>(Head { .block_hash(head)?
number: head, .expect("the hash for the latest block is missing, database is corrupt");
hash,
difficulty: header.difficulty, Ok(Head {
total_difficulty: total_difficulty.into(), number: head,
timestamp: header.timestamp, hash,
}) difficulty: header.difficulty,
})? total_difficulty,
.map_err(Into::into) timestamp: header.timestamp,
})
} }
/// Attempt to look up the block number for the tip hash in the database. /// Attempt to look up the block number for the tip hash in the database.
@ -565,13 +562,10 @@ impl Command {
DB: Database, DB: Database,
Client: HeadersClient, Client: HeadersClient,
{ {
let header = db.view(|tx| -> Result<Option<Header>, reth_db::DatabaseError> { let factory = ProviderFactory::new(db, self.chain.clone());
let number = match tip { let provider = factory.provider()?;
BlockHashOrNumber::Hash(hash) => tx.get::<tables::HeaderNumbers>(hash)?,
BlockHashOrNumber::Number(number) => Some(number), let header = provider.header_by_hash_or_number(tip)?;
};
Ok(number.map(|number| tx.get::<tables::Headers>(number)).transpose()?.flatten())
})??;
// try to look up the header in the database // try to look up the header in the database
if let Some(header) = header { if let Some(header) = header {

View File

@ -12,7 +12,7 @@ use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config; use reth_config::Config;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_primitives::ChainSpec; use reth_primitives::ChainSpec;
use reth_provider::{providers::get_stage_checkpoint, ProviderFactory}; use reth_provider::{ProviderFactory, StageCheckpointProvider};
use reth_staged_sync::utils::init::init_db; use reth_staged_sync::utils::init::init_db;
use reth_stages::{ use reth_stages::{
stages::{ stages::{
@ -215,8 +215,7 @@ impl Command {
assert!(exec_stage.type_id() == unwind_stage.type_id()); assert!(exec_stage.type_id() == unwind_stage.type_id());
} }
let checkpoint = let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();
get_stage_checkpoint(provider_rw.tx_ref(), exec_stage.id())?.unwrap_or_default();
let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage); let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);

View File

@ -63,7 +63,7 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) {
std::fs::create_dir_all(&path).unwrap(); std::fs::create_dir_all(&path).unwrap();
println!("Account Hashing testdata not found, generating to {:?}", path.display()); println!("Account Hashing testdata not found, generating to {:?}", path.display());
let tx = TestTransaction::new(&path); let tx = TestTransaction::new(&path);
let mut provider = tx.inner(); let mut provider = tx.inner_rw();
let _accounts = AccountHashingStage::seed(&mut provider, opts); let _accounts = AccountHashingStage::seed(&mut provider, opts);
provider.commit().expect("failed to commit"); provider.commit().expect("failed to commit");
} }

View File

@ -123,7 +123,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
tx.insert_accounts_and_storages(start_state.clone()).unwrap(); tx.insert_accounts_and_storages(start_state.clone()).unwrap();
// make first block after genesis have valid state root // make first block after genesis have valid state root
let (root, updates) = StateRoot::new(tx.inner().tx_ref()).root_with_updates().unwrap(); let (root, updates) = StateRoot::new(tx.inner_rw().tx_ref()).root_with_updates().unwrap();
let second_block = blocks.get_mut(1).unwrap(); let second_block = blocks.get_mut(1).unwrap();
let cloned_second = second_block.clone(); let cloned_second = second_block.clone();
let mut updated_header = cloned_second.header.unseal(); let mut updated_header = cloned_second.header.unseal();
@ -144,7 +144,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
// make last block have valid state root // make last block have valid state root
let root = { let root = {
let tx_mut = tx.inner(); let tx_mut = tx.inner_rw();
let root = StateRoot::new(tx_mut.tx_ref()).root().unwrap(); let root = StateRoot::new(tx_mut.tx_ref()).root().unwrap();
tx_mut.commit().unwrap(); tx_mut.commit().unwrap();
root root

View File

@ -6,7 +6,7 @@ use reth_primitives::{
constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, listener::EventListeners, stage::StageId, constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, listener::EventListeners, stage::StageId,
BlockNumber, ChainSpec, H256, BlockNumber, ChainSpec, H256,
}; };
use reth_provider::{providers::get_stage_checkpoint, ProviderFactory}; use reth_provider::{ProviderFactory, StageCheckpointProvider};
use std::{pin::Pin, sync::Arc}; use std::{pin::Pin, sync::Arc};
use tokio::sync::watch; use tokio::sync::watch;
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
@ -137,12 +137,14 @@ where
/// Registers progress metrics for each registered stage /// Registers progress metrics for each registered stage
pub fn register_metrics(&mut self) -> Result<(), PipelineError> { pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
let tx = self.db.tx()?; let factory = ProviderFactory::new(&self.db, self.chain_spec.clone());
let provider = factory.provider()?;
for stage in &self.stages { for stage in &self.stages {
let stage_id = stage.id(); let stage_id = stage.id();
self.metrics.stage_checkpoint( self.metrics.stage_checkpoint(
stage_id, stage_id,
get_stage_checkpoint(&tx, stage_id)?.unwrap_or_default(), provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
None, None,
); );
} }
@ -228,8 +230,14 @@ where
} }
} }
let factory = ProviderFactory::new(&self.db, self.chain_spec.clone());
previous_stage = Some( previous_stage = Some(
get_stage_checkpoint(&self.db.tx()?, stage_id)?.unwrap_or_default().block_number, factory
.provider()?
.get_stage_checkpoint(stage_id)?
.unwrap_or_default()
.block_number,
); );
} }

View File

@ -5,7 +5,7 @@ use reth_primitives::{
stage::{StageCheckpoint, StageId}, stage::{StageCheckpoint, StageId},
BlockNumber, TxNumber, BlockNumber, TxNumber,
}; };
use reth_provider::{DatabaseProviderRW, ProviderError}; use reth_provider::DatabaseProviderRW;
use std::{ use std::{
cmp::{max, min}, cmp::{max, min},
ops::RangeInclusive, ops::RangeInclusive,
@ -79,10 +79,7 @@ impl ExecInput {
tx_threshold: u64, tx_threshold: u64,
) -> Result<(RangeInclusive<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError> { ) -> Result<(RangeInclusive<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError> {
let start_block = self.next_block(); let start_block = self.next_block();
let start_block_body = provider let start_block_body = provider.block_body_indices(start_block)?;
.tx_ref()
.get::<tables::BlockBodyIndices>(start_block)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?;
let target_block = self.target(); let target_block = self.target();

View File

@ -422,7 +422,9 @@ mod tests {
hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode, hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode,
ChainSpecBuilder, SealedBlock, StorageEntry, H160, H256, MAINNET, U256, ChainSpecBuilder, SealedBlock, StorageEntry, H160, H256, MAINNET, U256,
}; };
use reth_provider::{insert_canonical_block, ProviderFactory}; use reth_provider::{
insert_canonical_block, AccountProvider, ProviderFactory, ReceiptProvider,
};
use reth_revm::Factory; use reth_revm::Factory;
use reth_rlp::Decodable; use reth_rlp::Decodable;
use std::sync::Arc; use std::sync::Arc;
@ -624,8 +626,9 @@ mod tests {
}, },
done: true done: true
} if processed == total && total == block.gas_used); } if processed == total && total == block.gas_used);
let mut provider = factory.provider_rw().unwrap();
let tx = provider.tx_mut(); let provider = factory.provider().unwrap();
// check post state // check post state
let account1 = H160(hex!("1000000000000000000000000000000000000000")); let account1 = H160(hex!("1000000000000000000000000000000000000000"));
let account1_info = let account1_info =
@ -645,24 +648,24 @@ mod tests {
// assert accounts // assert accounts
assert_eq!( assert_eq!(
tx.get::<tables::PlainAccountState>(account1), provider.basic_account(account1),
Ok(Some(account1_info)), Ok(Some(account1_info)),
"Post changed of a account" "Post changed of a account"
); );
assert_eq!( assert_eq!(
tx.get::<tables::PlainAccountState>(account2), provider.basic_account(account2),
Ok(Some(account2_info)), Ok(Some(account2_info)),
"Post changed of a account" "Post changed of a account"
); );
assert_eq!( assert_eq!(
tx.get::<tables::PlainAccountState>(account3), provider.basic_account(account3),
Ok(Some(account3_info)), Ok(Some(account3_info)),
"Post changed of a account" "Post changed of a account"
); );
// assert storage // assert storage
// Get on dupsort would return only first value. This is good enough for this test. // Get on dupsort would return only first value. This is good enough for this test.
assert_eq!( assert_eq!(
tx.get::<tables::PlainStorageState>(account1), provider.tx_ref().get::<tables::PlainStorageState>(account1),
Ok(Some(StorageEntry { key: H256::from_low_u64_be(1), value: U256::from(2) })), Ok(Some(StorageEntry { key: H256::from_low_u64_be(1), value: U256::from(2) })),
"Post changed of a account" "Post changed of a account"
); );
@ -739,26 +742,13 @@ mod tests {
} if total == block.gas_used); } if total == block.gas_used);
// assert unwind stage // assert unwind stage
let db_tx = provider.tx_ref(); assert_eq!(provider.basic_account(acc1), Ok(Some(acc1_info)), "Pre changed of a account");
assert_eq!( assert_eq!(provider.basic_account(acc2), Ok(Some(acc2_info)), "Post changed of a account");
db_tx.get::<tables::PlainAccountState>(acc1),
Ok(Some(acc1_info)),
"Pre changed of a account"
);
assert_eq!(
db_tx.get::<tables::PlainAccountState>(acc2),
Ok(Some(acc2_info)),
"Post changed of a account"
);
let miner_acc = H160(hex!("2adc25665018aa1fe0e6bc666dac8fc2697ff9ba")); let miner_acc = H160(hex!("2adc25665018aa1fe0e6bc666dac8fc2697ff9ba"));
assert_eq!( assert_eq!(provider.basic_account(miner_acc), Ok(None), "Third account should be unwound");
db_tx.get::<tables::PlainAccountState>(miner_acc),
Ok(None),
"Third account should be unwound"
);
assert_eq!(db_tx.get::<tables::Receipts>(0), Ok(None), "First receipt should be unwound"); assert_eq!(provider.receipt(0), Ok(None), "First receipt should be unwound");
} }
#[tokio::test] #[tokio::test]
@ -830,11 +820,7 @@ mod tests {
// assert unwind stage // assert unwind stage
let provider = factory.provider_rw().unwrap(); let provider = factory.provider_rw().unwrap();
assert_eq!( assert_eq!(provider.basic_account(destroyed_address), Ok(None), "Account was destroyed");
provider.tx_ref().get::<tables::PlainAccountState>(destroyed_address),
Ok(None),
"Account was destroyed"
);
assert_eq!( assert_eq!(
provider.tx_ref().get::<tables::PlainStorageState>(destroyed_address), provider.tx_ref().get::<tables::PlainStorageState>(destroyed_address),

View File

@ -532,7 +532,7 @@ mod tests {
type Seed = Vec<(Address, Account)>; type Seed = Vec<(Address, Account)>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> { fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let mut provider = self.tx.inner(); let mut provider = self.tx.inner_rw();
let res = Ok(AccountHashingStage::seed( let res = Ok(AccountHashingStage::seed(
&mut provider, &mut provider,
SeedOpts { blocks: 1..=input.target(), accounts: 0..10, txs: 0..3 }, SeedOpts { blocks: 1..=input.target(), accounts: 0..10, txs: 0..3 },

View File

@ -403,6 +403,7 @@ mod tests {
generators::random_header_range, TestConsensus, TestHeaderDownloader, TestHeadersClient, generators::random_header_range, TestConsensus, TestHeaderDownloader, TestHeadersClient,
}; };
use reth_primitives::U256; use reth_primitives::U256;
use reth_provider::{BlockHashProvider, BlockNumProvider, HeaderProvider};
use std::sync::Arc; use std::sync::Arc;
pub(crate) struct HeadersTestRunner<D: HeaderDownloader> { pub(crate) struct HeadersTestRunner<D: HeaderDownloader> {
@ -478,26 +479,21 @@ mod tests {
let initial_checkpoint = input.checkpoint().block_number; let initial_checkpoint = input.checkpoint().block_number;
match output { match output {
Some(output) if output.checkpoint.block_number > initial_checkpoint => { Some(output) if output.checkpoint.block_number > initial_checkpoint => {
self.tx.query(|tx| { let provider = self.tx.factory.provider()?;
for block_num in for block_num in (initial_checkpoint..output.checkpoint.block_number).rev()
(initial_checkpoint..output.checkpoint.block_number).rev() {
{ // look up the header hash
// look up the header hash let hash = provider.block_hash(block_num)?.expect("no header hash");
let hash = tx
.get::<tables::CanonicalHeaders>(block_num)?
.expect("no header hash");
// validate the header number // validate the header number
assert_eq!(tx.get::<tables::HeaderNumbers>(hash)?, Some(block_num)); assert_eq!(provider.block_number(hash)?, Some(block_num));
// validate the header // validate the header
let header = tx.get::<tables::Headers>(block_num)?; let header = provider.header_by_number(block_num)?;
assert!(header.is_some()); assert!(header.is_some());
let header = header.unwrap().seal_slow(); let header = header.unwrap().seal_slow();
assert_eq!(header.hash(), hash); assert_eq!(header.hash(), hash);
} }
Ok(())
})?;
} }
_ => self.check_no_header_entry_above(initial_checkpoint)?, _ => self.check_no_header_entry_above(initial_checkpoint)?,
}; };

View File

@ -235,6 +235,7 @@ mod tests {
use reth_primitives::{ use reth_primitives::{
stage::StageUnitCheckpoint, BlockNumber, SealedBlock, TransactionSigned, H256, stage::StageUnitCheckpoint, BlockNumber, SealedBlock, TransactionSigned, H256,
}; };
use reth_provider::TransactionsProvider;
use super::*; use super::*;
use crate::test_utils::{ use crate::test_utils::{
@ -373,7 +374,7 @@ mod tests {
/// 2. If the is no requested block entry in the bodies table, /// 2. If the is no requested block entry in the bodies table,
/// but [tables::TxSenders] is not empty. /// but [tables::TxSenders] is not empty.
fn ensure_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { fn ensure_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().block_body_indices(block); let body_result = self.tx.inner_rw().block_body_indices(block);
match body_result { match body_result {
Ok(body) => self Ok(body) => self
.tx .tx
@ -417,7 +418,8 @@ mod tests {
output: Option<ExecOutput>, output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> { ) -> Result<(), TestRunnerError> {
match output { match output {
Some(output) => self.tx.query(|tx| { Some(output) => {
let provider = self.tx.inner();
let start_block = input.next_block(); let start_block = input.next_block();
let end_block = output.checkpoint.block_number; let end_block = output.checkpoint.block_number;
@ -425,23 +427,20 @@ mod tests {
return Ok(()) return Ok(())
} }
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?; let mut body_cursor =
provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
body_cursor.seek_exact(start_block)?; body_cursor.seek_exact(start_block)?;
while let Some((_, body)) = body_cursor.next()? { while let Some((_, body)) = body_cursor.next()? {
for tx_id in body.tx_num_range() { for tx_id in body.tx_num_range() {
let transaction: TransactionSigned = tx let transaction: TransactionSigned =
.get::<tables::Transactions>(tx_id)? provider.transaction_by_id(tx_id)?.expect("no transaction entry");
.expect("no transaction entry")
.into();
let signer = let signer =
transaction.recover_signer().expect("failed to recover signer"); transaction.recover_signer().expect("failed to recover signer");
assert_eq!(Some(signer), tx.get::<tables::TxSenders>(tx_id)?); assert_eq!(Some(signer), provider.transaction_sender(tx_id)?)
} }
} }
}
Ok(())
})?,
None => self.ensure_no_senders_by_block(input.checkpoint().block_number)?, None => self.ensure_no_senders_by_block(input.checkpoint().block_number)?,
}; };

View File

@ -130,6 +130,7 @@ mod tests {
TestConsensus, TestConsensus,
}; };
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedHeader}; use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedHeader};
use reth_provider::HeaderProvider;
use super::*; use super::*;
use crate::test_utils::{ use crate::test_utils::{
@ -262,27 +263,25 @@ mod tests {
let initial_stage_progress = input.checkpoint().block_number; let initial_stage_progress = input.checkpoint().block_number;
match output { match output {
Some(output) if output.checkpoint.block_number > initial_stage_progress => { Some(output) if output.checkpoint.block_number > initial_stage_progress => {
self.tx.query(|tx| { let provider = self.tx.inner();
let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
let (_, mut current_header) = header_cursor
.seek_exact(initial_stage_progress)?
.expect("no initial header");
let mut td: U256 = tx
.get::<tables::HeaderTD>(initial_stage_progress)?
.expect("no initial td")
.into();
while let Some((next_key, next_header)) = header_cursor.next()? { let mut header_cursor = provider.tx_ref().cursor_read::<tables::Headers>()?;
assert_eq!(current_header.number + 1, next_header.number); let (_, mut current_header) = header_cursor
td += next_header.difficulty; .seek_exact(initial_stage_progress)?
assert_eq!( .expect("no initial header");
tx.get::<tables::HeaderTD>(next_key)?.map(Into::into), let mut td: U256 = provider
Some(td) .header_td_by_number(initial_stage_progress)?
); .expect("no initial td");
current_header = next_header;
} while let Some((next_key, next_header)) = header_cursor.next()? {
Ok(()) assert_eq!(current_header.number + 1, next_header.number);
})?; td += next_header.difficulty;
assert_eq!(
provider.header_td_by_number(next_key)?.map(Into::into),
Some(td)
);
current_header = next_header;
}
} }
_ => self.check_no_td_above(initial_stage_progress)?, _ => self.check_no_td_above(initial_stage_progress)?,
}; };

View File

@ -199,6 +199,7 @@ mod tests {
use assert_matches::assert_matches; use assert_matches::assert_matches;
use reth_interfaces::test_utils::generators::{random_block, random_block_range}; use reth_interfaces::test_utils::generators::{random_block, random_block_range};
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256}; use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256};
use reth_provider::TransactionsProvider;
// Implement stage test suite. // Implement stage test suite.
stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup); stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
@ -331,7 +332,7 @@ mod tests {
/// 2. If the is no requested block entry in the bodies table, /// 2. If the is no requested block entry in the bodies table,
/// but [tables::TxHashNumber] is not empty. /// but [tables::TxHashNumber] is not empty.
fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> { fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.tx.inner().block_body_indices(number); let body_result = self.tx.inner_rw().block_body_indices(number);
match body_result { match body_result {
Ok(body) => self.tx.ensure_no_entry_above_by_value::<tables::TxHashNumber, _>( Ok(body) => self.tx.ensure_no_entry_above_by_value::<tables::TxHashNumber, _>(
body.last_tx_num(), body.last_tx_num(),
@ -376,7 +377,9 @@ mod tests {
output: Option<ExecOutput>, output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> { ) -> Result<(), TestRunnerError> {
match output { match output {
Some(output) => self.tx.query(|tx| { Some(output) => {
let provider = self.tx.inner();
let start_block = input.next_block(); let start_block = input.next_block();
let end_block = output.checkpoint.block_number; let end_block = output.checkpoint.block_number;
@ -384,23 +387,18 @@ mod tests {
return Ok(()) return Ok(())
} }
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?; let mut body_cursor =
provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
body_cursor.seek_exact(start_block)?; body_cursor.seek_exact(start_block)?;
while let Some((_, body)) = body_cursor.next()? { while let Some((_, body)) = body_cursor.next()? {
for tx_id in body.tx_num_range() { for tx_id in body.tx_num_range() {
let transaction = tx let transaction =
.get::<tables::Transactions>(tx_id)? provider.transaction_by_id(tx_id)?.expect("no transaction entry");
.expect("no transaction entry"); assert_eq!(Some(tx_id), provider.transaction_id(transaction.hash())?);
assert_eq!(
Some(tx_id),
tx.get::<tables::TxHashNumber>(transaction.hash())?,
);
} }
} }
}
Ok(())
})?,
None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?, None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
}; };
Ok(()) Ok(())

View File

@ -12,6 +12,8 @@ pub(crate) enum TestRunnerError {
Database(#[from] reth_interfaces::db::DatabaseError), Database(#[from] reth_interfaces::db::DatabaseError),
#[error("Internal runner error occurred.")] #[error("Internal runner error occurred.")]
Internal(#[from] Box<dyn std::error::Error>), Internal(#[from] Box<dyn std::error::Error>),
#[error("Internal interface error occurred.")]
Interface(#[from] reth_interfaces::Error),
} }
/// A generic test runner for stages. /// A generic test runner for stages.

View File

@ -4,7 +4,7 @@ use reth_db::{
mdbx::{ mdbx::{
test_utils::{create_test_db, create_test_db_with_path}, test_utils::{create_test_db, create_test_db_with_path},
tx::Tx, tx::Tx,
Env, EnvKind, WriteMap, RW, Env, EnvKind, WriteMap, RO, RW,
}, },
models::{AccountBeforeTx, StoredBlockBodyIndices}, models::{AccountBeforeTx, StoredBlockBodyIndices},
table::Table, table::Table,
@ -16,7 +16,7 @@ use reth_primitives::{
keccak256, Account, Address, BlockNumber, SealedBlock, SealedHeader, StorageEntry, H256, keccak256, Account, Address, BlockNumber, SealedBlock, SealedHeader, StorageEntry, H256,
MAINNET, U256, MAINNET, U256,
}; };
use reth_provider::{DatabaseProviderRW, ProviderFactory}; use reth_provider::{DatabaseProviderRO, DatabaseProviderRW, ProviderFactory};
use std::{ use std::{
borrow::Borrow, borrow::Borrow,
collections::BTreeMap, collections::BTreeMap,
@ -37,7 +37,7 @@ pub struct TestTransaction {
/// WriteMap DB /// WriteMap DB
pub tx: Arc<Env<WriteMap>>, pub tx: Arc<Env<WriteMap>>,
pub path: Option<PathBuf>, pub path: Option<PathBuf>,
factory: ProviderFactory<Arc<Env<WriteMap>>>, pub factory: ProviderFactory<Arc<Env<WriteMap>>>,
} }
impl Default for TestTransaction { impl Default for TestTransaction {
@ -59,10 +59,15 @@ impl TestTransaction {
} }
/// Return a database wrapped in [DatabaseProviderRW]. /// Return a database wrapped in [DatabaseProviderRW].
pub fn inner(&self) -> DatabaseProviderRW<'_, Arc<Env<WriteMap>>> { pub fn inner_rw(&self) -> DatabaseProviderRW<'_, Arc<Env<WriteMap>>> {
self.factory.provider_rw().expect("failed to create db container") self.factory.provider_rw().expect("failed to create db container")
} }
/// Return a database wrapped in [DatabaseProviderRO].
pub fn inner(&self) -> DatabaseProviderRO<'_, Arc<Env<WriteMap>>> {
self.factory.provider().expect("failed to create db container")
}
/// Get a pointer to an internal database. /// Get a pointer to an internal database.
pub fn inner_raw(&self) -> Arc<Env<WriteMap>> { pub fn inner_raw(&self) -> Arc<Env<WriteMap>> {
self.tx.clone() self.tx.clone()
@ -73,7 +78,7 @@ impl TestTransaction {
where where
F: FnOnce(&mut Tx<'_, RW, WriteMap>) -> Result<(), DbError>, F: FnOnce(&mut Tx<'_, RW, WriteMap>) -> Result<(), DbError>,
{ {
let mut tx = self.inner(); let mut tx = self.inner_rw();
f(tx.tx_mut())?; f(tx.tx_mut())?;
tx.commit().expect("failed to commit"); tx.commit().expect("failed to commit");
Ok(()) Ok(())
@ -82,7 +87,7 @@ impl TestTransaction {
/// Invoke a callback with a read transaction /// Invoke a callback with a read transaction
pub fn query<F, R>(&self, f: F) -> Result<R, DbError> pub fn query<F, R>(&self, f: F) -> Result<R, DbError>
where where
F: FnOnce(&Tx<'_, RW, WriteMap>) -> Result<R, DbError>, F: FnOnce(&Tx<'_, RO, WriteMap>) -> Result<R, DbError>,
{ {
f(self.inner().tx_ref()) f(self.inner().tx_ref())
} }

View File

@ -640,12 +640,13 @@ impl PostState {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::{AccountProvider, ProviderFactory};
use reth_db::{ use reth_db::{
database::Database, database::Database,
mdbx::{test_utils, Env, EnvKind, WriteMap}, mdbx::{test_utils, Env, EnvKind, WriteMap},
transaction::DbTx, transaction::DbTx,
}; };
use reth_primitives::proofs::EMPTY_ROOT; use reth_primitives::{proofs::EMPTY_ROOT, MAINNET};
use reth_trie::test_utils::state_root; use reth_trie::test_utils::state_root;
use std::sync::Arc; use std::sync::Arc;
@ -1066,7 +1067,8 @@ mod tests {
#[test] #[test]
fn write_to_db_account_info() { fn write_to_db_account_info() {
let db: Arc<Env<WriteMap>> = test_utils::create_test_db(EnvKind::RW); let db: Arc<Env<WriteMap>> = test_utils::create_test_db(EnvKind::RW);
let tx = db.tx_mut().expect("Could not get database tx"); let factory = ProviderFactory::new(db, MAINNET.clone());
let provider = factory.provider_rw().unwrap();
let mut post_state = PostState::new(); let mut post_state = PostState::new();
@ -1081,22 +1083,23 @@ mod tests {
post_state.create_account(1, address_a, account_a); post_state.create_account(1, address_a, account_a);
// 0x11.. is changed (balance + 1, nonce + 1) // 0x11.. is changed (balance + 1, nonce + 1)
post_state.change_account(1, address_b, account_b, account_b_changed); post_state.change_account(1, address_b, account_b, account_b_changed);
post_state.write_to_db(&tx).expect("Could not write post state to DB"); post_state.write_to_db(provider.tx_ref()).expect("Could not write post state to DB");
// Check plain state // Check plain state
assert_eq!( assert_eq!(
tx.get::<tables::PlainAccountState>(address_a).expect("Could not read account state"), provider.basic_account(address_a).expect("Could not read account state"),
Some(account_a), Some(account_a),
"Account A state is wrong" "Account A state is wrong"
); );
assert_eq!( assert_eq!(
tx.get::<tables::PlainAccountState>(address_b).expect("Could not read account state"), provider.basic_account(address_b).expect("Could not read account state"),
Some(account_b_changed), Some(account_b_changed),
"Account B state is wrong" "Account B state is wrong"
); );
// Check change set // Check change set
let mut changeset_cursor = tx let mut changeset_cursor = provider
.tx_ref()
.cursor_dup_read::<tables::AccountChangeSet>() .cursor_dup_read::<tables::AccountChangeSet>()
.expect("Could not open changeset cursor"); .expect("Could not open changeset cursor");
assert_eq!( assert_eq!(
@ -1113,11 +1116,11 @@ mod tests {
let mut post_state = PostState::new(); let mut post_state = PostState::new();
// 0x11.. is destroyed // 0x11.. is destroyed
post_state.destroy_account(2, address_b, account_b_changed); post_state.destroy_account(2, address_b, account_b_changed);
post_state.write_to_db(&tx).expect("Could not write second post state to DB"); post_state.write_to_db(provider.tx_ref()).expect("Could not write second post state to DB");
// Check new plain state for account B // Check new plain state for account B
assert_eq!( assert_eq!(
tx.get::<tables::PlainAccountState>(address_b).expect("Could not read account state"), provider.basic_account(address_b).expect("Could not read account state"),
None, None,
"Account B should be deleted" "Account B should be deleted"
); );

View File

@ -5,7 +5,7 @@ use crate::{
ProviderError, StageCheckpointProvider, StateProviderBox, TransactionsProvider, ProviderError, StageCheckpointProvider, StateProviderBox, TransactionsProvider,
WithdrawalsProvider, WithdrawalsProvider,
}; };
use reth_db::{database::Database, models::StoredBlockBodyIndices, tables, transaction::DbTx}; use reth_db::{database::Database, models::StoredBlockBodyIndices};
use reth_interfaces::Result; use reth_interfaces::Result;
use reth_primitives::{ use reth_primitives::{
stage::{StageCheckpoint, StageId}, stage::{StageCheckpoint, StageId},
@ -246,6 +246,10 @@ impl<DB: Database> TransactionsProvider for ProviderFactory<DB> {
fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> Result<Vec<Address>> { fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> Result<Vec<Address>> {
self.provider()?.senders_by_tx_range(range) self.provider()?.senders_by_tx_range(range)
} }
fn transaction_sender(&self, id: TxNumber) -> Result<Option<Address>> {
self.provider()?.transaction_sender(id)
}
} }
impl<DB: Database> ReceiptProvider for ProviderFactory<DB> { impl<DB: Database> ReceiptProvider for ProviderFactory<DB> {
@ -318,18 +322,6 @@ impl<DB: Database> EvmEnvProvider for ProviderFactory<DB> {
} }
} }
/// Get checkpoint for the given stage.
#[inline]
pub fn get_stage_checkpoint<'a, TX>(
tx: &TX,
id: StageId,
) -> std::result::Result<Option<StageCheckpoint>, reth_interfaces::db::DatabaseError>
where
TX: DbTx<'a> + Send + Sync,
{
tx.get::<tables::SyncStage>(id.to_string())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::ProviderFactory; use super::ProviderFactory;

View File

@ -43,8 +43,6 @@ use std::{
sync::Arc, sync::Arc,
}; };
use super::get_stage_checkpoint;
/// A [`DatabaseProvider`] that holds a read-only database transaction. /// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<'this, DB> = DatabaseProvider<'this, <DB as DatabaseGAT<'this>>::TX>; pub type DatabaseProviderRO<'this, DB> = DatabaseProvider<'this, <DB as DatabaseGAT<'this>>::TX>;
@ -740,8 +738,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
let parent_number = range.start().saturating_sub(1); let parent_number = range.start().saturating_sub(1);
let parent_state_root = self let parent_state_root = self
.tx .header_by_number(parent_number)?
.get::<tables::Headers>(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))? .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root; .state_root;
@ -749,8 +746,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
// but for sake of double verification we will check it again. // but for sake of double verification we will check it again.
if new_state_root != parent_state_root { if new_state_root != parent_state_root {
let parent_hash = self let parent_hash = self
.tx .block_hash(parent_number)?
.get::<tables::CanonicalHeaders>(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?; .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(TransactionError::UnwindStateRootMismatch { return Err(TransactionError::UnwindStateRootMismatch {
got: new_state_root, got: new_state_root,
@ -1076,14 +1072,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(()) Ok(())
} }
/// Get the stage checkpoint.
pub fn get_stage_checkpoint(
&self,
id: StageId,
) -> std::result::Result<Option<StageCheckpoint>, DatabaseError> {
get_stage_checkpoint(&self.tx, id)
}
/// Save stage checkpoint. /// Save stage checkpoint.
pub fn save_stage_checkpoint( pub fn save_stage_checkpoint(
&self, &self,
@ -1743,6 +1731,10 @@ impl<'this, TX: DbTx<'this>> TransactionsProvider for DatabaseProvider<'this, TX
.map(|entry| entry.map(|sender| sender.1)) .map(|entry| entry.map(|sender| sender.1))
.collect::<std::result::Result<Vec<_>, _>>()?) .collect::<std::result::Result<Vec<_>, _>>()?)
} }
fn transaction_sender(&self, id: TxNumber) -> Result<Option<Address>> {
Ok(self.tx.get::<tables::TxSenders>(id)?)
}
} }
impl<'this, TX: DbTx<'this>> ReceiptProvider for DatabaseProvider<'this, TX> { impl<'this, TX: DbTx<'this>> ReceiptProvider for DatabaseProvider<'this, TX> {

View File

@ -302,6 +302,10 @@ where
fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> Result<Vec<Address>> { fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> Result<Vec<Address>> {
self.database.provider()?.senders_by_tx_range(range) self.database.provider()?.senders_by_tx_range(range)
} }
fn transaction_sender(&self, id: TxNumber) -> Result<Option<Address>> {
self.database.provider()?.transaction_sender(id)
}
} }
impl<DB, Tree> ReceiptProvider for BlockchainProvider<DB, Tree> impl<DB, Tree> ReceiptProvider for BlockchainProvider<DB, Tree>

View File

@ -219,6 +219,10 @@ impl TransactionsProvider for MockEthProvider {
) -> Result<Vec<reth_primitives::TransactionSignedNoHash>> { ) -> Result<Vec<reth_primitives::TransactionSignedNoHash>> {
unimplemented!() unimplemented!()
} }
fn transaction_sender(&self, _id: TxNumber) -> Result<Option<Address>> {
unimplemented!()
}
} }
impl ReceiptProvider for MockEthProvider { impl ReceiptProvider for MockEthProvider {

View File

@ -159,6 +159,10 @@ impl TransactionsProvider for NoopProvider {
) -> Result<Vec<reth_primitives::TransactionSignedNoHash>> { ) -> Result<Vec<reth_primitives::TransactionSignedNoHash>> {
Ok(Vec::default()) Ok(Vec::default())
} }
fn transaction_sender(&self, _id: TxNumber) -> Result<Option<Address>> {
Ok(None)
}
} }
impl ReceiptProvider for NoopProvider { impl ReceiptProvider for NoopProvider {

View File

@ -51,4 +51,9 @@ pub trait TransactionsProvider: BlockNumProvider + Send + Sync {
/// Get Senders from a tx range. /// Get Senders from a tx range.
fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> Result<Vec<Address>>; fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> Result<Vec<Address>>;
/// Get transaction sender.
///
/// Returns None if the transaction is not found.
fn transaction_sender(&self, id: TxNumber) -> Result<Option<Address>>;
} }

View File

@ -39,6 +39,9 @@ pub enum TransactionError {
/// Block hash /// Block hash
block_hash: BlockHash, block_hash: BlockHash,
}, },
/// Internal interfaces error
#[error("Internal error")]
InternalError(#[from] reth_interfaces::Error),
} }
#[cfg(test)] #[cfg(test)]