feat(provider): add test_race to BlockchainProvider2 tests (#11523)

This commit is contained in:
joshieDo
2024-10-07 11:06:28 +02:00
committed by GitHub
parent d2ab6352c6
commit 7842673903
4 changed files with 188 additions and 11 deletions

1
Cargo.lock generated
View File

@ -6702,6 +6702,7 @@ dependencies = [
"iai-callgrind", "iai-callgrind",
"metrics", "metrics",
"page_size", "page_size",
"parking_lot 0.12.3",
"paste", "paste",
"pprof", "pprof",
"proptest", "proptest",

View File

@ -50,6 +50,7 @@ derive_more.workspace = true
paste.workspace = true paste.workspace = true
rustc-hash = { workspace = true, optional = true } rustc-hash = { workspace = true, optional = true }
sysinfo = { version = "0.31", default-features = false, features = ["system"] } sysinfo = { version = "0.31", default-features = false, features = ["system"] }
parking_lot = { workspace = true, optional = true }
# arbitrary utils # arbitrary utils
strum = { workspace = true, features = ["derive"], optional = true } strum = { workspace = true, features = ["derive"], optional = true }
@ -61,6 +62,7 @@ rand.workspace = true
serde_json.workspace = true serde_json.workspace = true
tempfile.workspace = true tempfile.workspace = true
test-fuzz.workspace = true test-fuzz.workspace = true
parking_lot.workspace = true
pprof = { workspace = true, features = [ pprof = { workspace = true, features = [
"flamegraph", "flamegraph",
@ -88,7 +90,7 @@ mdbx = [
"dep:strum", "dep:strum",
"dep:rustc-hash", "dep:rustc-hash",
] ]
test-utils = ["dep:tempfile", "arbitrary"] test-utils = ["dep:tempfile", "arbitrary", "parking_lot"]
bench = [] bench = []
arbitrary = ["reth-primitives/arbitrary", "reth-db-api/arbitrary"] arbitrary = ["reth-primitives/arbitrary", "reth-db-api/arbitrary"]
optimism = [] optimism = []

View File

@ -44,6 +44,7 @@ pub use reth_db_api::*;
pub mod test_utils { pub mod test_utils {
use super::*; use super::*;
use crate::mdbx::DatabaseArguments; use crate::mdbx::DatabaseArguments;
use parking_lot::RwLock;
use reth_db_api::{ use reth_db_api::{
database::Database, database::Database,
database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics}, database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
@ -52,6 +53,7 @@ pub mod test_utils {
use reth_fs_util; use reth_fs_util;
use reth_libmdbx::MaxReadTransactionDuration; use reth_libmdbx::MaxReadTransactionDuration;
use std::{ use std::{
fmt::Formatter,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
}; };
@ -69,10 +71,19 @@ pub mod test_utils {
pub const ERROR_TEMPDIR: &str = "Not able to create a temporary directory."; pub const ERROR_TEMPDIR: &str = "Not able to create a temporary directory.";
/// A database will delete the db dir when dropped. /// A database will delete the db dir when dropped.
#[derive(Debug)]
pub struct TempDatabase<DB> { pub struct TempDatabase<DB> {
db: Option<DB>, db: Option<DB>,
path: PathBuf, path: PathBuf,
/// Executed right before a database transaction is created.
pre_tx_hook: RwLock<Box<dyn Fn() + Send + Sync>>,
/// Executed right after a database transaction is created.
post_tx_hook: RwLock<Box<dyn Fn() + Send + Sync>>,
}
impl<DB: std::fmt::Debug> std::fmt::Debug for TempDatabase<DB> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TempDatabase").field("db", &self.db).field("path", &self.path).finish()
}
} }
impl<DB> Drop for TempDatabase<DB> { impl<DB> Drop for TempDatabase<DB> {
@ -85,6 +96,16 @@ pub mod test_utils {
} }
impl<DB> TempDatabase<DB> { impl<DB> TempDatabase<DB> {
/// Create new [`TempDatabase`] instance.
pub fn new(db: DB, path: PathBuf) -> Self {
Self {
db: Some(db),
path,
pre_tx_hook: RwLock::new(Box::new(|| ())),
post_tx_hook: RwLock::new(Box::new(|| ())),
}
}
/// Returns the reference to inner db. /// Returns the reference to inner db.
pub fn db(&self) -> &DB { pub fn db(&self) -> &DB {
self.db.as_ref().unwrap() self.db.as_ref().unwrap()
@ -99,13 +120,28 @@ pub mod test_utils {
pub fn into_inner_db(mut self) -> DB { pub fn into_inner_db(mut self) -> DB {
self.db.take().unwrap() // take out db to avoid clean path in drop fn self.db.take().unwrap() // take out db to avoid clean path in drop fn
} }
/// Sets [`TempDatabase`] new pre transaction creation hook.
pub fn set_pre_transaction_hook(&self, hook: Box<dyn Fn() + Send + Sync>) {
let mut db_hook = self.pre_tx_hook.write();
*db_hook = hook;
}
/// Sets [`TempDatabase`] new post transaction creation hook.
pub fn set_post_transaction_hook(&self, hook: Box<dyn Fn() + Send + Sync>) {
let mut db_hook = self.post_tx_hook.write();
*db_hook = hook;
}
} }
impl<DB: Database> Database for TempDatabase<DB> { impl<DB: Database> Database for TempDatabase<DB> {
type TX = <DB as Database>::TX; type TX = <DB as Database>::TX;
type TXMut = <DB as Database>::TXMut; type TXMut = <DB as Database>::TXMut;
fn tx(&self) -> Result<Self::TX, DatabaseError> { fn tx(&self) -> Result<Self::TX, DatabaseError> {
self.db().tx() self.pre_tx_hook.read()();
let tx = self.db().tx()?;
self.post_tx_hook.read()();
Ok(tx)
} }
fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> { fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
@ -150,7 +186,7 @@ pub mod test_utils {
) )
.expect(&emsg); .expect(&emsg);
Arc::new(TempDatabase { db: Some(db), path }) Arc::new(TempDatabase::new(db, path))
} }
/// Create read/write database for testing /// Create read/write database for testing
@ -162,7 +198,7 @@ pub mod test_utils {
.with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)), .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
) )
.expect(ERROR_DB_CREATION); .expect(ERROR_DB_CREATION);
Arc::new(TempDatabase { db: Some(db), path }) Arc::new(TempDatabase::new(db, path))
} }
/// Create read only database for testing /// Create read only database for testing
@ -175,7 +211,7 @@ pub mod test_utils {
init_db(path.as_path(), args.clone()).expect(ERROR_DB_CREATION); init_db(path.as_path(), args.clone()).expect(ERROR_DB_CREATION);
} }
let db = open_db_read_only(path.as_path(), args).expect(ERROR_DB_OPEN); let db = open_db_read_only(path.as_path(), args).expect(ERROR_DB_OPEN);
Arc::new(TempDatabase { db: Some(db), path }) Arc::new(TempDatabase::new(db, path))
} }
} }

View File

@ -1486,20 +1486,26 @@ mod tests {
MockNodeTypesWithDB, MockNodeTypesWithDB,
}, },
writer::UnifiedStorageWriter, writer::UnifiedStorageWriter,
BlockWriter, CanonChainTracker, StaticFileProviderFactory, StaticFileWriter, BlockWriter, CanonChainTracker, ProviderFactory, StaticFileProviderFactory,
StaticFileWriter,
}; };
use alloy_eips::{BlockHashOrNumber, BlockNumHash, BlockNumberOrTag}; use alloy_eips::{BlockHashOrNumber, BlockNumHash, BlockNumberOrTag};
use alloy_primitives::B256; use alloy_primitives::{BlockNumber, B256};
use itertools::Itertools; use itertools::Itertools;
use rand::Rng; use rand::Rng;
use reth_chain_state::{ use reth_chain_state::{
test_utils::TestBlockBuilder, CanonStateNotification, CanonStateSubscriptions, test_utils::TestBlockBuilder, CanonStateNotification, CanonStateSubscriptions,
ExecutedBlock, NewCanonicalChain, CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain,
}; };
use reth_chainspec::{ use reth_chainspec::{
ChainSpec, ChainSpecBuilder, ChainSpecProvider, EthereumHardfork, MAINNET, ChainSpec, ChainSpecBuilder, ChainSpecProvider, EthereumHardfork, MAINNET,
}; };
use reth_db::models::{AccountBeforeTx, StoredBlockBodyIndices}; use reth_db::{
models::{AccountBeforeTx, StoredBlockBodyIndices},
tables,
};
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_errors::ProviderError;
use reth_execution_types::{Chain, ExecutionOutcome}; use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives::{ use reth_primitives::{
Receipt, SealedBlock, StaticFileSegment, TransactionMeta, TransactionSignedNoHash, Receipt, SealedBlock, StaticFileSegment, TransactionMeta, TransactionSignedNoHash,
@ -1586,9 +1592,27 @@ mod tests {
let factory = create_test_provider_factory_with_chain_spec(chain_spec); let factory = create_test_provider_factory_with_chain_spec(chain_spec);
let provider_rw = factory.database_provider_rw()?; let provider_rw = factory.database_provider_rw()?;
let static_file_provider = factory.static_file_provider();
// Write transactions to static files with the right `tx_num``
let mut bodies_cursor = provider_rw.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
let mut tx_num = bodies_cursor
.seek_exact(database_blocks.first().as_ref().unwrap().number.saturating_sub(1))?
.map(|(_, indices)| indices.next_tx_num())
.unwrap_or_default();
// Insert blocks into the database // Insert blocks into the database
for block in &database_blocks { for block in &database_blocks {
// TODO: this should be moved inside `insert_historical_block`: <https://github.com/paradigmxyz/reth/issues/11524>
let mut transactions_writer =
static_file_provider.latest_writer(StaticFileSegment::Transactions)?;
transactions_writer.increment_block(block.number)?;
for tx in block.body.transactions() {
let tx: TransactionSignedNoHash = tx.clone().into();
transactions_writer.append_transaction(tx_num, &tx)?;
tx_num += 1;
}
provider_rw.insert_historical_block( provider_rw.insert_historical_block(
block.clone().seal_with_senders().expect("failed to seal block with senders"), block.clone().seal_with_senders().expect("failed to seal block with senders"),
)?; )?;
@ -1602,7 +1626,9 @@ mod tests {
.append_receipts_from_blocks( .append_receipts_from_blocks(
// The initial block number is required // The initial block number is required
database_blocks.first().map(|b| b.number).unwrap_or_default(), database_blocks.first().map(|b| b.number).unwrap_or_default(),
receipts.iter().map(|vec| vec.clone().into_iter().map(Some).collect::<Vec<_>>()), receipts[..database_blocks.len()]
.iter()
.map(|vec| vec.clone().into_iter().map(Some).collect::<Vec<_>>()),
)?; )?;
// Commit to both storages: database and static files // Commit to both storages: database and static files
@ -1668,6 +1694,42 @@ mod tests {
) )
} }
/// This will persist the last block in-memory and delete it from
/// `canonical_in_memory_state` right after a database read transaction is created.
///
/// This simulates a RPC method having a different view than when its database transaction was
/// created.
fn persist_block_after_db_tx_creation(
provider: Arc<BlockchainProvider2<MockNodeTypesWithDB>>,
block_number: BlockNumber,
) {
let hook_provider = provider.clone();
provider.database.db_ref().set_post_transaction_hook(Box::new(move || {
if let Some(state) = hook_provider.canonical_in_memory_state.head_state() {
if state.anchor().number + 1 == block_number {
let mut lowest_memory_block =
state.parent_state_chain().last().expect("qed").block();
let num_hash = lowest_memory_block.block().num_hash();
let mut execution_output = (*lowest_memory_block.execution_output).clone();
execution_output.first_block = lowest_memory_block.block().number;
lowest_memory_block.execution_output = Arc::new(execution_output);
// Push to disk
let provider_rw = hook_provider.database_provider_rw().unwrap();
UnifiedStorageWriter::from(&provider_rw, &hook_provider.static_file_provider())
.save_blocks(&[lowest_memory_block])
.unwrap();
UnifiedStorageWriter::commit(provider_rw, hook_provider.static_file_provider())
.unwrap();
// Remove from memory
hook_provider.canonical_in_memory_state.remove_persisted_blocks(num_hash);
}
}
}));
}
#[test] #[test]
fn test_block_reader_find_block_by_hash() -> eyre::Result<()> { fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
// Initialize random number generator and provider factory // Initialize random number generator and provider factory
@ -3908,4 +3970,80 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn test_race() -> eyre::Result<()> {
let mut rng = generators::rng();
let (provider, _, in_memory_blocks, _) = provider_with_random_blocks(
&mut rng,
TEST_BLOCKS_COUNT - 1,
TEST_BLOCKS_COUNT + 1,
BlockRangeParams {
tx_count: TEST_TRANSACTIONS_COUNT..TEST_TRANSACTIONS_COUNT,
..Default::default()
},
)?;
let provider = Arc::new(provider);
// Old implementation was querying the database first. This is problematic, if there are
// changes AFTER the database transaction is created.
let old_transaction_hash_fn =
|hash: B256,
canonical_in_memory_state: CanonicalInMemoryState,
factory: ProviderFactory<MockNodeTypesWithDB>| {
assert!(factory.transaction_by_hash(hash)?.is_none(), "should not be in database");
Ok::<_, ProviderError>(canonical_in_memory_state.transaction_by_hash(hash))
};
// Correct implementation queries in-memory first
let correct_transaction_hash_fn =
|hash: B256,
canonical_in_memory_state: CanonicalInMemoryState,
_factory: ProviderFactory<MockNodeTypesWithDB>| {
if let Some(tx) = canonical_in_memory_state.transaction_by_hash(hash) {
return Ok::<_, ProviderError>(Some(tx))
}
panic!("should not be in database");
// _factory.transaction_by_hash(hash)
};
// OLD BEHAVIOUR
{
// This will persist block 1 AFTER a database is created. Moving it from memory to
// storage.
persist_block_after_db_tx_creation(provider.clone(), in_memory_blocks[0].number);
let to_be_persisted_tx = in_memory_blocks[0].body.transactions[0].clone();
// Even though the block exists, given the order of provider queries done in the method
// above, we do not see it.
assert_eq!(
old_transaction_hash_fn(
to_be_persisted_tx.hash(),
provider.canonical_in_memory_state(),
provider.database.clone()
),
Ok(None)
);
}
// CORRECT BEHAVIOUR
{
// This will persist block 1 AFTER a database is created. Moving it from memory to
// storage.
persist_block_after_db_tx_creation(provider.clone(), in_memory_blocks[1].number);
let to_be_persisted_tx = in_memory_blocks[1].body.transactions[0].clone();
assert_eq!(
correct_transaction_hash_fn(
to_be_persisted_tx.hash(),
provider.canonical_in_memory_state(),
provider.database.clone()
),
Ok(Some(to_be_persisted_tx))
);
}
Ok(())
}
} }