refactor: add BlockWriter and BlockExecutionWriter (#3384)

This commit is contained in:
joshieDo
2023-06-27 10:02:17 +01:00
committed by GitHub
parent d3d44fd46f
commit 085a703d7c
16 changed files with 369 additions and 372 deletions

View File

@ -26,7 +26,7 @@ use reth_interfaces::{
use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo;
use reth_primitives::{stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, H256};
use reth_provider::{ProviderFactory, StageCheckpointReader};
use reth_provider::{BlockExecutionWriter, ProviderFactory, StageCheckpointReader};
use reth_staged_sync::utils::init::{init_db, init_genesis};
use reth_stages::{
sets::DefaultStages,

View File

@ -13,7 +13,7 @@ use reth_db::{
transaction::DbTx,
};
use reth_primitives::{BlockHashOrNumber, ChainSpec};
use reth_provider::ProviderFactory;
use reth_provider::{BlockExecutionWriter, ProviderFactory};
use std::{ops::RangeInclusive, sync::Arc};
/// `reth stage unwind` command

View File

@ -22,8 +22,9 @@ use reth_primitives::{
use reth_provider::{
chain::{ChainSplit, SplitAt},
post_state::PostState,
BlockNumReader, CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications,
Chain, DatabaseProvider, DisplayBlocksChain, ExecutorFactory, HeaderProvider,
BlockExecutionWriter, BlockNumReader, BlockWriter, CanonStateNotification,
CanonStateNotificationSender, CanonStateNotifications, Chain, DatabaseProvider,
DisplayBlocksChain, ExecutorFactory, HeaderProvider,
};
use std::{
collections::{BTreeMap, HashMap},
@ -1007,8 +1008,8 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
}
/// Canonicalize the given chain and commit it to the database.
fn commit_canonical(&mut self, chain: Chain) -> Result<(), Error> {
let mut provider = DatabaseProvider::new_rw(
fn commit_canonical(&self, chain: Chain) -> Result<(), Error> {
let provider = DatabaseProvider::new_rw(
self.externals.db.tx_mut()?,
self.externals.chain_spec.clone(),
);
@ -1093,9 +1094,9 @@ mod tests {
proofs::EMPTY_ROOT, stage::StageCheckpoint, ChainSpecBuilder, H256, MAINNET,
};
use reth_provider::{
insert_block,
post_state::PostState,
test_utils::{blocks::BlockChainTestData, TestExecutorFactory},
BlockWriter, ProviderFactory,
};
use std::{collections::HashSet, sync::Arc};
@ -1122,16 +1123,23 @@ mod tests {
genesis.header.header.number = 10;
genesis.header.header.state_root = EMPTY_ROOT;
let tx_mut = db.tx_mut().unwrap();
let factory = ProviderFactory::new(&db, MAINNET.clone());
let provider = factory.provider_rw().unwrap();
insert_block(&tx_mut, genesis, None).unwrap();
provider.insert_block(genesis, None).unwrap();
// insert first 10 blocks
for i in 0..10 {
tx_mut.put::<tables::CanonicalHeaders>(i, H256([100 + i as u8; 32])).unwrap();
provider
.tx_ref()
.put::<tables::CanonicalHeaders>(i, H256([100 + i as u8; 32]))
.unwrap();
}
tx_mut.put::<tables::SyncStage>("Finish".to_string(), StageCheckpoint::new(10)).unwrap();
tx_mut.commit().unwrap();
provider
.tx_ref()
.put::<tables::SyncStage>("Finish".to_string(), StageCheckpoint::new(10))
.unwrap();
provider.commit().unwrap();
}
/// Test data structure that will check tree internals

View File

@ -1368,7 +1368,8 @@ mod tests {
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{stage::StageCheckpoint, ChainSpec, ChainSpecBuilder, H256, MAINNET};
use reth_provider::{
providers::BlockchainProvider, test_utils::TestExecutorFactory, ProviderFactory,
providers::BlockchainProvider, test_utils::TestExecutorFactory, BlockWriter,
ProviderFactory,
};
use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError};
use reth_tasks::TokioTaskExecutor;
@ -1711,8 +1712,10 @@ mod tests {
mut blocks: impl Iterator<Item = &'a SealedBlock>,
) {
let factory = ProviderFactory::new(db, chain);
let mut provider = factory.provider_rw().unwrap();
blocks.try_for_each(|b| provider.insert_block(b.clone(), None)).expect("failed to insert");
let provider = factory.provider_rw().unwrap();
blocks
.try_for_each(|b| provider.insert_block(b.clone(), None).map(|_| ()))
.expect("failed to insert");
provider.commit().unwrap();
}

View File

@ -422,7 +422,7 @@ mod tests {
hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode,
ChainSpecBuilder, SealedBlock, StorageEntry, H160, H256, MAINNET, U256,
};
use reth_provider::{insert_canonical_block, AccountReader, ProviderFactory, ReceiptProvider};
use reth_provider::{AccountReader, BlockWriter, ProviderFactory, ReceiptProvider};
use reth_revm::Factory;
use reth_rlp::Decodable;
use std::sync::Arc;
@ -465,14 +465,14 @@ mod tests {
fn execution_checkpoint_precedes() {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let provider = factory.provider_rw().unwrap();
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421bbe400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabbbe40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
insert_canonical_block(provider.tx_mut(), genesis, None).unwrap();
insert_canonical_block(provider.tx_mut(), block.clone(), None).unwrap();
provider.insert_block(genesis, None).unwrap();
provider.insert_block(block.clone(), None).unwrap();
provider.commit().unwrap();
let previous_stage_checkpoint = ExecutionCheckpoint {
@ -501,14 +501,14 @@ mod tests {
fn execution_checkpoint_recalculate_full_previous_some() {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let provider = factory.provider_rw().unwrap();
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421bbe400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
insert_canonical_block(provider.tx_mut(), genesis, None).unwrap();
insert_canonical_block(provider.tx_mut(), block.clone(), None).unwrap();
provider.insert_block(genesis, None).unwrap();
provider.insert_block(block.clone(), None).unwrap();
provider.commit().unwrap();
let previous_stage_checkpoint = ExecutionCheckpoint {
@ -537,14 +537,14 @@ mod tests {
fn execution_checkpoint_recalculate_full_previous_none() {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let provider = factory.provider_rw().unwrap();
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421bbe400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabbbe40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
insert_canonical_block(provider.tx_mut(), genesis, None).unwrap();
insert_canonical_block(provider.tx_mut(), block.clone(), None).unwrap();
provider.insert_block(genesis, None).unwrap();
provider.insert_block(block.clone(), None).unwrap();
provider.commit().unwrap();
let previous_checkpoint = StageCheckpoint { block_number: 1, stage_checkpoint: None };
@ -567,7 +567,7 @@ mod tests {
// is merged as it has similar framework
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let provider = factory.provider_rw().unwrap();
let input = ExecInput {
target: Some(1),
/// The progress of this stage the last time it was executed.
@ -577,13 +577,13 @@ mod tests {
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabbbe40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
insert_canonical_block(provider.tx_mut(), genesis, None).unwrap();
insert_canonical_block(provider.tx_mut(), block.clone(), None).unwrap();
provider.insert_block(genesis, None).unwrap();
provider.insert_block(block.clone(), None).unwrap();
provider.commit().unwrap();
// insert pre state
let mut provider = factory.provider_rw().unwrap();
let db_tx = provider.tx_mut();
let provider = factory.provider_rw().unwrap();
let db_tx = provider.tx_ref();
let acc1 = H160(hex!("1000000000000000000000000000000000000000"));
let acc2 = H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b"));
let code = hex!("5a465a905090036002900360015500");
@ -676,7 +676,7 @@ mod tests {
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let provider = factory.provider_rw().unwrap();
let input = ExecInput {
target: Some(1),
/// The progress of this stage the last time it was executed.
@ -686,8 +686,8 @@ mod tests {
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabbbe40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
insert_canonical_block(provider.tx_mut(), genesis, None).unwrap();
insert_canonical_block(provider.tx_mut(), block.clone(), None).unwrap();
provider.insert_block(genesis, None).unwrap();
provider.insert_block(block.clone(), None).unwrap();
provider.commit().unwrap();
// variables
@ -695,8 +695,8 @@ mod tests {
let balance = U256::from(0x3635c9adc5dea00000u128);
let code_hash = keccak256(code);
// pre state
let mut provider = factory.provider_rw().unwrap();
let db_tx = provider.tx_mut();
let provider = factory.provider_rw().unwrap();
let db_tx = provider.tx_ref();
let acc1 = H160(hex!("1000000000000000000000000000000000000000"));
let acc1_info = Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) };
let acc2 = H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b"));
@ -753,7 +753,7 @@ mod tests {
async fn test_selfdestruct() {
let test_tx = TestTransaction::default();
let factory = ProviderFactory::new(test_tx.tx.as_ref(), MAINNET.clone());
let mut provider = factory.provider_rw().unwrap();
let provider = factory.provider_rw().unwrap();
let input = ExecInput {
target: Some(1),
/// The progress of this stage the last time it was executed.
@ -763,8 +763,8 @@ mod tests {
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f9025ff901f7a0c86e8cc0310ae7c531c758678ddbfd16fc51c8cef8cec650b032de9869e8b94fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa050554882fbbda2c2fd93fdc466db9946ea262a67f7a76cc169e714f105ab583da00967f09ef1dfed20c0eacfaa94d5cd4002eda3242ac47eae68972d07b106d192a0e3c8b47fbfc94667ef4cceb17e5cc21e3b1eebd442cebb27f07562b33836290dbf42408238108203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f862f860800a83061a8094095e7baea6a6c7c4c2dfeb977efac326af552d8780801ba072ed817487b84ba367d15d2f039b5fc5f087d0a8882fbdf73e8cb49357e1ce30a0403d800545b8fc544f92ce8124e2255f8c3c6af93f28243a120585d4c4c6a2a3c0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
insert_canonical_block(provider.tx_mut(), genesis, None).unwrap();
insert_canonical_block(provider.tx_mut(), block.clone(), None).unwrap();
provider.insert_block(genesis, None).unwrap();
provider.insert_block(block.clone(), None).unwrap();
provider.commit().unwrap();
// variables

View File

@ -88,14 +88,14 @@ impl AccountHashingStage {
generators::{random_block_range, random_eoa_account_range},
};
use reth_primitives::{Account, H256, U256};
use reth_provider::insert_canonical_block;
use reth_provider::BlockWriter;
let mut rng = generators::rng();
let blocks = random_block_range(&mut rng, opts.blocks.clone(), H256::zero(), opts.txs);
for block in blocks {
insert_canonical_block(provider.tx_ref(), block, None).unwrap();
provider.insert_block(block, None).unwrap();
}
let mut accounts = random_eoa_account_range(&mut rng, opts.accounts);
{

View File

@ -76,10 +76,10 @@ impl TestTransaction {
/// Invoke a callback with transaction committing it afterwards
pub fn commit<F>(&self, f: F) -> Result<(), DbError>
where
F: FnOnce(&mut Tx<'_, RW, WriteMap>) -> Result<(), DbError>,
F: FnOnce(&Tx<'_, RW, WriteMap>) -> Result<(), DbError>,
{
let mut tx = self.inner_rw();
f(tx.tx_mut())?;
f(tx.tx_ref())?;
tx.commit().expect("failed to commit");
Ok(())
}
@ -200,7 +200,7 @@ impl TestTransaction {
}
/// Inserts a single [SealedHeader] into the corresponding tables of the headers stage.
fn insert_header(tx: &mut Tx<'_, RW, WriteMap>, header: &SealedHeader) -> Result<(), DbError> {
fn insert_header(tx: &Tx<'_, RW, WriteMap>, header: &SealedHeader) -> Result<(), DbError> {
tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
tx.put::<tables::HeaderNumbers>(header.hash(), header.number)?;
tx.put::<tables::Headers>(header.number, header.clone().unseal())

View File

@ -21,14 +21,14 @@
/// Various provider traits.
mod traits;
pub use traits::{
AccountExtReader, AccountReader, BlockExecutor, BlockHashReader, BlockIdReader, BlockNumReader,
BlockReader, BlockReaderIdExt, BlockSource, BlockchainTreePendingStateProvider,
CanonChainTracker, CanonStateNotification, CanonStateNotificationSender,
CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider, ExecutorFactory,
HashingWriter, HeaderProvider, HistoryWriter, PostStateDataProvider, ReceiptProvider,
ReceiptProviderIdExt, StageCheckpointReader, StageCheckpointWriter, StateProvider,
StateProviderBox, StateProviderFactory, StateRootProvider, StorageReader, TransactionsProvider,
WithdrawalsProvider,
AccountExtReader, AccountReader, BlockExecutionWriter, BlockExecutor, BlockHashReader,
BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, BlockSource, BlockWriter,
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification,
CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, EvmEnvProvider,
ExecutorFactory, HashingWriter, HeaderProvider, HistoryWriter, PostStateDataProvider,
ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StageCheckpointWriter,
StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, StorageReader,
TransactionsProvider, WithdrawalsProvider,
};
/// Provider trait implementations.
@ -42,10 +42,6 @@ pub use providers::{
pub mod post_state;
pub use post_state::PostState;
/// Common database utilities.
mod utils;
pub use utils::{insert_block, insert_canonical_block};
#[cfg(any(test, feature = "test-utils"))]
/// Common test helpers for mocking the Provider.
pub mod test_utils;

View File

@ -1,10 +1,9 @@
use crate::{
insert_canonical_block,
post_state::StorageChangeset,
traits::{AccountExtReader, BlockSource, ReceiptProvider, StageCheckpointWriter},
AccountReader, BlockHashReader, BlockNumReader, BlockReader, EvmEnvProvider, HashingWriter,
HeaderProvider, HistoryWriter, PostState, ProviderError, StageCheckpointReader, StorageReader,
TransactionsProvider, WithdrawalsProvider,
AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter,
EvmEnvProvider, HashingWriter, HeaderProvider, HistoryWriter, PostState, ProviderError,
StageCheckpointReader, StorageReader, TransactionsProvider, WithdrawalsProvider,
};
use itertools::{izip, Itertools};
use reth_db::{
@ -13,7 +12,7 @@ use reth_db::{
database::{Database, DatabaseGAT},
models::{
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
ShardedKey, StoredBlockBodyIndices,
ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
},
table::Table,
tables,
@ -190,24 +189,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
// TODO(joshie) TEMPORARY should be moved to trait providers
/// Get range of blocks and its execution result
pub fn get_block_and_execution_range(
&self,
chain_spec: &ChainSpec,
range: RangeInclusive<BlockNumber>,
) -> Result<Vec<(SealedBlockWithSenders, PostState)>> {
self.get_take_block_and_execution_range::<false>(chain_spec, range)
}
/// Take range of blocks and its execution result
pub fn take_block_and_execution_range(
&self,
chain_spec: &ChainSpec,
range: RangeInclusive<BlockNumber>,
) -> Result<Vec<(SealedBlockWithSenders, PostState)>> {
self.get_take_block_and_execution_range::<true>(chain_spec, range)
}
/// Traverse over changesets and plain state and recreate the [`PostState`]s for the given range
/// of blocks.
///
@ -389,72 +370,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(block_states.into_values().collect())
}
/// Return range of blocks and its execution result
pub fn get_take_block_and_execution_range<const TAKE: bool>(
&self,
chain_spec: &ChainSpec,
range: RangeInclusive<BlockNumber>,
) -> Result<Vec<(SealedBlockWithSenders, PostState)>> {
if TAKE {
let storage_range = BlockNumberAddress::range(range.clone());
self.unwind_account_hashing(range.clone())?;
self.unwind_account_history_indices(range.clone())?;
self.unwind_storage_hashing(storage_range.clone())?;
self.unwind_storage_history_indices(storage_range)?;
// merkle tree
let (new_state_root, trie_updates) =
StateRoot::incremental_root_with_updates(&self.tx, range.clone())
.map_err(Into::<reth_db::DatabaseError>::into)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
.header_by_number(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root;
// state root should be always correct as we are reverting state.
// but for sake of double verification we will check it again.
if new_state_root != parent_state_root {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(ProviderError::UnwindStateRootMismatch {
got: new_state_root,
expected: parent_state_root,
block_number: parent_number,
block_hash: parent_hash,
}
.into())
}
trie_updates.flush(&self.tx)?;
}
// get blocks
let blocks = self.get_take_block_range::<TAKE>(chain_spec, range.clone())?;
let unwind_to = blocks.first().map(|b| b.number.saturating_sub(1));
// get execution res
let execution_res = self.get_take_block_execution_result_range::<TAKE>(range.clone())?;
// combine them
let blocks_with_exec_result: Vec<_> =
blocks.into_iter().zip(execution_res.into_iter()).collect();
// remove block bodies it is needed for both get block range and get block execution results
// that is why it is deleted afterwards.
if TAKE {
// rm block bodies
self.get_or_take::<tables::BlockBodyIndices, TAKE>(range)?;
// Update pipeline progress
if let Some(fork_number) = unwind_to {
self.update_pipeline_stages(fork_number, true)?;
}
}
// return them
Ok(blocks_with_exec_result)
}
/// Return list of entries from table
///
/// If TAKE is true, opened cursor would be write and it would delete all values from db.
@ -770,56 +685,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
}
Ok(())
}
/// Append blocks and insert its post state.
/// This will insert block data to all related tables and will update pipeline progress.
pub fn append_blocks_with_post_state(
&mut self,
blocks: Vec<SealedBlockWithSenders>,
state: PostState,
) -> Result<()> {
if blocks.is_empty() {
return Ok(())
}
let new_tip = blocks.last().unwrap();
let new_tip_number = new_tip.number;
let first_number = blocks.first().unwrap().number;
let last = blocks.last().unwrap();
let last_block_number = last.number;
let last_block_hash = last.hash();
let expected_state_root = last.state_root;
// Insert the blocks
for block in blocks {
let (block, senders) = block.into_components();
insert_canonical_block(self.tx_mut(), block, Some(senders))?;
}
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
state.write_to_db(self.tx_mut())?;
self.insert_hashes(first_number..=last_block_number, last_block_hash, expected_state_root)?;
self.calculate_history_indices(first_number..=last_block_number)?;
// Update pipeline progress
self.update_pipeline_stages(new_tip_number, false)?;
Ok(())
}
/// Insert full block and make it canonical.
pub fn insert_block(
&mut self,
block: SealedBlock,
senders: Option<Vec<Address>>,
) -> Result<()> {
insert_canonical_block(self.tx_mut(), block, senders)?;
Ok(())
}
}
impl<'this, TX: DbTx<'this>> AccountReader for DatabaseProvider<'this, TX> {
@ -1745,3 +1610,192 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
Ok(changesets)
}
}
impl<'this, TX: DbTxMut<'this> + DbTx<'this>> BlockExecutionWriter for DatabaseProvider<'this, TX> {
fn get_or_take_block_and_execution_range<const TAKE: bool>(
&self,
chain_spec: &ChainSpec,
range: RangeInclusive<BlockNumber>,
) -> Result<Vec<(SealedBlockWithSenders, PostState)>> {
if TAKE {
let storage_range = BlockNumberAddress::range(range.clone());
self.unwind_account_hashing(range.clone())?;
self.unwind_account_history_indices(range.clone())?;
self.unwind_storage_hashing(storage_range.clone())?;
self.unwind_storage_history_indices(storage_range)?;
// merkle tree
let (new_state_root, trie_updates) =
StateRoot::incremental_root_with_updates(&self.tx, range.clone())
.map_err(Into::<reth_db::DatabaseError>::into)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
.header_by_number(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root;
// state root should be always correct as we are reverting state.
// but for sake of double verification we will check it again.
if new_state_root != parent_state_root {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(ProviderError::UnwindStateRootMismatch {
got: new_state_root,
expected: parent_state_root,
block_number: parent_number,
block_hash: parent_hash,
}
.into())
}
trie_updates.flush(&self.tx)?;
}
// get blocks
let blocks = self.get_take_block_range::<TAKE>(chain_spec, range.clone())?;
let unwind_to = blocks.first().map(|b| b.number.saturating_sub(1));
// get execution res
let execution_res = self.get_take_block_execution_result_range::<TAKE>(range.clone())?;
// combine them
let blocks_with_exec_result: Vec<_> =
blocks.into_iter().zip(execution_res.into_iter()).collect();
// remove block bodies it is needed for both get block range and get block execution results
// that is why it is deleted afterwards.
if TAKE {
// rm block bodies
self.get_or_take::<tables::BlockBodyIndices, TAKE>(range)?;
// Update pipeline progress
if let Some(fork_number) = unwind_to {
self.update_pipeline_stages(fork_number, true)?;
}
}
// return them
Ok(blocks_with_exec_result)
}
}
impl<'this, TX: DbTxMut<'this> + DbTx<'this>> BlockWriter for DatabaseProvider<'this, TX> {
fn insert_block(
&self,
block: SealedBlock,
senders: Option<Vec<Address>>,
) -> Result<StoredBlockBodyIndices> {
let block_number = block.number;
self.tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
// Put header with canonical hashes.
self.tx.put::<tables::Headers>(block.number, block.header.as_ref().clone())?;
self.tx.put::<tables::HeaderNumbers>(block.hash(), block.number)?;
// total difficulty
let ttd = if block.number == 0 {
block.difficulty
} else {
let parent_block_number = block.number - 1;
let parent_ttd =
self.tx.get::<tables::HeaderTD>(parent_block_number)?.unwrap_or_default();
parent_ttd.0 + block.difficulty
};
self.tx.put::<tables::HeaderTD>(block.number, ttd.into())?;
// insert body ommers data
if !block.ommers.is_empty() {
self.tx.put::<tables::BlockOmmers>(
block.number,
StoredBlockOmmers { ommers: block.ommers },
)?;
}
let mut next_tx_num = self
.tx
.cursor_read::<tables::Transactions>()?
.last()?
.map(|(n, _)| n + 1)
.unwrap_or_default();
let first_tx_num = next_tx_num;
let tx_count = block.body.len() as u64;
let senders_len = senders.as_ref().map(|s| s.len());
let tx_iter = if Some(block.body.len()) == senders_len {
block.body.into_iter().zip(senders.unwrap().into_iter()).collect::<Vec<(_, _)>>()
} else {
block
.body
.into_iter()
.map(|tx| {
let signer = tx.recover_signer();
(tx, signer.unwrap_or_default())
})
.collect::<Vec<(_, _)>>()
};
for (transaction, sender) in tx_iter {
let hash = transaction.hash();
self.tx.put::<tables::TxSenders>(next_tx_num, sender)?;
self.tx.put::<tables::Transactions>(next_tx_num, transaction.into())?;
self.tx.put::<tables::TxHashNumber>(hash, next_tx_num)?;
next_tx_num += 1;
}
if let Some(withdrawals) = block.withdrawals {
if !withdrawals.is_empty() {
self.tx.put::<tables::BlockWithdrawals>(
block_number,
StoredBlockWithdrawals { withdrawals },
)?;
}
}
let block_indices = StoredBlockBodyIndices { first_tx_num, tx_count };
self.tx.put::<tables::BlockBodyIndices>(block_number, block_indices.clone())?;
if !block_indices.is_empty() {
self.tx.put::<tables::TransactionBlock>(block_indices.last_tx_num(), block_number)?;
}
Ok(block_indices)
}
fn append_blocks_with_post_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
state: PostState,
) -> Result<()> {
if blocks.is_empty() {
return Ok(())
}
let new_tip = blocks.last().unwrap();
let new_tip_number = new_tip.number;
let first_number = blocks.first().unwrap().number;
let last = blocks.last().unwrap();
let last_block_number = last.number;
let last_block_hash = last.hash();
let expected_state_root = last.state_root;
// Insert the blocks
for block in blocks {
let (block, senders) = block.into_components();
self.insert_block(block, Some(senders))?;
}
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
state.write_to_db(self.tx_ref())?;
self.insert_hashes(first_number..=last_block_number, last_block_hash, expected_state_root)?;
self.calculate_history_indices(first_number..=last_block_number)?;
// Update pipeline progress
self.update_pipeline_stages(new_tip_number, false)?;
Ok(())
}
}

View File

@ -1,13 +1,15 @@
use crate::{
BlockIdReader, BlockNumReader, HeaderProvider, ReceiptProvider, ReceiptProviderIdExt,
TransactionsProvider, WithdrawalsProvider,
BlockIdReader, BlockNumReader, HeaderProvider, PostState, ReceiptProvider,
ReceiptProviderIdExt, TransactionsProvider, WithdrawalsProvider,
};
use auto_impl::auto_impl;
use reth_db::models::StoredBlockBodyIndices;
use reth_interfaces::Result;
use reth_primitives::{
Block, BlockHashOrNumber, BlockId, BlockNumber, BlockNumberOrTag, BlockWithSenders, Header,
Receipt, SealedBlock, SealedHeader, H256,
Address, Block, BlockHashOrNumber, BlockId, BlockNumber, BlockNumberOrTag, BlockWithSenders,
ChainSpec, Header, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, H256,
};
use std::ops::RangeInclusive;
/// A helper enum that represents the origin of the requested block.
///
@ -198,3 +200,55 @@ pub trait BlockReaderIdExt: BlockReader + BlockIdReader + ReceiptProviderIdExt {
/// Returns `None` if block is not found.
fn ommers_by_id(&self, id: BlockId) -> Result<Option<Vec<Header>>>;
}
/// BlockExecution Writer
#[auto_impl(&, Arc, Box)]
pub trait BlockExecutionWriter: BlockWriter + BlockReader + Send + Sync {
/// Get range of blocks and its execution result
fn get_block_and_execution_range(
&self,
chain_spec: &ChainSpec,
range: RangeInclusive<BlockNumber>,
) -> Result<Vec<(SealedBlockWithSenders, PostState)>> {
self.get_or_take_block_and_execution_range::<false>(chain_spec, range)
}
/// Take range of blocks and its execution result
fn take_block_and_execution_range(
&self,
chain_spec: &ChainSpec,
range: RangeInclusive<BlockNumber>,
) -> Result<Vec<(SealedBlockWithSenders, PostState)>> {
self.get_or_take_block_and_execution_range::<true>(chain_spec, range)
}
/// Return range of blocks and its execution result
fn get_or_take_block_and_execution_range<const TAKE: bool>(
&self,
chain_spec: &ChainSpec,
range: RangeInclusive<BlockNumber>,
) -> Result<Vec<(SealedBlockWithSenders, PostState)>>;
}
/// Block Writer
#[auto_impl(&, Arc, Box)]
pub trait BlockWriter: Send + Sync {
/// Insert full block and make it canonical. Parent tx num and transition id is taken from
/// parent block in database.
///
/// Return [StoredBlockBodyIndices] that contains indices of the first and last transactions and
/// transition in the block.
fn insert_block(
&self,
block: SealedBlock,
senders: Option<Vec<Address>>,
) -> Result<StoredBlockBodyIndices>;
/// Append blocks and insert its post state.
/// This will insert block data to all related tables and will update pipeline progress.
fn append_blocks_with_post_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
state: PostState,
) -> Result<()>;
}

View File

@ -7,7 +7,7 @@ mod storage;
pub use storage::StorageReader;
mod block;
pub use block::{BlockReader, BlockReaderIdExt, BlockSource};
pub use block::{BlockExecutionWriter, BlockReader, BlockReaderIdExt, BlockSource, BlockWriter};
mod block_hash;
pub use block_hash::BlockHashReader;

View File

@ -5,9 +5,7 @@ use std::fmt::Debug;
#[cfg(test)]
mod test {
use crate::{
insert_canonical_block, test_utils::blocks::*, ProviderFactory, TransactionsProvider,
};
use crate::{test_utils::blocks::*, ProviderFactory, TransactionsProvider};
use reth_db::{
mdbx::test_utils::create_test_rw_db,
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
@ -40,7 +38,7 @@ mod test {
let storage1_shard_key =
StorageShardedKey::new(H160([0x60; 20]), U256::from(5).into(), u64::MAX);
insert_canonical_block(provider.tx_ref(), data.genesis.clone(), None).unwrap();
provider.insert_block(data.genesis.clone(), None).unwrap();
assert_genesis_block(&provider, data.genesis);
@ -153,7 +151,7 @@ mod test {
let (block1, exec_res1) = data.blocks[0].clone();
let (block2, exec_res2) = data.blocks[1].clone();
insert_canonical_block(provider.tx_mut(), data.genesis.clone(), None).unwrap();
provider.insert_block(data.genesis.clone(), None).unwrap();
assert_genesis_block(&provider, data.genesis);

View File

@ -1,103 +0,0 @@
use reth_db::{
cursor::DbCursorRO,
models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals},
tables,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use reth_primitives::{Address, SealedBlock};
/// Insert block data into corresponding tables. Used mainly for testing & internal tooling.
///
///
/// Check parent dependency in [tables::HeaderNumbers] and in [tables::BlockBodyIndices] tables.
/// Inserts header data to [tables::CanonicalHeaders], [tables::Headers], [tables::HeaderNumbers].
/// and transactions data to [tables::TxSenders], [tables::Transactions], [tables::TxHashNumber].
/// and transition/transaction meta data to [tables::BlockBodyIndices]
/// and block data to [tables::BlockOmmers] and [tables::BlockWithdrawals].
///
/// Return [StoredBlockBodyIndices] that contains indices of the first and last transactions and
/// transition in the block.
pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
tx: &TX,
block: SealedBlock,
senders: Option<Vec<Address>>,
) -> Result<StoredBlockBodyIndices, DatabaseError> {
let block_number = block.number;
tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
// Put header with canonical hashes.
tx.put::<tables::Headers>(block.number, block.header.as_ref().clone())?;
tx.put::<tables::HeaderNumbers>(block.hash(), block.number)?;
// total difficulty
let ttd = if block.number == 0 {
block.difficulty
} else {
let parent_block_number = block.number - 1;
let parent_ttd = tx.get::<tables::HeaderTD>(parent_block_number)?.unwrap_or_default();
parent_ttd.0 + block.difficulty
};
tx.put::<tables::HeaderTD>(block.number, ttd.into())?;
// insert body ommers data
if !block.ommers.is_empty() {
tx.put::<tables::BlockOmmers>(block.number, StoredBlockOmmers { ommers: block.ommers })?;
}
let mut next_tx_num =
tx.cursor_read::<tables::Transactions>()?.last()?.map(|(n, _)| n + 1).unwrap_or_default();
let first_tx_num = next_tx_num;
let tx_count = block.body.len() as u64;
let senders_len = senders.as_ref().map(|s| s.len());
let tx_iter = if Some(block.body.len()) == senders_len {
block.body.into_iter().zip(senders.unwrap().into_iter()).collect::<Vec<(_, _)>>()
} else {
block
.body
.into_iter()
.map(|tx| {
let signer = tx.recover_signer();
(tx, signer.unwrap_or_default())
})
.collect::<Vec<(_, _)>>()
};
for (transaction, sender) in tx_iter {
let hash = transaction.hash();
tx.put::<tables::TxSenders>(next_tx_num, sender)?;
tx.put::<tables::Transactions>(next_tx_num, transaction.into())?;
tx.put::<tables::TxHashNumber>(hash, next_tx_num)?;
next_tx_num += 1;
}
if let Some(withdrawals) = block.withdrawals {
if !withdrawals.is_empty() {
tx.put::<tables::BlockWithdrawals>(
block_number,
StoredBlockWithdrawals { withdrawals },
)?;
}
}
let block_indices = StoredBlockBodyIndices { first_tx_num, tx_count };
tx.put::<tables::BlockBodyIndices>(block_number, block_indices.clone())?;
if !block_indices.is_empty() {
tx.put::<tables::TransactionBlock>(block_indices.last_tx_num(), block_number)?;
}
Ok(block_indices)
}
/// Inserts canonical block in blockchain. Parent tx num and transition id is taken from
/// parent block in database.
pub fn insert_canonical_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
tx: &TX,
block: SealedBlock,
senders: Option<Vec<Address>>,
) -> Result<StoredBlockBodyIndices, DatabaseError> {
insert_block(tx, block, senders)
}

View File

@ -529,7 +529,7 @@ mod tests {
use std::{collections::BTreeMap, ops::Mul, str::FromStr};
fn insert_account<'a, TX: DbTxMut<'a>>(
tx: &mut TX,
tx: &TX,
address: Address,
account: Account,
storage: &BTreeMap<H256, U256>,
@ -540,7 +540,7 @@ mod tests {
}
fn insert_storage<'a, TX: DbTxMut<'a>>(
tx: &mut TX,
tx: &TX,
hashed_address: H256,
storage: &BTreeMap<H256, U256>,
) {
@ -556,7 +556,7 @@ mod tests {
fn incremental_vs_full_root(inputs: &[&str], modified: &str) {
let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
let hashed_address = H256::from_low_u64_be(1);
let mut hashed_storage_cursor =
@ -592,7 +592,7 @@ mod tests {
// 3. Calculate the incremental root
let mut storage_changes = PrefixSet::default();
storage_changes.insert(Nibbles::unpack(modified_key));
let loader = StorageRoot::new_hashed(tx.tx_mut(), hashed_address)
let loader = StorageRoot::new_hashed(tx.tx_ref(), hashed_address)
.with_changed_prefixes(storage_changes);
let incremental_root = loader.root().unwrap();
@ -633,8 +633,8 @@ mod tests {
}
tx.commit().unwrap();
let mut tx = factory.provider_rw().unwrap();
let got = StorageRoot::new(tx.tx_mut(), address).root().unwrap();
let tx = factory.provider_rw().unwrap();
let got = StorageRoot::new(tx.tx_ref(), address).root().unwrap();
let expected = storage_root(storage.into_iter());
assert_eq!(expected, got);
});
@ -681,7 +681,7 @@ mod tests {
fn test_empty_storage_root() {
let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
let address = Address::random();
let code = "el buen fla";
@ -690,11 +690,11 @@ mod tests {
balance: U256::from(414241124u32),
bytecode_hash: Some(keccak256(code)),
};
insert_account(tx.tx_mut(), address, account, &Default::default());
insert_account(tx.tx_ref(), address, account, &Default::default());
tx.commit().unwrap();
let mut tx = factory.provider_rw().unwrap();
let got = StorageRoot::new(tx.tx_mut(), address).root().unwrap();
let tx = factory.provider_rw().unwrap();
let got = StorageRoot::new(tx.tx_ref(), address).root().unwrap();
assert_eq!(got, EMPTY_ROOT);
}
@ -703,7 +703,7 @@ mod tests {
fn test_storage_root() {
let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
let address = Address::random();
let storage = BTreeMap::from([
@ -718,11 +718,11 @@ mod tests {
bytecode_hash: Some(keccak256(code)),
};
insert_account(tx.tx_mut(), address, account, &storage);
insert_account(tx.tx_ref(), address, account, &storage);
tx.commit().unwrap();
let mut tx = factory.provider_rw().unwrap();
let got = StorageRoot::new(tx.tx_mut(), address).root().unwrap();
let tx = factory.provider_rw().unwrap();
let got = StorageRoot::new(tx.tx_ref(), address).root().unwrap();
assert_eq!(storage_root(storage.into_iter()), got);
}
@ -747,13 +747,13 @@ mod tests {
let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
for (address, (account, storage)) in &state {
insert_account(tx.tx_mut(), *address, *account, storage)
insert_account(tx.tx_ref(), *address, *account, storage)
}
tx.commit().unwrap();
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
let expected = state_root(state.into_iter());
@ -763,7 +763,7 @@ mod tests {
let mut intermediate_state: Option<Box<IntermediateStateRootState>> = None;
while got.is_none() {
let calculator = StateRoot::new(tx.tx_mut())
let calculator = StateRoot::new(tx.tx_ref())
.with_threshold(threshold)
.with_intermediate_state(intermediate_state.take().map(|state| *state));
match calculator.root_with_progress().unwrap() {
@ -786,16 +786,16 @@ mod tests {
fn test_state_root_with_state(state: State) {
let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
for (address, (account, storage)) in &state {
insert_account(tx.tx_mut(), *address, *account, storage)
insert_account(tx.tx_ref(), *address, *account, storage)
}
tx.commit().unwrap();
let expected = state_root(state.into_iter());
let mut tx = factory.provider_rw().unwrap();
let got = StateRoot::new(tx.tx_mut()).root().unwrap();
let tx = factory.provider_rw().unwrap();
let got = StateRoot::new(tx.tx_ref()).root().unwrap();
assert_eq!(expected, got);
}
@ -836,9 +836,9 @@ mod tests {
hashed_storage_cursor.upsert(key3, StorageEntry { key: hashed_slot, value }).unwrap();
}
tx.commit().unwrap();
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
let account3_storage_root = StorageRoot::new(tx.tx_mut(), address3).root().unwrap();
let account3_storage_root = StorageRoot::new(tx.tx_ref(), address3).root().unwrap();
let expected_root = storage_root_prehashed(storage.into_iter());
assert_eq!(expected_root, account3_storage_root);
}
@ -858,7 +858,7 @@ mod tests {
let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
let mut hashed_account_cursor =
tx.tx_ref().cursor_write::<tables::HashedAccount>().unwrap();
@ -906,7 +906,7 @@ mod tests {
}
hashed_storage_cursor.upsert(key3, StorageEntry { key: hashed_slot, value }).unwrap();
}
let account3_storage_root = StorageRoot::new(tx.tx_mut(), address3).root().unwrap();
let account3_storage_root = StorageRoot::new(tx.tx_ref(), address3).root().unwrap();
hash_builder.add_leaf(
Nibbles::unpack(key3),
&encode_account(account3, Some(account3_storage_root)),
@ -1105,7 +1105,7 @@ mod tests {
drop(tx);
}
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
{
let mut hashed_account_cursor =
tx.tx_ref().cursor_write::<tables::HashedAccount>().unwrap();
@ -1129,7 +1129,7 @@ mod tests {
(key6, encode_account(account6, None)),
]);
let (root, trie_updates) = StateRoot::new(tx.tx_mut())
let (root, trie_updates) = StateRoot::new(tx.tx_ref())
.with_changed_account_prefixes(account_prefix_set)
.root_with_updates()
.unwrap();
@ -1165,11 +1165,11 @@ mod tests {
fn account_trie_around_extension_node() {
let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
let expected = extension_node_trie(&mut tx);
let expected = extension_node_trie(&tx);
let (got, updates) = StateRoot::new(tx.tx_mut()).root_with_updates().unwrap();
let (got, updates) = StateRoot::new(tx.tx_ref()).root_with_updates().unwrap();
assert_eq!(expected, got);
// Check account trie
@ -1191,13 +1191,13 @@ mod tests {
fn account_trie_around_extension_node_with_dbtrie() {
let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
let expected = extension_node_trie(&mut tx);
let expected = extension_node_trie(&tx);
let (got, updates) = StateRoot::new(tx.tx_mut()).root_with_updates().unwrap();
let (got, updates) = StateRoot::new(tx.tx_ref()).root_with_updates().unwrap();
assert_eq!(expected, got);
updates.flush(tx.tx_mut()).unwrap();
updates.flush(tx.tx_ref()).unwrap();
// read the account updates from the db
let mut accounts_trie = tx.tx_ref().cursor_read::<tables::AccountsTrie>().unwrap();
@ -1219,7 +1219,7 @@ mod tests {
let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
let mut hashed_account_cursor = tx.tx_ref().cursor_write::<tables::HashedAccount>().unwrap();
let mut state = BTreeMap::default();
@ -1233,7 +1233,7 @@ mod tests {
}
}
let (state_root, trie_updates) = StateRoot::new(tx.tx_mut())
let (state_root, trie_updates) = StateRoot::new(tx.tx_ref())
.with_changed_account_prefixes(changes)
.root_with_updates()
.unwrap();
@ -1243,7 +1243,7 @@ mod tests {
state.clone().into_iter().map(|(key, balance)| (key, (Account { balance, ..Default::default() }, std::iter::empty())))
);
assert_eq!(expected_root, state_root);
trie_updates.flush(tx.tx_mut()).unwrap();
trie_updates.flush(tx.tx_ref()).unwrap();
}
});
}
@ -1253,14 +1253,13 @@ mod tests {
fn storage_trie_around_extension_node() {
let db = create_test_rw_db();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let mut tx = factory.provider_rw().unwrap();
let tx = factory.provider_rw().unwrap();
let hashed_address = H256::random();
let (expected_root, expected_updates) =
extension_node_storage_trie(&mut tx, hashed_address);
let (expected_root, expected_updates) = extension_node_storage_trie(&tx, hashed_address);
let (got, _, updates) =
StorageRoot::new_hashed(tx.tx_mut(), hashed_address).root_with_updates().unwrap();
StorageRoot::new_hashed(tx.tx_ref(), hashed_address).root_with_updates().unwrap();
assert_eq!(expected_root, got);
// Check account trie
@ -1279,7 +1278,7 @@ mod tests {
}
fn extension_node_storage_trie(
tx: &mut DatabaseProviderRW<'_, &Env<WriteMap>>,
tx: &DatabaseProviderRW<'_, &Env<WriteMap>>,
hashed_address: H256,
) -> (H256, HashMap<Nibbles, BranchNodeCompact>) {
let value = U256::from(1);
@ -1305,7 +1304,7 @@ mod tests {
(root, updates)
}
fn extension_node_trie(tx: &mut DatabaseProviderRW<'_, &Env<WriteMap>>) -> H256 {
fn extension_node_trie(tx: &DatabaseProviderRW<'_, &Env<WriteMap>>) -> H256 {
let a =
Account { nonce: 0, balance: U256::from(1u64), bytecode_hash: Some(H256::random()) };
let val = encode_account(a, None);

View File

@ -6,7 +6,8 @@ use crate::{
};
use reth_db::mdbx::test_utils::create_test_rw_db;
use reth_primitives::{BlockBody, SealedBlock};
use reth_provider::ProviderFactory;
use reth_provider::{BlockWriter, ProviderFactory};
use reth_rlp::Decodable;
use reth_stages::{stages::ExecutionStage, ExecInput, Stage};
use std::{collections::BTreeMap, ffi::OsStr, fs, path::Path, sync::Arc};
@ -79,8 +80,7 @@ impl Case for BlockchainTestCase {
let mut provider = factory.provider_rw().unwrap();
// Insert test state
reth_provider::insert_canonical_block(
provider.tx_ref(),
provider.insert_block(
SealedBlock::new(case.genesis_block_header.clone().into(), BlockBody::default()),
None,
)?;
@ -88,7 +88,9 @@ impl Case for BlockchainTestCase {
let mut last_block = None;
for block in case.blocks.iter() {
last_block = Some(block.write_to_db(provider.tx_ref())?);
let decoded = SealedBlock::decode(&mut block.rlp.as_ref())?;
last_block = Some(decoded.number);
provider.insert_block(decoded, None)?;
}
// Call execution stage

View File

@ -7,11 +7,10 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{
keccak256, Account as RethAccount, Address, BigEndianHash, BlockNumber, Bloom, Bytecode, Bytes,
ChainSpec, ChainSpecBuilder, Header as RethHeader, JsonU256, SealedBlock, SealedHeader,
StorageEntry, Withdrawal, H160, H256, H64, U256,
keccak256, Account as RethAccount, Address, BigEndianHash, Bloom, Bytecode, Bytes, ChainSpec,
ChainSpecBuilder, Header as RethHeader, JsonU256, SealedHeader, StorageEntry, Withdrawal, H160,
H256, H64, U256,
};
use reth_rlp::Decodable;
use serde::{self, Deserialize};
use std::{collections::BTreeMap, ops::Deref};
@ -138,19 +137,6 @@ pub struct Block {
pub withdrawals: Option<Vec<Withdrawal>>,
}
impl Block {
/// Write the block to the database.
pub fn write_to_db<'a, Tx>(&self, tx: &'a Tx) -> Result<BlockNumber, Error>
where
Tx: DbTx<'a> + DbTxMut<'a>,
{
let decoded = SealedBlock::decode(&mut self.rlp.as_ref())?;
let block_number = decoded.number;
reth_provider::insert_canonical_block(tx, decoded, None)?;
Ok(block_number)
}
}
/// Transaction sequence in block
#[derive(Debug, PartialEq, Eq, Deserialize)]
#[serde(deny_unknown_fields)]