From 78426739036e7599184f49ccf486068e1429285c Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 7 Oct 2024 11:06:28 +0200 Subject: [PATCH] feat(provider): add `test_race` to `BlockchainProvider2` tests (#11523) --- Cargo.lock | 1 + crates/storage/db/Cargo.toml | 4 +- crates/storage/db/src/lib.rs | 46 +++++- .../src/providers/blockchain_provider.rs | 148 +++++++++++++++++- 4 files changed, 188 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 590f01a41..a66887900 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6702,6 +6702,7 @@ dependencies = [ "iai-callgrind", "metrics", "page_size", + "parking_lot 0.12.3", "paste", "pprof", "proptest", diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml index ba012cf68..a075f7724 100644 --- a/crates/storage/db/Cargo.toml +++ b/crates/storage/db/Cargo.toml @@ -50,6 +50,7 @@ derive_more.workspace = true paste.workspace = true rustc-hash = { workspace = true, optional = true } sysinfo = { version = "0.31", default-features = false, features = ["system"] } +parking_lot = { workspace = true, optional = true } # arbitrary utils strum = { workspace = true, features = ["derive"], optional = true } @@ -61,6 +62,7 @@ rand.workspace = true serde_json.workspace = true tempfile.workspace = true test-fuzz.workspace = true +parking_lot.workspace = true pprof = { workspace = true, features = [ "flamegraph", @@ -88,7 +90,7 @@ mdbx = [ "dep:strum", "dep:rustc-hash", ] -test-utils = ["dep:tempfile", "arbitrary"] +test-utils = ["dep:tempfile", "arbitrary", "parking_lot"] bench = [] arbitrary = ["reth-primitives/arbitrary", "reth-db-api/arbitrary"] optimism = [] diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index a9f073d7b..7090b4262 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -44,6 +44,7 @@ pub use reth_db_api::*; pub mod test_utils { use super::*; use crate::mdbx::DatabaseArguments; + use 
parking_lot::RwLock; use reth_db_api::{ database::Database, database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics}, @@ -52,6 +53,7 @@ pub mod test_utils { use reth_fs_util; use reth_libmdbx::MaxReadTransactionDuration; use std::{ + fmt::Formatter, path::{Path, PathBuf}, sync::Arc, }; @@ -69,10 +71,19 @@ pub mod test_utils { pub const ERROR_TEMPDIR: &str = "Not able to create a temporary directory."; /// A database will delete the db dir when dropped. - #[derive(Debug)] pub struct TempDatabase { db: Option, path: PathBuf, + /// Executed right before a database transaction is created. + pre_tx_hook: RwLock>, + /// Executed right after a database transaction is created. + post_tx_hook: RwLock>, + } + + impl std::fmt::Debug for TempDatabase { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TempDatabase").field("db", &self.db).field("path", &self.path).finish() + } } impl Drop for TempDatabase { @@ -85,6 +96,16 @@ pub mod test_utils { } impl TempDatabase { + /// Create new [`TempDatabase`] instance. + pub fn new(db: DB, path: PathBuf) -> Self { + Self { + db: Some(db), + path, + pre_tx_hook: RwLock::new(Box::new(|| ())), + post_tx_hook: RwLock::new(Box::new(|| ())), + } + } + /// Returns the reference to inner db. pub fn db(&self) -> &DB { self.db.as_ref().unwrap() @@ -99,13 +120,28 @@ pub mod test_utils { pub fn into_inner_db(mut self) -> DB { self.db.take().unwrap() // take out db to avoid clean path in drop fn } + + /// Sets [`TempDatabase`] new pre transaction creation hook. + pub fn set_pre_transaction_hook(&self, hook: Box) { + let mut db_hook = self.pre_tx_hook.write(); + *db_hook = hook; + } + + /// Sets [`TempDatabase`] new post transaction creation hook. 
+ pub fn set_post_transaction_hook(&self, hook: Box) { + let mut db_hook = self.post_tx_hook.write(); + *db_hook = hook; + } } impl Database for TempDatabase { type TX = ::TX; type TXMut = ::TXMut; fn tx(&self) -> Result { - self.db().tx() + self.pre_tx_hook.read()(); + let tx = self.db().tx()?; + self.post_tx_hook.read()(); + Ok(tx) } fn tx_mut(&self) -> Result { @@ -150,7 +186,7 @@ pub mod test_utils { ) .expect(&emsg); - Arc::new(TempDatabase { db: Some(db), path }) + Arc::new(TempDatabase::new(db, path)) } /// Create read/write database for testing @@ -162,7 +198,7 @@ pub mod test_utils { .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)), ) .expect(ERROR_DB_CREATION); - Arc::new(TempDatabase { db: Some(db), path }) + Arc::new(TempDatabase::new(db, path)) } /// Create read only database for testing @@ -175,7 +211,7 @@ pub mod test_utils { init_db(path.as_path(), args.clone()).expect(ERROR_DB_CREATION); } let db = open_db_read_only(path.as_path(), args).expect(ERROR_DB_OPEN); - Arc::new(TempDatabase { db: Some(db), path }) + Arc::new(TempDatabase::new(db, path)) } } diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 9a2a3d0a7..fdd31cd84 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -1486,20 +1486,26 @@ mod tests { MockNodeTypesWithDB, }, writer::UnifiedStorageWriter, - BlockWriter, CanonChainTracker, StaticFileProviderFactory, StaticFileWriter, + BlockWriter, CanonChainTracker, ProviderFactory, StaticFileProviderFactory, + StaticFileWriter, }; use alloy_eips::{BlockHashOrNumber, BlockNumHash, BlockNumberOrTag}; - use alloy_primitives::B256; + use alloy_primitives::{BlockNumber, B256}; use itertools::Itertools; use rand::Rng; use reth_chain_state::{ test_utils::TestBlockBuilder, CanonStateNotification, CanonStateSubscriptions, - ExecutedBlock, 
NewCanonicalChain, + CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain, }; use reth_chainspec::{ ChainSpec, ChainSpecBuilder, ChainSpecProvider, EthereumHardfork, MAINNET, }; - use reth_db::models::{AccountBeforeTx, StoredBlockBodyIndices}; + use reth_db::{ + models::{AccountBeforeTx, StoredBlockBodyIndices}, + tables, + }; + use reth_db_api::{cursor::DbCursorRO, transaction::DbTx}; + use reth_errors::ProviderError; use reth_execution_types::{Chain, ExecutionOutcome}; use reth_primitives::{ Receipt, SealedBlock, StaticFileSegment, TransactionMeta, TransactionSignedNoHash, @@ -1586,9 +1592,27 @@ mod tests { let factory = create_test_provider_factory_with_chain_spec(chain_spec); let provider_rw = factory.database_provider_rw()?; + let static_file_provider = factory.static_file_provider(); + + // Write transactions to static files with the right `tx_num` + let mut bodies_cursor = provider_rw.tx_ref().cursor_read::()?; + let mut tx_num = bodies_cursor + .seek_exact(database_blocks.first().as_ref().unwrap().number.saturating_sub(1))? 
+ .map(|(_, indices)| indices.next_tx_num()) + .unwrap_or_default(); // Insert blocks into the database for block in &database_blocks { + // TODO: this should be moved inside `insert_historical_block`: + let mut transactions_writer = + static_file_provider.latest_writer(StaticFileSegment::Transactions)?; + transactions_writer.increment_block(block.number)?; + for tx in block.body.transactions() { + let tx: TransactionSignedNoHash = tx.clone().into(); + transactions_writer.append_transaction(tx_num, &tx)?; + tx_num += 1; + } + provider_rw.insert_historical_block( block.clone().seal_with_senders().expect("failed to seal block with senders"), )?; @@ -1602,7 +1626,9 @@ mod tests { .append_receipts_from_blocks( // The initial block number is required database_blocks.first().map(|b| b.number).unwrap_or_default(), - receipts.iter().map(|vec| vec.clone().into_iter().map(Some).collect::>()), + receipts[..database_blocks.len()] + .iter() + .map(|vec| vec.clone().into_iter().map(Some).collect::>()), )?; // Commit to both storages: database and static files @@ -1668,6 +1694,42 @@ mod tests { ) } + /// This will persist the last block in-memory and delete it from + /// `canonical_in_memory_state` right after a database read transaction is created. + /// + /// This simulates an RPC method having a different view than when its database transaction was + /// created. 
+ fn persist_block_after_db_tx_creation( + provider: Arc>, + block_number: BlockNumber, + ) { + let hook_provider = provider.clone(); + provider.database.db_ref().set_post_transaction_hook(Box::new(move || { + if let Some(state) = hook_provider.canonical_in_memory_state.head_state() { + if state.anchor().number + 1 == block_number { + let mut lowest_memory_block = + state.parent_state_chain().last().expect("qed").block(); + let num_hash = lowest_memory_block.block().num_hash(); + + let mut execution_output = (*lowest_memory_block.execution_output).clone(); + execution_output.first_block = lowest_memory_block.block().number; + lowest_memory_block.execution_output = Arc::new(execution_output); + + // Push to disk + let provider_rw = hook_provider.database_provider_rw().unwrap(); + UnifiedStorageWriter::from(&provider_rw, &hook_provider.static_file_provider()) + .save_blocks(&[lowest_memory_block]) + .unwrap(); + UnifiedStorageWriter::commit(provider_rw, hook_provider.static_file_provider()) + .unwrap(); + + // Remove from memory + hook_provider.canonical_in_memory_state.remove_persisted_blocks(num_hash); + } + } + })); + } + #[test] fn test_block_reader_find_block_by_hash() -> eyre::Result<()> { // Initialize random number generator and provider factory @@ -3908,4 +3970,80 @@ mod tests { Ok(()) } + + #[test] + fn test_race() -> eyre::Result<()> { + let mut rng = generators::rng(); + let (provider, _, in_memory_blocks, _) = provider_with_random_blocks( + &mut rng, + TEST_BLOCKS_COUNT - 1, + TEST_BLOCKS_COUNT + 1, + BlockRangeParams { + tx_count: TEST_TRANSACTIONS_COUNT..TEST_TRANSACTIONS_COUNT, + ..Default::default() + }, + )?; + + let provider = Arc::new(provider); + + // Old implementation was querying the database first. This is problematic, if there are + // changes AFTER the database transaction is created. 
+ let old_transaction_hash_fn = + |hash: B256, + canonical_in_memory_state: CanonicalInMemoryState, + factory: ProviderFactory| { + assert!(factory.transaction_by_hash(hash)?.is_none(), "should not be in database"); + Ok::<_, ProviderError>(canonical_in_memory_state.transaction_by_hash(hash)) + }; + + // Correct implementation queries in-memory first + let correct_transaction_hash_fn = + |hash: B256, + canonical_in_memory_state: CanonicalInMemoryState, + _factory: ProviderFactory| { + if let Some(tx) = canonical_in_memory_state.transaction_by_hash(hash) { + return Ok::<_, ProviderError>(Some(tx)) + } + panic!("should not be in database"); + // _factory.transaction_by_hash(hash) + }; + + // OLD BEHAVIOUR + { + // This will persist block 1 AFTER a database transaction is created, moving it from memory to + // storage. + persist_block_after_db_tx_creation(provider.clone(), in_memory_blocks[0].number); + let to_be_persisted_tx = in_memory_blocks[0].body.transactions[0].clone(); + + // Even though the block exists, given the order of provider queries done in the method + // above, we do not see it. + assert_eq!( + old_transaction_hash_fn( + to_be_persisted_tx.hash(), + provider.canonical_in_memory_state(), + provider.database.clone() + ), + Ok(None) + ); + } + + // CORRECT BEHAVIOUR + { + // This will persist block 1 AFTER a database transaction is created, moving it from memory to + // storage. + persist_block_after_db_tx_creation(provider.clone(), in_memory_blocks[1].number); + let to_be_persisted_tx = in_memory_blocks[1].body.transactions[0].clone(); + + assert_eq!( + correct_transaction_hash_fn( + to_be_persisted_tx.hash(), + provider.canonical_in_memory_state(), + provider.database.clone() + ), + Ok(Some(to_be_persisted_tx)) + ); + } + + Ok(()) + } }