From cf72b6f38d03a6f8bae5360329a9ceb3d8474fb5 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Thu, 7 Nov 2024 21:06:53 +0900 Subject: [PATCH] chore: move helper methods from `DatabaseProvider` to `DBProvider` as defaults (#12367) --- crates/blockchain-tree/src/blockchain_tree.rs | 4 +- crates/blockchain-tree/src/chain.rs | 3 +- crates/cli/commands/src/db/checksum.rs | 2 +- crates/storage/db-common/src/db_tool/mod.rs | 2 +- .../provider/src/providers/database/mod.rs | 3 +- .../src/providers/database/provider.rs | 166 +++--------------- .../storage/provider/src/test_utils/blocks.rs | 2 +- .../storage-api/src/database_provider.rs | 119 ++++++++++++- 8 files changed, 153 insertions(+), 148 deletions(-) diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 65e55d7d9..64705e5cc 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -23,8 +23,8 @@ use reth_primitives::{ use reth_provider::{ providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, BlockWriter, CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, - ChainSpecProvider, ChainSplit, ChainSplitTarget, DisplayBlocksChain, HeaderProvider, - ProviderError, StaticFileProviderFactory, + ChainSpecProvider, ChainSplit, ChainSplitTarget, DBProvider, DisplayBlocksChain, + HeaderProvider, ProviderError, StaticFileProviderFactory, }; use reth_stages_api::{MetricEvent, MetricEventsSender}; use reth_storage_errors::provider::{ProviderResult, RootMismatch}; diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index 393e525d5..09ba5c3f8 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -18,7 +18,8 @@ use reth_execution_types::{Chain, ExecutionOutcome}; use reth_primitives::{GotExpected, SealedBlockWithSenders, SealedHeader}; use reth_provider::{ providers::{BundleStateProvider, ConsistentDbView, ProviderNodeTypes}, - FullExecutionDataProvider, ProviderError, StateRootProvider, TryIntoHistoricalStateProvider, + DBProvider, FullExecutionDataProvider, ProviderError, StateRootProvider, + TryIntoHistoricalStateProvider, }; use reth_revm::database::StateProviderDatabase; use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; diff --git a/crates/cli/commands/src/db/checksum.rs b/crates/cli/commands/src/db/checksum.rs index 60ec09c96..9aa48e0e8 100644 --- a/crates/cli/commands/src/db/checksum.rs +++ b/crates/cli/commands/src/db/checksum.rs @@ -6,7 +6,7 @@ use reth_db::{DatabaseEnv, RawKey, RawTable, RawValue, TableViewer, Tables}; use reth_db_api::{cursor::DbCursorRO, table::Table, transaction::DbTx}; use reth_db_common::DbTool; use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine}; -use reth_provider::providers::ProviderNodeTypes; +use reth_provider::{providers::ProviderNodeTypes, DBProvider}; use std::{ hash::{BuildHasher, Hasher}, sync::Arc, diff --git a/crates/storage/db-common/src/db_tool/mod.rs b/crates/storage/db-common/src/db_tool/mod.rs index 67a5dd627..3420f2089 100644 --- a/crates/storage/db-common/src/db_tool/mod.rs +++ b/crates/storage/db-common/src/db_tool/mod.rs @@ -12,7 +12,7 @@ use reth_db_api::{ }; use reth_fs_util as fs; use reth_node_types::NodeTypesWithDB; -use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, ProviderFactory}; +use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, DBProvider, ProviderFactory}; use std::{path::Path, rc::Rc, sync::Arc}; use tracing::info; diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index bd0466ff9..38918f52c 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -621,7 +621,8 @@ mod tests { use crate::{ providers::{StaticFileProvider, StaticFileWriter}, test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB}, - BlockHashReader, BlockNumReader, BlockWriter, HeaderSyncGapProvider, TransactionsProvider, + BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider, + TransactionsProvider, }; use alloy_primitives::{TxNumber, B256, U256}; use assert_matches::assert_matches; diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 266f98aae..c76f77572 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -24,7 +24,6 @@ use reth_db::{ cursor::DbDupCursorRW, tables, BlockNumberList, PlainAccountState, PlainStorageState, }; use reth_db_api::{ - common::KeyValue, cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, database::Database, models::{ @@ -63,7 +62,7 @@ use std::{ cmp::Ordering, collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet}, fmt::Debug, - ops::{Bound, Deref, DerefMut, Range, RangeBounds, RangeInclusive}, + ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive}, sync::{mpsc, Arc}, time::{Duration, Instant}, }; @@ -145,7 +144,7 @@ impl DatabaseProvider { } } -impl DatabaseProvider { +impl DatabaseProvider { /// State provider for latest block pub fn latest<'a>(&'a self) -> ProviderResult> { trace!(target: "providers::db", "Returning latest state provider"); @@ -364,7 +363,7 @@ where Ok(Vec::new()) } -impl DatabaseProvider { +impl DatabaseProvider { /// Creates a provider with an inner read-only transaction. pub const fn new( tx: TX, @@ -395,75 +394,6 @@ impl DatabaseProvider { &self.chain_spec } - /// Disables long-lived read transaction safety guarantees for leaks prevention and - /// observability improvements. - /// - /// CAUTION: In most of the cases, you want the safety guarantees for long read transactions - /// enabled. Use this only if you're sure that no write transaction is open in parallel, meaning - /// that Reth as a node is offline and not progressing. - pub fn disable_long_read_transaction_safety(mut self) -> Self { - self.tx.disable_long_read_transaction_safety(); - self - } - - /// Return full table as Vec - pub fn table(&self) -> Result>, DatabaseError> - where - T::Key: Default + Ord, - { - self.tx - .cursor_read::()? - .walk(Some(T::Key::default()))? - .collect::, DatabaseError>>() - } - - /// Return a list of entries from the table, based on the given range. - #[inline] - pub fn get( - &self, - range: impl RangeBounds, - ) -> Result>, DatabaseError> { - self.tx.cursor_read::()?.walk_range(range)?.collect::, _>>() - } - - /// Iterates over read only values in the given table and collects them into a vector. - /// - /// Early-returns if the range is empty, without opening a cursor transaction. - fn cursor_read_collect>( - &self, - range: impl RangeBounds, - ) -> ProviderResult> { - let capacity = match range_size_hint(&range) { - Some(0) | None => return Ok(Vec::new()), - Some(capacity) => capacity, - }; - let mut cursor = self.tx.cursor_read::()?; - self.cursor_collect_with_capacity(&mut cursor, range, capacity) - } - - /// Iterates over read only values in the given table and collects them into a vector. - fn cursor_collect>( - &self, - cursor: &mut impl DbCursorRO, - range: impl RangeBounds, - ) -> ProviderResult> { - let capacity = range_size_hint(&range).unwrap_or(0); - self.cursor_collect_with_capacity(cursor, range, capacity) - } - - fn cursor_collect_with_capacity>( - &self, - cursor: &mut impl DbCursorRO, - range: impl RangeBounds, - capacity: usize, - ) -> ProviderResult> { - let mut items = Vec::with_capacity(capacity); - for entry in cursor.walk_range(range)? { - items.push(entry?.1); - } - Ok(items) - } - fn transactions_by_tx_range_with_cursor( &self, range: impl RangeBounds, @@ -852,44 +782,12 @@ impl DatabaseProvider { } } -impl DatabaseProvider { +impl DatabaseProvider { /// Commit database transaction. pub fn commit(self) -> ProviderResult { Ok(self.tx.commit()?) } - /// Remove list of entries from the table. Returns the number of entries removed. - #[inline] - pub fn remove( - &self, - range: impl RangeBounds, - ) -> Result { - let mut entries = 0; - let mut cursor_write = self.tx.cursor_write::()?; - let mut walker = cursor_write.walk_range(range)?; - while walker.next().transpose()?.is_some() { - walker.delete_current()?; - entries += 1; - } - Ok(entries) - } - - /// Return a list of entries from the table, and remove them, based on the given range. - #[inline] - pub fn take( - &self, - range: impl RangeBounds, - ) -> Result>, DatabaseError> { - let mut cursor_write = self.tx.cursor_write::()?; - let mut walker = cursor_write.walk_range(range)?; - let mut items = Vec::new(); - while let Some(i) = walker.next().transpose()? { - walker.delete_current()?; - items.push(i) - } - Ok(items) - } - /// Remove requested block transactions, without returning them. /// /// This will remove block data for the given range from the following tables: @@ -1299,7 +1197,7 @@ impl ChangeSetReader for DatabaseProvider { } } -impl HeaderSyncGapProvider for DatabaseProvider { +impl HeaderSyncGapProvider for DatabaseProvider { fn sync_gap( &self, tip: watch::Receiver, @@ -1343,7 +1241,7 @@ impl HeaderSyncGapProvider for DatabaseProvider { } } -impl> HeaderProvider +impl> HeaderProvider for DatabaseProvider { fn header(&self, block_hash: &BlockHash) -> ProviderResult> { @@ -1443,7 +1341,7 @@ impl> HeaderProvider } } -impl BlockHashReader for DatabaseProvider { +impl BlockHashReader for DatabaseProvider { fn block_hash(&self, number: u64) -> ProviderResult> { self.static_file_provider.get_with_static_file_or_database( StaticFileSegment::Headers, @@ -1470,7 +1368,7 @@ impl BlockHashReader for DatabaseProvider { } } -impl BlockNumReader for DatabaseProvider { +impl BlockNumReader for DatabaseProvider { fn chain_info(&self) -> ProviderResult { let best_number = self.best_block_number()?; let best_hash = self.block_hash(best_number)?.unwrap_or_default(); @@ -1501,7 +1399,9 @@ impl BlockNumReader for DatabaseProvider { } } -impl> BlockReader for DatabaseProvider { +impl> BlockReader + for DatabaseProvider +{ fn find_block_by_hash(&self, hash: B256, source: BlockSource) -> ProviderResult> { if source.is_canonical() { self.block(hash.into()) @@ -1676,7 +1576,7 @@ impl> BlockReader for Datab } } -impl> TransactionsProviderExt +impl> TransactionsProviderExt for DatabaseProvider { /// Recovers transaction hashes by walking through `Transactions` table and @@ -1746,7 +1646,7 @@ impl> TransactionsProviderE } // Calculates the hash of the given transaction -impl> TransactionsProvider +impl> TransactionsProvider for DatabaseProvider { fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult> { @@ -1906,7 +1806,7 @@ impl> TransactionsProvider } } -impl> ReceiptProvider +impl> ReceiptProvider for DatabaseProvider { fn receipt(&self, id: TxNumber) -> ProviderResult> { @@ -1954,7 +1854,7 @@ impl> ReceiptProvider } } -impl> WithdrawalsProvider +impl> WithdrawalsProvider for DatabaseProvider { fn withdrawals_by_block( @@ -1984,7 +1884,7 @@ impl> WithdrawalsProvider } } -impl> EvmEnvProvider +impl> EvmEnvProvider for DatabaseProvider { fn fill_env_at( @@ -2110,7 +2010,7 @@ impl StageCheckpointWriter for DatabaseProvider StorageReader for DatabaseProvider { +impl StorageReader for DatabaseProvider { fn plain_state_storages( &self, addresses_with_keys: impl IntoIterator)>, @@ -2173,7 +2073,7 @@ impl StorageReader for DatabaseProvider { } } -impl StateChangeWriter for DatabaseProvider { +impl StateChangeWriter for DatabaseProvider { fn write_state_reverts( &self, reverts: PlainStateReverts, @@ -2550,7 +2450,7 @@ impl StateChangeWriter for DatabaseProvider TrieWriter for DatabaseProvider { +impl TrieWriter for DatabaseProvider { /// Writes trie updates. Returns the number of entries modified. fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult { if trie_updates.is_empty() { @@ -2600,7 +2500,7 @@ impl TrieWriter for DatabaseProvider { } } -impl StorageTrieWriter for DatabaseProvider { +impl StorageTrieWriter for DatabaseProvider { /// Writes storage trie updates from the given storage trie map. First sorts the storage trie /// updates by the hashed address, writing in sorted order. fn write_storage_trie_updates( @@ -2637,7 +2537,7 @@ impl StorageTrieWriter for DatabaseProvider HashingWriter for DatabaseProvider { +impl HashingWriter for DatabaseProvider { fn unwind_account_hashing<'a>( &self, changesets: impl Iterator, @@ -2862,7 +2762,7 @@ impl HashingWriter for DatabaseProvider } } -impl HistoryWriter for DatabaseProvider { +impl HistoryWriter for DatabaseProvider { fn unwind_account_history_indices<'a>( &self, changesets: impl Iterator, @@ -2996,7 +2896,7 @@ impl HistoryWriter for DatabaseProvider } } -impl StateReader for DatabaseProvider { +impl StateReader for DatabaseProvider { fn get_state(&self, block: BlockNumber) -> ProviderResult> { self.get_state(block..=block) } @@ -3417,7 +3317,7 @@ impl + } } -impl PruneCheckpointReader for DatabaseProvider { +impl PruneCheckpointReader for DatabaseProvider { fn get_prune_checkpoint( &self, segment: PruneSegment, @@ -3444,7 +3344,7 @@ impl PruneCheckpointWriter for DatabaseProvider StatsReader for DatabaseProvider { +impl StatsReader for DatabaseProvider { fn count_entries(&self) -> ProviderResult { let db_entries = self.tx.entries::()?; let static_file_entries = match self.static_file_provider.count_entries::() { @@ -3457,7 +3357,7 @@ impl StatsReader for DatabaseProvider { } } -impl ChainStateBlockReader for DatabaseProvider { +impl ChainStateBlockReader for DatabaseProvider { fn last_finalized_block_number(&self) -> ProviderResult> { let mut finalized_blocks = self .tx @@ -3592,17 +3492,3 @@ fn recover_block_senders( Ok(()) } - -fn range_size_hint(range: &impl RangeBounds) -> Option { - let start = match range.start_bound().cloned() { - Bound::Included(start) => start, - Bound::Excluded(start) => start.checked_add(1)?, - Bound::Unbounded => 0, - }; - let end = match range.end_bound().cloned() { - Bound::Included(end) => end.saturating_add(1), - Bound::Excluded(end) => end, - Bound::Unbounded => return None, - }; - end.checked_sub(start).map(|x| x as _) -} diff --git a/crates/storage/provider/src/test_utils/blocks.rs b/crates/storage/provider/src/test_utils/blocks.rs index 8439aef16..2c9c10813 100644 --- a/crates/storage/provider/src/test_utils/blocks.rs +++ b/crates/storage/provider/src/test_utils/blocks.rs @@ -1,5 +1,5 @@ //! Dummy blocks and data for tests -use crate::{DatabaseProviderRW, ExecutionOutcome}; +use crate::{DBProvider, DatabaseProviderRW, ExecutionOutcome}; use alloy_consensus::{TxLegacy, EMPTY_OMMER_ROOT_HASH}; use alloy_primitives::{ b256, hex_literal::hex, map::HashMap, Address, BlockNumber, Bytes, Log, Sealable, TxKind, B256, diff --git a/crates/storage/storage-api/src/database_provider.rs b/crates/storage/storage-api/src/database_provider.rs index 6a463ed01..20aebce88 100644 --- a/crates/storage/storage-api/src/database_provider.rs +++ b/crates/storage/storage-api/src/database_provider.rs @@ -1,6 +1,14 @@ -use reth_db_api::{database::Database, transaction::DbTx}; +use reth_db_api::{ + common::KeyValue, + cursor::DbCursorRO, + database::Database, + table::Table, + transaction::{DbTx, DbTxMut}, + DatabaseError, +}; use reth_prune_types::PruneModes; use reth_storage_errors::provider::ProviderResult; +use std::ops::{Bound, RangeBounds}; /// Database provider. pub trait DBProvider: Send + Sync + Sized + 'static { @@ -34,6 +42,101 @@ pub trait DBProvider: Send + Sync + Sized + 'static { /// Returns a reference to prune modes. fn prune_modes_ref(&self) -> &PruneModes; + + /// Return full table as Vec + fn table(&self) -> Result>, DatabaseError> + where + T::Key: Default + Ord, + { + self.tx_ref() + .cursor_read::()? + .walk(Some(T::Key::default()))? + .collect::, DatabaseError>>() + } + + /// Return a list of entries from the table, based on the given range. + #[inline] + fn get( + &self, + range: impl RangeBounds, + ) -> Result>, DatabaseError> { + self.tx_ref().cursor_read::()?.walk_range(range)?.collect::, _>>() + } + + /// Iterates over read only values in the given table and collects them into a vector. + /// + /// Early-returns if the range is empty, without opening a cursor transaction. + fn cursor_read_collect>( + &self, + range: impl RangeBounds, + ) -> ProviderResult> { + let capacity = match range_size_hint(&range) { + Some(0) | None => return Ok(Vec::new()), + Some(capacity) => capacity, + }; + let mut cursor = self.tx_ref().cursor_read::()?; + self.cursor_collect_with_capacity(&mut cursor, range, capacity) + } + + /// Iterates over read only values in the given table and collects them into a vector. + fn cursor_collect>( + &self, + cursor: &mut impl DbCursorRO, + range: impl RangeBounds, + ) -> ProviderResult> { + let capacity = range_size_hint(&range).unwrap_or(0); + self.cursor_collect_with_capacity(cursor, range, capacity) + } + + /// Iterates over read only values in the given table and collects them into a vector with + /// capacity. + fn cursor_collect_with_capacity>( + &self, + cursor: &mut impl DbCursorRO, + range: impl RangeBounds, + capacity: usize, + ) -> ProviderResult> { + let mut items = Vec::with_capacity(capacity); + for entry in cursor.walk_range(range)? { + items.push(entry?.1); + } + Ok(items) + } + + /// Remove list of entries from the table. Returns the number of entries removed. + #[inline] + fn remove(&self, range: impl RangeBounds) -> Result + where + Self::Tx: DbTxMut, + { + let mut entries = 0; + let mut cursor_write = self.tx_ref().cursor_write::()?; + let mut walker = cursor_write.walk_range(range)?; + while walker.next().transpose()?.is_some() { + walker.delete_current()?; + entries += 1; + } + Ok(entries) + } + + /// Return a list of entries from the table, and remove them, based on the given range. + #[inline] + fn take( + &self, + range: impl RangeBounds, + ) -> Result>, DatabaseError> + where + Self::Tx: DbTxMut, + { + let mut cursor_write = self.tx_ref().cursor_write::()?; + let mut walker = cursor_write.walk_range(range)?; + let mut items = Vec::new(); + while let Some(i) = walker.next().transpose()? { + walker.delete_current()?; + items.push(i) + } + Ok(items) + } } /// Database provider factory. @@ -54,3 +157,17 @@ pub trait DatabaseProviderFactory: Send + Sync { /// Create new read-write database provider. fn database_provider_rw(&self) -> ProviderResult; } + +fn range_size_hint(range: &impl RangeBounds) -> Option { + let start = match range.start_bound().cloned() { + Bound::Included(start) => start, + Bound::Excluded(start) => start.checked_add(1)?, + Bound::Unbounded => 0, + }; + let end = match range.end_bound().cloned() { + Bound::Included(end) => end.saturating_add(1), + Bound::Excluded(end) => end, + Bound::Unbounded => return None, + }; + end.checked_sub(start).map(|x| x as _) +}